Posted to commits@mahout.apache.org by dl...@apache.org on 2014/05/27 21:14:52 UTC
[2/3] MAHOUT-1529 closes PR #1
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/SSVD.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/SSVD.scala b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/SSVD.scala
deleted file mode 100644
index 8e05a83..0000000
--- a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/SSVD.scala
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.math.scalabindings
-
-import scala.math._
-import org.apache.mahout.math.{Matrices, Matrix}
-import RLikeOps._
-import org.apache.mahout.common.RandomUtils
-import scala._
-import org.apache.log4j.Logger
-
-private[math] object SSVD {
-
- private val log = Logger.getLogger(SSVD.getClass)
-
- /**
- * In-core SSVD algorithm.
- *
- * @param a input matrix A
- * @param k request SSVD rank
- * @param p oversampling parameter
- * @param q number of power iterations
- * @return (U,V,s)
- */
- def ssvd(a: Matrix, k: Int, p: Int = 15, q: Int = 0) = {
- val m = a.nrow
- val n = a.ncol
- if (k > min(m, n))
- throw new IllegalArgumentException(
- "k cannot be greater than smaller of m,n")
- val pfxed = min(p, min(m, n) - k)
-
- // Actual decomposition rank
- val r = k + pfxed
-
- val rnd = RandomUtils.getRandom
- val omega = Matrices.symmetricUniformView(n, r, rnd.nextInt)
-
- var y = a %*% omega
- var yty = y.t %*% y
- val at = a.t
- var ch = chol(yty)
- assert(ch.isPositiveDefinite, "Rank-deficiency detected during s-SVD")
- var bt = ch.solveRight(at %*% y)
-
- // Power iterations
- for (i <- 0 until q) {
- y = a %*% bt
- yty = y.t %*% y
- ch = chol(yty)
- bt = ch.solveRight(at %*% y)
- }
-
- val bbt = bt.t %*% bt
- val (uhat, d) = eigen(bbt)
-
- val s = d.sqrt
- val u = ch.solveRight(y) %*% uhat
- val v = bt %*% (uhat %*%: diagv(1 /: s))
-
- (u(::, 0 until k), v(::, 0 until k), s(0 until k))
- }
-
- /**
- * PCA based on SSVD that runs without forming an always-dense A-(colMeans(A)) input for SVD. This
- * follows the solution outlined in MAHOUT-817. For in-core version it, for most part, is supposed
- * to save some memory for sparse inputs by removing direct mean subtraction.<P>
- *
- * Hint: Usually one wants to use AV which is approsimately USigma, i.e.<code>u %*%: diagv(s)</code>.
- * If retaining distances and orignal scaled variances not that important, the normalized PCA space
- * is just U.
- *
- * Important: data points are considered to be rows.
- *
- * @param a input matrix A
- * @param k request SSVD rank
- * @param p oversampling parameter
- * @param q number of power iterations
- * @return (U,V,s)
- */
- def spca(a:Matrix, k: Int, p: Int = 15, q: Int = 0) = {
- val m = a.nrow
- val n = a.ncol
- if (k > min(m, n))
- throw new IllegalArgumentException(
- "k cannot be greater than smaller of m,n")
- val pfxed = min(p, min(m, n) - k)
-
- // Actual decomposition rank
- val r = k + pfxed
-
- val rnd = RandomUtils.getRandom
- val omega = Matrices.symmetricUniformView(n, r, rnd.nextInt)
-
- // Dataset mean
- val xi = a.colMeans()
-
- if (log.isDebugEnabled) log.debug("xi=%s".format(xi))
-
- var y = a %*% omega
-
- // Fixing y
- val s_o = omega.t %*% xi
- y := ((r,c,v) => v - s_o(c))
-
- var yty = y.t %*% y
- var ch = chol(yty)
-// assert(ch.isPositiveDefinite, "Rank-deficiency detected during s-SVD")
-
- // This is implicit Q of QR(Y)
- var qm = ch.solveRight(y)
- var bt = a.t %*% qm
- var s_q = qm.colSums()
- var s_b = bt.t %*% xi
-
- // Power iterations
- for (i <- 0 until q) {
-
- // Fix bt
- bt -= xi cross s_q
-
- y = a %*% bt
-
- // Fix Y again.
- y := ((r,c,v) => v - s_b(c))
-
- yty = y.t %*% y
- ch = chol(yty)
- qm = ch.solveRight(y)
- bt = a.t %*% qm
- s_q = qm.colSums()
- s_b = bt.t %*% xi
- }
-
- val c = s_q cross s_b
-
- // BB' computation becomes
- val bbt = bt.t %*% bt -c - c.t + (s_q cross s_q) * (xi dot xi)
-
- val (uhat, d) = eigen(bbt)
-
- val s = d.sqrt
- val u = qm %*% uhat
- val v = bt %*% (uhat %*%: diagv(1 /: s))
-
- (u(::, 0 until k), v(::, 0 until k), s(0 until k))
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/decompositions/SSVD.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/decompositions/SSVD.scala b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/decompositions/SSVD.scala
new file mode 100644
index 0000000..80385a3
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/decompositions/SSVD.scala
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.math.decompositions
+
+import scala.math._
+import org.apache.mahout.math.{Matrices, Matrix}
+import org.apache.mahout.common.RandomUtils
+import org.apache.log4j.Logger
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+
+private[math] object SSVD {
+
+ private val log = Logger.getLogger(SSVD.getClass)
+
+ /**
+ * In-core SSVD algorithm.
+ *
+ * @param a input matrix A
+ * @param k requested SSVD rank
+ * @param p oversampling parameter
+ * @param q number of power iterations
+ * @return (U,V,s)
+ */
+ def ssvd(a: Matrix, k: Int, p: Int = 15, q: Int = 0) = {
+ val m = a.nrow
+ val n = a.ncol
+ if (k > min(m, n))
+ throw new IllegalArgumentException(
+ "k cannot be greater than smaller of m,n")
+ val pfxed = min(p, min(m, n) - k)
+
+ // Actual decomposition rank
+ val r = k + pfxed
+
+ val rnd = RandomUtils.getRandom
+ val omega = Matrices.symmetricUniformView(n, r, rnd.nextInt)
+
+ var y = a %*% omega
+ var yty = y.t %*% y
+ val at = a.t
+ var ch = chol(yty)
+ assert(ch.isPositiveDefinite, "Rank-deficiency detected during s-SVD")
+ var bt = ch.solveRight(at %*% y)
+
+ // Power iterations
+ for (i <- 0 until q) {
+ y = a %*% bt
+ yty = y.t %*% y
+ ch = chol(yty)
+ bt = ch.solveRight(at %*% y)
+ }
+
+ val bbt = bt.t %*% bt
+ val (uhat, d) = eigen(bbt)
+
+ val s = d.sqrt
+ val u = ch.solveRight(y) %*% uhat
+ val v = bt %*% (uhat %*%: diagv(1 /: s))
+
+ (u(::, 0 until k), v(::, 0 until k), s(0 until k))
+ }
+
+ /**
+ * PCA based on SSVD that runs without forming the always-dense A - colMeans(A) input for SVD. This
+ * follows the solution outlined in MAHOUT-817. For the in-core version it is, for the most part,
+ * expected to save some memory for sparse inputs by removing direct mean subtraction.<P>
+ *
+ * Hint: Usually one wants to use AV, which is approximately USigma, i.e. <code>u %*%: diagv(s)</code>.
+ * If retaining distances and original scaled variances is not that important, the normalized PCA
+ * space is just U.
+ *
+ * Important: data points are considered to be rows.
+ *
+ * @param a input matrix A
+ * @param k requested SSVD rank
+ * @param p oversampling parameter
+ * @param q number of power iterations
+ * @return (U,V,s)
+ */
+ def spca(a:Matrix, k: Int, p: Int = 15, q: Int = 0) = {
+ val m = a.nrow
+ val n = a.ncol
+ if (k > min(m, n))
+ throw new IllegalArgumentException(
+ "k cannot be greater than smaller of m,n")
+ val pfxed = min(p, min(m, n) - k)
+
+ // Actual decomposition rank
+ val r = k + pfxed
+
+ val rnd = RandomUtils.getRandom
+ val omega = Matrices.symmetricUniformView(n, r, rnd.nextInt)
+
+ // Dataset mean
+ val xi = a.colMeans()
+
+ if (log.isDebugEnabled) log.debug("xi=%s".format(xi))
+
+ var y = a %*% omega
+
+ // Fixing y
+ val s_o = omega.t %*% xi
+ y := ((r,c,v) => v - s_o(c))
+
+ var yty = y.t %*% y
+ var ch = chol(yty)
+// assert(ch.isPositiveDefinite, "Rank-deficiency detected during s-SVD")
+
+ // This is implicit Q of QR(Y)
+ var qm = ch.solveRight(y)
+ var bt = a.t %*% qm
+ var s_q = qm.colSums()
+ var s_b = bt.t %*% xi
+
+ // Power iterations
+ for (i <- 0 until q) {
+
+ // Fix bt
+ bt -= xi cross s_q
+
+ y = a %*% bt
+
+ // Fix Y again.
+ y := ((r,c,v) => v - s_b(c))
+
+ yty = y.t %*% y
+ ch = chol(yty)
+ qm = ch.solveRight(y)
+ bt = a.t %*% qm
+ s_q = qm.colSums()
+ s_b = bt.t %*% xi
+ }
+
+ val c = s_q cross s_b
+
+ // BB' computation becomes
+ val bbt = bt.t %*% bt -c - c.t + (s_q cross s_q) * (xi dot xi)
+
+ val (uhat, d) = eigen(bbt)
+
+ val s = d.sqrt
+ val u = qm %*% uhat
+ val v = bt %*% (uhat %*%: diagv(1 /: s))
+
+ (u(::, 0 until k), v(::, 0 until k), s(0 until k))
+
+ }
+
+}
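For orientation, a minimal usage sketch of the relocated in-core routine (not part of this commit). Since the object stays private[math], the sketch sits inside the org.apache.mahout.math package; the example matrix, object name, and parameter values are illustrative, and dense/diagv are the scalabindings helpers used elsewhere in this change.

  package org.apache.mahout.math

  import org.apache.mahout.math.scalabindings._
  import RLikeOps._
  import org.apache.mahout.math.decompositions.SSVD

  object SSVDSketch extends App {
    // Small full-rank dense input; ssvd returns the k leading singular triplets (U, V, s).
    val a = dense((1, 2, 3), (3, 4, 5), (6, 0, 1), (2, 2, 2))
    val (u, v, s) = SSVD.ssvd(a, k = 2, p = 1, q = 1)

    // Rank-k reconstruction: A is approximated by U * diag(s) * V'.
    val aHat = u %*% diagv(s) %*% v.t
    println(aHat)
  }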
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/package.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/package.scala b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/package.scala
index c9e59ba..4599146 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/package.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/scalabindings/package.scala
@@ -19,6 +19,7 @@ package org.apache.mahout.math
import org.apache.mahout.math._
import org.apache.mahout.math.solver.EigenDecomposition
+import org.apache.mahout.math.decompositions.SSVD
/**
* Mahout matrices and vectors' scala syntactic sugar
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala
----------------------------------------------------------------------
diff --git a/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala b/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala
index a97b453..020a2f9 100644
--- a/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala
+++ b/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala
@@ -2,22 +2,20 @@ package org.apache.mahout.sparkbindings.shell
import org.apache.spark.repl.SparkILoop
import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.mahout.sparkbindings._
import scala.tools.nsc.Properties
import scala.Some
+import org.apache.mahout.sparkbindings._
class MahoutSparkILoop extends SparkILoop {
private val postInitScript =
- "import org.apache.mahout.math.Vector" ::
- "import org.apache.mahout.math.scalabindings._" ::
- "import RLikeOps._" ::
- "import org.apache.mahout.sparkbindings._" ::
- "import drm._" ::
- "import RLikeDrmOps._" ::
- "org.apache.spark.storage.StorageLevel" ::
- "implicit val _sc = sc" ::
- Nil
+ "import org.apache.mahout.math._" ::
+ "import scalabindings._" ::
+ "import RLikeOps._" ::
+ "import drm._" ::
+ "import RLikeDrmOps._" ::
+ "import org.apache.mahout.sparkbindings._" ::
+ Nil
override protected def postInitialization() {
super.postInitialization()
@@ -50,10 +48,25 @@ class MahoutSparkILoop extends SparkILoop {
customJars = jars,
sparkConf = conf
)
+
echo("Created spark context..")
sparkContext
}
+ override def initializeSpark() {
+ intp.beQuietDuring {
+ command("""
+
+ @transient implicit val sdc: org.apache.mahout.math.drm.DistributedContext =
+ new org.apache.mahout.sparkbindings.SparkDistributedContext(
+ org.apache.spark.repl.Main.interp.createSparkContext())
+
+ """)
+ command("import org.apache.spark.SparkContext._")
+ echo("Mahout distributed context is available as \"implicit val sdc\".")
+ }
+ }
+
override def prompt: String = "mahout> "
override def printWelcome(): Unit = {
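With the initializeSpark override above, the REPL predefines the Mahout distributed context as implicit val sdc and then runs the postInitScript imports; a hedged sketch of what a session then looks like (values illustrative, assuming drmParallelize picks the context up implicitly, as in the drm API used by this change set):

  mahout> val a = dense((1, 2, 3), (3, 4, 5))
  mahout> val drmA = drmParallelize(a, numPartitions = 2)    // resolves the implicit sdc
  mahout> val drmAtA = (drmA.t %*% drmA).checkpoint()
  mahout> drmAtA.collect                                     // executes on the Spark backend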
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/Main.scala
----------------------------------------------------------------------
diff --git a/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/Main.scala b/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/Main.scala
index 648f07f..9c0a51f 100644
--- a/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/Main.scala
+++ b/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/Main.scala
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.mahout.sparkbindings.shell
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark-shell/src/test/mahout/simple.mscala
----------------------------------------------------------------------
diff --git a/spark-shell/src/test/mahout/simple.mscala b/spark-shell/src/test/mahout/simple.mscala
index 385e4e8..854c482 100644
--- a/spark-shell/src/test/mahout/simple.mscala
+++ b/spark-shell/src/test/mahout/simple.mscala
@@ -1,8 +1,25 @@
-import org.apache.mahout.sparkbindings._
-import drm._
-import RLikeDrmOps._
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
-import org.apache.spark.storage.StorageLevel
+/*
+ To run, execute from mahout shell:
+
+ :load spark-shell/src/test/mahout/simple.mscala
+*/
val a = dense((1,2,3),(3,4,5))
val drmA = drmParallelize(a,numPartitions = 2)
@@ -19,5 +36,5 @@ r.collect
// local write
r.writeDRM("file:///home/dmitriy/A")
-// hdfs write
-r.writeDRM("hdfs://localhost:11010/A")
\ No newline at end of file
+// hdfs write -- uncomment to test
+// r.writeDRM("hdfs://localhost:11010/A")
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/pom.xml
----------------------------------------------------------------------
diff --git a/spark/pom.xml b/spark/pom.xml
index 8b89969..ac99ffd 100644
--- a/spark/pom.xml
+++ b/spark/pom.xml
@@ -220,53 +220,52 @@
</property>
</activation>
<dependencies>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- <version>${hadoop2.version}</version>
- <exclusions>
- <exclusion>
- <groupId>asm</groupId>
- <artifactId>asm</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.mahout</groupId>
- <artifactId>mahout-mrlegacy</artifactId>
- <exclusions>
- <exclusion>
- <groupId>asm</groupId>
- <artifactId>asm</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-core</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-mapreduce-client-core</artifactId>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-mapreduce-client-common</artifactId>
- </dependency>
- </dependencies>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <version>${hadoop2.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>asm</groupId>
+ <artifactId>asm</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.mahout</groupId>
+ <artifactId>mahout-mrlegacy</artifactId>
+ <exclusions>
+ <exclusion>
+ <groupId>asm</groupId>
+ <artifactId>asm</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-mapreduce-client-common</artifactId>
+ </dependency>
+ </dependencies>
</profile>
</profiles>
<dependencies>
- <!-- spark stuff -->
+
<dependency>
- <groupId>org.apache.spark</groupId>
- <artifactId>spark-core_${scala.major}</artifactId>
- <version>${spark.version}</version>
+ <groupId>org.apache.mahout</groupId>
+ <artifactId>mahout-math-scala</artifactId>
</dependency>
<dependency>
@@ -283,18 +282,18 @@
<dependency>
<groupId>org.apache.mahout</groupId>
<artifactId>mahout-math-scala</artifactId>
- </dependency>
-
- <dependency>
- <groupId>org.apache.mahout</groupId>
- <artifactId>mahout-math-scala</artifactId>
<classifier>tests</classifier>
<scope>test</scope>
</dependency>
<!-- 3rd-party -->
-
+ <!-- spark stuff -->
+ <dependency>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>spark-core_${scala.major}</artifactId>
+ <version>${spark.version}</version>
+ </dependency>
<!-- scala stuff -->
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkDistributedContext.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkDistributedContext.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkDistributedContext.scala
new file mode 100644
index 0000000..4d13a5a
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkDistributedContext.scala
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.sparkbindings
+
+import org.apache.mahout.math.drm.{DistributedEngine, BCast, DistributedContext}
+import org.apache.spark.SparkContext
+
+class SparkDistributedContext(val sc: SparkContext) extends DistributedContext {
+
+ val engine: DistributedEngine = SparkEngine
+
+ def close() {
+ sc.stop()
+ }
+}
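A small sketch of how this wrapper is meant to be instantiated outside the shell (not part of the commit; the master URL and application name are illustrative):

  import org.apache.spark.{SparkConf, SparkContext}
  import org.apache.mahout.math.drm.DistributedContext
  import org.apache.mahout.sparkbindings.SparkDistributedContext

  // Wrap a plain SparkContext so drm operations can resolve an implicit DistributedContext.
  val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("mahout-sketch"))
  implicit val dc: DistributedContext = new SparkDistributedContext(sc)

  // ... build and run drm pipelines here ...

  dc.close()   // delegates to sc.stop()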
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
new file mode 100644
index 0000000..0c904ab
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.sparkbindings
+
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import org.apache.mahout.math.drm._
+import org.apache.mahout.math.drm.logical._
+import org.apache.mahout.sparkbindings.drm.{CheckpointedDrmSpark, DrmRddInput}
+import org.apache.mahout.math._
+import scala.reflect.ClassTag
+import org.apache.spark.storage.StorageLevel
+import org.apache.mahout.sparkbindings.blas._
+import org.apache.hadoop.io.{LongWritable, Text, IntWritable, Writable}
+import scala.Some
+import scala.collection.JavaConversions._
+import org.apache.spark.SparkContext
+
+/** Spark-specific non-drm-method operations */
+object SparkEngine extends DistributedEngine {
+
+ def colSums[K:ClassTag](drm: CheckpointedDrm[K]): Vector = {
+ val n = drm.ncol
+
+ drm.rdd
+ // Throw away keys
+ .map(_._2)
+ // Fold() still doesn't work with Kryo, so work around it.
+ .mapPartitions(iter => {
+ val acc = ((new DenseVector(n): Vector) /: iter)((acc, v) => acc += v)
+ Iterator(acc)
+ })
+ // Since we preallocated new accumulator vector per partition, this must not cause any side
+ // effects now.
+ .reduce(_ += _)
+ }
+
+ /** Engine-specific colMeans implementation based on a checkpoint. */
+ def colMeans[K:ClassTag](drm: CheckpointedDrm[K]): Vector = if (drm.nrow == 0) drm.colSums() else drm.colSums() /= drm.nrow
+
+ /**
+ * Perform default expression rewrite. Return physical plan that we can pass to exec(). <P>
+ *
+ * A particular physical engine implementation may choose to either use or not use these rewrites
+ * as a useful basic rewriting rule.<P>
+ */
+ override def optimizerRewrite[K: ClassTag](action: DrmLike[K]): DrmLike[K] = super.optimizerRewrite(action)
+
+
+ /** Second optimizer pass. Translate previously rewritten logical pipeline into physical engine plan. */
+ def toPhysical[K: ClassTag](plan: DrmLike[K], ch: CacheHint.CacheHint): CheckpointedDrm[K] = {
+
+ // Spark-specific Physical Plan translation.
+ val rdd = tr2phys(plan)
+
+ val newcp = new CheckpointedDrmSpark(
+ rdd = rdd,
+ _nrow = plan.nrow,
+ _ncol = plan.ncol,
+ _cacheStorageLevel = cacheHint2Spark(ch),
+ partitioningTag = plan.partitioningTag
+ )
+ newcp.cache()
+ }
+
+ /** Broadcast support */
+ def drmBroadcast(v: Vector)(implicit dc: DistributedContext): BCast[Vector] = dc.broadcast(v)
+
+ /** Broadcast support */
+ def drmBroadcast(m: Matrix)(implicit dc: DistributedContext): BCast[Matrix] = dc.broadcast(m)
+
+ /**
+ * Load DRM from hdfs (as in Mahout DRM format)
+ *
+ * @param path
+ * @param sc spark context (we wanted to make this implicit, but that doesn't work in the current
+ * version of Scala with these type bounds, sorry)
+ *
+ * @return DRM[Any] where Any is automatically translated to value type
+ */
+ def drmFromHDFS (path: String)(implicit sc: DistributedContext): CheckpointedDrm[_] = {
+ implicit val scc:SparkContext = sc
+ val rdd = sc.sequenceFile(path, classOf[Writable], classOf[VectorWritable]).map(t => (t._1, t._2.get()))
+
+ val key = rdd.map(_._1).take(1)(0)
+ val keyWClass = key.getClass.asSubclass(classOf[Writable])
+
+ val key2val = key match {
+ case xx: IntWritable => (v: AnyRef) => v.asInstanceOf[IntWritable].get
+ case xx: Text => (v: AnyRef) => v.asInstanceOf[Text].toString
+ case xx: LongWritable => (v: AnyRef) => v.asInstanceOf[LongWritable].get
+ case xx: Writable => (v: AnyRef) => v
+ }
+
+ val val2key = key match {
+ case xx: IntWritable => (x: Any) => new IntWritable(x.asInstanceOf[Int])
+ case xx: Text => (x: Any) => new Text(x.toString)
+ case xx: LongWritable => (x: Any) => new LongWritable(x.asInstanceOf[Int])
+ case xx: Writable => (x: Any) => x.asInstanceOf[Writable]
+ }
+
+ val km = key match {
+ case xx: IntWritable => implicitly[ClassTag[Int]]
+ case xx: Text => implicitly[ClassTag[String]]
+ case xx: LongWritable => implicitly[ClassTag[Long]]
+ case xx: Writable => ClassTag(classOf[Writable])
+ }
+
+ {
+ implicit def getWritable(x: Any): Writable = val2key()
+ new CheckpointedDrmSpark(rdd.map(t => (key2val(t._1), t._2)))(km.asInstanceOf[ClassTag[Any]])
+ }
+ }
+
+ /** Parallelize in-core matrix as spark distributed matrix, using row ordinal indices as data set keys. */
+ def drmParallelizeWithRowIndices(m: Matrix, numPartitions: Int = 1)
+ (implicit sc: DistributedContext)
+ : CheckpointedDrm[Int] = {
+ new CheckpointedDrmSpark(rdd = parallelizeInCore(m, numPartitions))
+ }
+
+ private[sparkbindings] def parallelizeInCore(m: Matrix, numPartitions: Int = 1)
+ (implicit sc: DistributedContext): DrmRdd[Int] = {
+
+ val p = (0 until m.nrow).map(i => i -> m(i, ::))
+ sc.parallelize(p, numPartitions)
+
+ }
+
+ /** Parallelize in-core matrix as spark distributed matrix, using row labels as a data set keys. */
+ def drmParallelizeWithRowLabels(m: Matrix, numPartitions: Int = 1)
+ (implicit sc: DistributedContext)
+ : CheckpointedDrm[String] = {
+
+ val rb = m.getRowLabelBindings
+ val p = for (i: String <- rb.keySet().toIndexedSeq) yield i -> m(rb(i), ::)
+
+ new CheckpointedDrmSpark(rdd = sc.parallelize(p, numPartitions))
+ }
+
+ /** This creates an empty DRM with specified number of partitions and cardinality. */
+ def drmParallelizeEmpty(nrow: Int, ncol: Int, numPartitions: Int = 10)
+ (implicit sc: DistributedContext): CheckpointedDrm[Int] = {
+ val rdd = sc.parallelize(0 to numPartitions, numPartitions).flatMap(part => {
+ val partNRow = (nrow - 1) / numPartitions + 1
+ val partStart = partNRow * part
+ val partEnd = Math.min(partStart + partNRow, nrow)
+
+ for (i <- partStart until partEnd) yield (i, new RandomAccessSparseVector(ncol): Vector)
+ })
+ new CheckpointedDrmSpark[Int](rdd, nrow, ncol)
+ }
+
+ def drmParallelizeEmptyLong(nrow: Long, ncol: Int, numPartitions: Int = 10)
+ (implicit sc: DistributedContext): CheckpointedDrm[Long] = {
+ val rdd = sc.parallelize(0 to numPartitions, numPartitions).flatMap(part => {
+ val partNRow = (nrow - 1) / numPartitions + 1
+ val partStart = partNRow * part
+ val partEnd = Math.min(partStart + partNRow, nrow)
+
+ for (i <- partStart until partEnd) yield (i, new RandomAccessSparseVector(ncol): Vector)
+ })
+ new CheckpointedDrmSpark[Long](rdd, nrow, ncol)
+ }
+
+ private def cacheHint2Spark(cacheHint: CacheHint.CacheHint): StorageLevel = cacheHint match {
+ case CacheHint.NONE => StorageLevel.NONE
+ case CacheHint.DISK_ONLY => StorageLevel.DISK_ONLY
+ case CacheHint.DISK_ONLY_2 => StorageLevel.DISK_ONLY_2
+ case CacheHint.MEMORY_ONLY => StorageLevel.MEMORY_ONLY
+ case CacheHint.MEMORY_ONLY_2 => StorageLevel.MEMORY_ONLY_2
+ case CacheHint.MEMORY_ONLY_SER => StorageLevel.MEMORY_ONLY_SER
+ case CacheHint.MEMORY_ONLY_SER_2 => StorageLevel.MEMORY_ONLY_SER_2
+ case CacheHint.MEMORY_AND_DISK => StorageLevel.MEMORY_AND_DISK
+ case CacheHint.MEMORY_AND_DISK_2 => StorageLevel.MEMORY_AND_DISK_2
+ case CacheHint.MEMORY_AND_DISK_SER => StorageLevel.MEMORY_AND_DISK_SER
+ case CacheHint.MEMORY_AND_DISK_SER_2 => StorageLevel.MEMORY_AND_DISK_SER_2
+ }
+
+ /** Translate previously optimized physical plan */
+ private def tr2phys[K: ClassTag](oper: DrmLike[K]): DrmRddInput[K] = {
+ // I do explicit evidence propagation here since matching via case classes seems to be losing
+ // it and subsequently may cause something like DrmRddInput[Any] instead of [Int] or [String].
+ // Hence you see explicit evidence attached to all recursive exec() calls.
+ oper match {
+ // If there are any such cases, they must go away in pass1. If they were not, then it wasn't
+ // the A'A case but actual transposition intent which should be removed from consideration
+ // (we cannot do actual flip for non-int-keyed arguments)
+ case OpAtAnyKey(_) =>
+ throw new IllegalArgumentException("\"A\" must be Int-keyed in this A.t expression.")
+ case op@OpAt(a) => At.at(op, tr2phys(a)(op.classTagA))
+ case op@OpABt(a, b) => ABt.abt(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
+ case op@OpAtB(a, b) => AtB.atb_nograph(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB),
+ zippable = a.partitioningTag == b.partitioningTag)
+ case op@OpAtA(a) => AtA.at_a(op, tr2phys(a)(op.classTagA))
+ case op@OpAx(a, x) => Ax.ax_with_broadcast(op, tr2phys(a)(op.classTagA))
+ case op@OpAtx(a, x) => Ax.atx_with_broadcast(op, tr2phys(a)(op.classTagA))
+ case op@OpAewB(a, b, '+') => AewB.a_plus_b(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
+ case op@OpAewB(a, b, '-') => AewB.a_minus_b(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
+ case op@OpAewB(a, b, '*') => AewB.a_hadamard_b(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
+ case op@OpAewB(a, b, '/') => AewB.a_eldiv_b(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
+ case op@OpAewScalar(a, s, "+") => AewB.a_plus_scalar(op, tr2phys(a)(op.classTagA), s)
+ case op@OpAewScalar(a, s, "-") => AewB.a_minus_scalar(op, tr2phys(a)(op.classTagA), s)
+ case op@OpAewScalar(a, s, "-:") => AewB.scalar_minus_a(op, tr2phys(a)(op.classTagA), s)
+ case op@OpAewScalar(a, s, "*") => AewB.a_times_scalar(op, tr2phys(a)(op.classTagA), s)
+ case op@OpAewScalar(a, s, "/") => AewB.a_div_scalar(op, tr2phys(a)(op.classTagA), s)
+ case op@OpAewScalar(a, s, "/:") => AewB.scalar_div_a(op, tr2phys(a)(op.classTagA), s)
+ case op@OpRowRange(a, _) => Slicing.rowRange(op, tr2phys(a)(op.classTagA))
+ case op@OpTimesRightMatrix(a, _) => AinCoreB.rightMultiply(op, tr2phys(a)(op.classTagA))
+ // Custom operators, we just execute them
+ case blockOp: OpMapBlock[K, _] => MapBlock.exec(
+ src = tr2phys(blockOp.A)(blockOp.classTagA),
+ ncol = blockOp.ncol,
+ bmf = blockOp.bmf
+ )
+ case cp: CheckpointedDrm[K] => new DrmRddInput[K](rowWiseSrc = Some((cp.ncol, cp.rdd)))
+ case _ => throw new IllegalArgumentException("Internal:Optimizer has no exec policy for operator %s."
+ .format(oper))
+
+ }
+ }
+
+
+}
+
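The two drmParallelizeEmpty* methods above carve nrow rows into contiguous per-partition ranges; a standalone sketch of that arithmetic (plain Scala, no Spark; the helper name is illustrative):

  // Mirrors the per-partition row-range math used in drmParallelizeEmpty.
  def rowRange(part: Int, nrow: Int, numPartitions: Int): Range = {
    val partNRow = (nrow - 1) / numPartitions + 1      // ceil(nrow / numPartitions)
    val partStart = partNRow * part
    val partEnd = math.min(partStart + partNRow, nrow)
    partStart until partEnd                            // empty for any trailing spare partition
  }

  // e.g. 10 rows over 3 partitions -> 0..3, 4..7, 8..9
  (0 until 3).map(rowRange(_, 10, 3))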
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
index 8d19068..97873bd 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
@@ -19,11 +19,12 @@ package org.apache.mahout.sparkbindings.blas
import org.apache.mahout.math.scalabindings._
import RLikeOps._
-import org.apache.mahout.sparkbindings.drm.plan.OpABt
import scala.reflect.ClassTag
-import org.apache.mahout.sparkbindings.drm._
+import org.apache.mahout.sparkbindings._
+import drm._
import org.apache.mahout.math.{Matrix, SparseRowMatrix}
import org.apache.spark.SparkContext._
+import org.apache.mahout.math.drm.logical.OpABt
/** Contains RDD plans for ABt operator */
object ABt {
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
index ec93cf7..ec6e99e 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
@@ -17,13 +17,13 @@
package org.apache.mahout.sparkbindings.blas
-import org.apache.mahout.sparkbindings.drm.plan.{OpAewScalar, OpAewB}
import org.apache.mahout.sparkbindings.drm.DrmRddInput
import scala.reflect.ClassTag
import org.apache.spark.SparkContext._
import org.apache.mahout.math.scalabindings._
import RLikeOps._
import org.apache.mahout.math.{Matrix, Vector}
+import org.apache.mahout.math.drm.logical.{OpAewScalar, OpAewB}
/** Elementwise drm-drm operators */
object AewB {
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala
index 0383fe1..c923e62 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala
@@ -1,13 +1,14 @@
package org.apache.mahout.sparkbindings.blas
-import org.apache.mahout.math.scalabindings._
+import org.apache.mahout.math._
+import drm._
+import scalabindings._
import RLikeOps._
-
+import org.apache.mahout.sparkbindings._
import org.apache.mahout.sparkbindings.drm._
-import org.apache.mahout.sparkbindings.drm.plan.OpTimesRightMatrix
-import org.apache.mahout.sparkbindings.drm.DrmRddInput
import scala.reflect.ClassTag
import org.apache.mahout.math.DiagonalMatrix
+import org.apache.mahout.math.drm.logical.OpTimesRightMatrix
/** Matrix product with one of operands an in-core matrix */
object AinCoreB {
@@ -21,8 +22,8 @@ object AinCoreB {
private def rightMultiply_diag[K: ClassTag](op: OpTimesRightMatrix[K], srcA: DrmRddInput[K]): DrmRddInput[K] = {
val rddA = srcA.toBlockifiedDrmRdd()
- implicit val sc = rddA.sparkContext
- val dg = drmBroadcast(x = op.right.viewDiagonal())
+ implicit val ctx:DistributedContext = rddA.context
+ val dg = drmBroadcast(op.right.viewDiagonal())
val rdd = rddA
// Just multiply the blocks
@@ -35,7 +36,7 @@ object AinCoreB {
private def rightMultiply_common[K: ClassTag](op: OpTimesRightMatrix[K], srcA: DrmRddInput[K]): DrmRddInput[K] = {
val rddA = srcA.toBlockifiedDrmRdd()
- implicit val sc = rddA.sparkContext
+ implicit val sc:DistributedContext = rddA.sparkContext
val bcastB = drmBroadcast(m = op.right)
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/At.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/At.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/At.scala
index 38af173..56de9f4 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/At.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/At.scala
@@ -17,12 +17,12 @@
package org.apache.mahout.sparkbindings.blas
-import org.apache.mahout.sparkbindings.drm.plan.OpAt
import org.apache.mahout.sparkbindings.drm.DrmRddInput
import org.apache.mahout.math.scalabindings._
import RLikeOps._
import org.apache.spark.SparkContext._
import org.apache.mahout.math.{DenseVector, Vector, SequentialAccessSparseVector}
+import org.apache.mahout.math.drm.logical.OpAt
/** A' algorithms */
object At {
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtA.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtA.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtA.scala
index 17cab62..450e836 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtA.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtA.scala
@@ -18,14 +18,16 @@
package org.apache.mahout.sparkbindings.blas
import org.apache.mahout.math._
+import org.apache.mahout.sparkbindings._
import org.apache.mahout.sparkbindings.drm._
import org.apache.mahout.math.scalabindings._
import RLikeOps._
import collection._
import JavaConversions._
-import org.apache.mahout.sparkbindings.drm.plan.OpAtA
import org.apache.spark.SparkContext._
import org.apache.log4j.Logger
+import org.apache.mahout.math.drm.logical.OpAtA
+import SparkEngine._
/**
* Collection of algorithms to compute X' times X
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtB.scala
index f0c3423..86aadc8 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtB.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtB.scala
@@ -18,13 +18,14 @@
package org.apache.mahout.sparkbindings.blas
import scala.reflect.ClassTag
+import org.apache.mahout.math.drm._
import org.apache.mahout.sparkbindings.drm._
import org.apache.spark.rdd.RDD
import org.apache.mahout.math.scalabindings._
import RLikeOps._
import org.apache.spark.SparkContext._
-import org.apache.mahout.sparkbindings.drm.plan.{OpAtB}
import org.apache.log4j.Logger
+import org.apache.mahout.math.drm.logical.OpAtB
object AtB {
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala
index e6de443..94c3f06 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala
@@ -1,12 +1,13 @@
package org.apache.mahout.sparkbindings.blas
-import org.apache.mahout.math.scalabindings._
+import org.apache.mahout.math._
+import scalabindings._
import RLikeOps._
-
-import org.apache.mahout.sparkbindings.drm._
-import org.apache.mahout.sparkbindings.drm.plan.{OpAtx, OpAx, OpTimesRightMatrix}
+import drm._
+import org.apache.mahout.sparkbindings._
import org.apache.mahout.sparkbindings.drm.DrmRddInput
import scala.reflect.ClassTag
+import org.apache.mahout.math.drm.logical.{OpAx, OpAtx}
/** Matrix product with one of operands an in-core matrix */
@@ -15,9 +16,9 @@ object Ax {
def ax_with_broadcast[K: ClassTag](op: OpAx[K], srcA: DrmRddInput[K]): DrmRddInput[K] = {
val rddA = srcA.toBlockifiedDrmRdd()
- implicit val sc = rddA.sparkContext
+ implicit val sc:DistributedContext = rddA.sparkContext
- val bcastX = drmBroadcast(x = op.x)
+ val bcastX = drmBroadcast(op.x)
val rdd = rddA
// Just multiply the blocks
@@ -30,9 +31,9 @@ object Ax {
def atx_with_broadcast(op: OpAtx, srcA: DrmRddInput[Int]): DrmRddInput[Int] = {
val rddA = srcA.toBlockifiedDrmRdd()
- implicit val sc = rddA.sparkContext
+ implicit val dc:DistributedContext = rddA.sparkContext
- val bcastX = drmBroadcast(x = op.x)
+ val bcastX = drmBroadcast(op.x)
val inCoreM = rddA
// Just multiply the blocks
@@ -51,7 +52,7 @@ object Ax {
// It is ridiculous, but in this scheme we will have to re-parallelize it again in order to plug
// it back as drm blockified rdd
- val rdd = sc.parallelize(Seq(inCoreM), numSlices = 1)
+ val rdd = dc.parallelize(Seq(inCoreM), numSlices = 1)
.map(block => Array.tabulate(block.nrow)(i => i) -> block)
new DrmRddInput(blockifiedSrc = Some(rdd))
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/DrmRddOps.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/DrmRddOps.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/DrmRddOps.scala
index 6bb7b4b..a3caac7 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/DrmRddOps.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/DrmRddOps.scala
@@ -17,11 +17,11 @@
package org.apache.mahout.sparkbindings.blas
-import org.apache.mahout.sparkbindings.drm.DrmRdd
import scala.reflect.ClassTag
import org.apache.mahout.math.scalabindings._
import RLikeOps._
import org.apache.mahout.math.{SequentialAccessSparseVector, DenseVector}
+import org.apache.mahout.sparkbindings.DrmRdd
class DrmRddOps[K: ClassTag](private[blas] val rdd: DrmRdd[K]) {
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala
new file mode 100644
index 0000000..4c68c9a
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.sparkbindings.blas
+
+import org.apache.mahout.sparkbindings.drm.DrmRddInput
+import org.apache.mahout.math.drm.BlockMapFunc
+import org.apache.mahout.math.scalabindings.RLikeOps._
+import scala.reflect.ClassTag
+
+object MapBlock {
+
+ def exec[S, R:ClassTag](src: DrmRddInput[S], ncol:Int, bmf:BlockMapFunc[S,R]): DrmRddInput[R] = {
+
+ // Don't use instance attributes here, so that the whole enclosing object (this) isn't pulled into the closure.
+
+ val rdd = src.toBlockifiedDrmRdd()
+ .map(blockTuple => {
+ val out = bmf(blockTuple)
+
+ assert(out._2.nrow == blockTuple._2.nrow, "block mapping must return same number of rows.")
+ assert(out._2.ncol == ncol, "block map must return %d number of columns.".format(ncol))
+
+ out
+ })
+ new DrmRddInput(blockifiedSrc = Some(rdd))
+ }
+
+}
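For context, a hedged user-level sketch of the drm mapBlock call that the planner routes through MapBlock.exec above (drmA and bcastMu are assumed to exist; the centering pattern mirrors the DSPCA code further down in this diff):

  // Center every row of each vertical block by a broadcast mean vector.
  val drmCentered = drmA.mapBlock(ncol = drmA.ncol) {
    case (keys, block) =>
      val mu: Vector = bcastMu                            // implicit BCast -> Vector, as in DSPCA
      for (row <- 0 until block.nrow) block(row, ::) -= mu
      keys -> block
  }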
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Slicing.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Slicing.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Slicing.scala
index 5affd3b..d0a50b5 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Slicing.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Slicing.scala
@@ -1,7 +1,7 @@
package org.apache.mahout.sparkbindings.blas
-import org.apache.mahout.sparkbindings.drm.plan.OpRowRange
import org.apache.mahout.sparkbindings.drm.DrmRddInput
+import org.apache.mahout.math.drm.logical.OpRowRange
object Slicing {
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala
index 795f2e2..d2d5340 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala
@@ -17,7 +17,6 @@
package org.apache.mahout.sparkbindings
-import org.apache.mahout.sparkbindings.drm.DrmRdd
import scala.reflect.ClassTag
/**
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DQR.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DQR.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DQR.scala
deleted file mode 100644
index 89d3735..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DQR.scala
+++ /dev/null
@@ -1,56 +0,0 @@
-package org.apache.mahout.sparkbindings.drm.decompositions
-
-import scala.reflect.ClassTag
-import org.apache.mahout.math.Matrix
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-import org.apache.mahout.sparkbindings.drm._
-import RLikeDrmOps._
-import org.apache.log4j.Logger
-
-object DQR {
-
- private val log = Logger.getLogger(DQR.getClass)
-
- /**
- * Distributed _thin_ QR. A'A must fit in a memory, i.e. if A is m x n, then n should be pretty
- * controlled (<5000 or so). <P>
- *
- * It is recommended to checkpoint A since it does two passes over it. <P>
- *
- * It also guarantees that Q is partitioned exactly the same way (and in same key-order) as A, so
- * their RDD should be able to zip successfully.
- */
- def dqrThin[K: ClassTag](A: DrmLike[K], checkRankDeficiency: Boolean = true): (DrmLike[K], Matrix) = {
-
- if (A.ncol > 5000)
- log.warn("A is too fat. A'A must fit in memory and easily broadcasted.")
-
- val AtA = (A.t %*% A).checkpoint()
- val inCoreAtA = AtA.collect
- implicit val sc = AtA.rdd.sparkContext
-
- if (log.isDebugEnabled) log.debug("A'A=\n%s\n".format(inCoreAtA))
-
- val ch = chol(inCoreAtA)
- val inCoreR = (ch.getL cloned) t
-
- if (log.isDebugEnabled) log.debug("R=\n%s\n".format(inCoreR))
-
- if (checkRankDeficiency && !ch.isPositiveDefinite)
- throw new IllegalArgumentException("R is rank-deficient.")
-
- val bcastAtA = sc.broadcast(inCoreAtA)
-
- // Unfortunately, I don't think Cholesky decomposition is serializable to backend. So we re-
- // decompose A'A in the backend again.
-
- // Compute Q = A*inv(L') -- we can do it blockwise.
- val Q = A.mapBlock() {
- case (keys, block) => keys -> chol(bcastAtA).solveRight(block)
- }
-
- Q -> inCoreR
- }
-
-}
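The dqrThin method deleted above (removed here as part of the MAHOUT-1529 refactoring) rests on the identity A = QR with R = L', where A'A = LL' is the Cholesky factorization; a small in-core sketch of that identity using the scalabindings operators from this file (matrix values illustrative):

  import org.apache.mahout.math.scalabindings._
  import RLikeOps._

  val a = dense((1, 2), (3, 4), (5, 6))
  val ch = chol(a.t %*% a)           // A'A = L L'
  val inCoreR = ch.getL.cloned.t     // R = L'
  val q = ch.solveRight(a)           // Q = A * inv(L'), hence A = Q R
  // sanity: q %*% inCoreR recovers a, and q.t %*% q is ~ identity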
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DSPCA.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DSPCA.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DSPCA.scala
deleted file mode 100644
index f3b0e3f..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DSPCA.scala
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.drm.decompositions
-
-import scala.reflect.ClassTag
-import org.apache.mahout.math.{Matrices, Vector}
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-import org.apache.mahout.sparkbindings.drm._
-import RLikeDrmOps._
-import org.apache.mahout.common.RandomUtils
-
-object DSPCA {
-
- /**
- * Distributed Stochastic PCA decomposition algorithm. A logical reflow of the "SSVD-PCA options.pdf"
- * document of the MAHOUT-817.
- *
- * @param A input matrix A
- * @param k request SSVD rank
- * @param p oversampling parameter
- * @param q number of power iterations (hint: use either 0 or 1)
- * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them
- * e.g. save them to hdfs in order to trigger their computation.
- */
- def dspca[K: ClassTag](A: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
- (DrmLike[K], DrmLike[Int], Vector) = {
-
- val drmA = A.checkpoint()
- implicit val sc = drmA.rdd.sparkContext
-
- val m = drmA.nrow
- val n = drmA.ncol
- assert(k <= (m min n), "k cannot be greater than smaller of m, n.")
- val pfxed = safeToNonNegInt((m min n) - k min p)
-
- // Actual decomposition rank
- val r = k + pfxed
-
- // Dataset mean
- val xi = drmA.colMeans
-
- // We represent Omega by its seed.
- val omegaSeed = RandomUtils.getRandom().nextInt()
- val omega = Matrices.symmetricUniformView(n, r, omegaSeed)
-
- // This done in front in a single-threaded fashion for now. Even though it doesn't require any
- // memory beyond that is required to keep xi around, it still might be parallelized to backs
- // for significantly big n and r. TODO
- val s_o = omega.t %*% xi
-
- val bcastS_o = drmBroadcast(s_o)
- val bcastXi = drmBroadcast(xi)
-
- var drmY = drmA.mapBlock(ncol = r) {
- case (keys, blockA) =>
- val s_o:Vector = bcastS_o
- val blockY = blockA %*% Matrices.symmetricUniformView(n, r, omegaSeed)
- for (row <- 0 until blockY.nrow) blockY(row, ::) -= s_o
- keys -> blockY
- }
- // Checkpoint Y
- .checkpoint()
-
- var drmQ = dqrThin(drmY, checkRankDeficiency = false)._1.checkpoint()
-
- var s_q = drmQ.colSums()
- var bcastVarS_q = drmBroadcast(s_q)
-
- // This actually should be optimized as identically partitioned map-side A'B since A and Q should
- // still be identically partitioned.
- var drmBt = (drmA.t %*% drmQ).checkpoint()
-
- var s_b = (drmBt.t %*% xi).collect(::, 0)
- var bcastVarS_b = drmBroadcast(s_b)
-
- for (i <- 0 until q) {
-
- // These closures don't seem to live well with outside-scope vars. This doesn't record closure
- // attributes correctly. So we create additional set of vals for broadcast vars to properly
- // create readonly closure attributes in this very scope.
- val bcastS_q = bcastVarS_q
- val bcastS_b = bcastVarS_b
- val bcastXib = bcastXi
-
- // Fix Bt as B' -= xi cross s_q
- drmBt = drmBt.mapBlock() {
- case (keys, block) =>
- val s_q: Vector = bcastS_q
- val xi: Vector = bcastXib
- keys.zipWithIndex.foreach {
- case (key, idx) => block(idx, ::) -= s_q * xi(key)
- }
- keys -> block
- }
-
- drmY.uncache()
- drmQ.uncache()
-
- drmY = (drmA %*% drmBt)
- // Fix Y by subtracting s_b from each row of the AB'
- .mapBlock() {
- case (keys, block) =>
- val s_b: Vector = bcastS_b
- for (row <- 0 until block.nrow) block(row, ::) -= s_b
- keys -> block
- }
- // Checkpoint Y
- .checkpoint()
-
- drmQ = dqrThin(drmY, checkRankDeficiency = false)._1.checkpoint()
-
- s_q = drmQ.colSums()
- bcastVarS_q = drmBroadcast(s_q)
-
- // This on the other hand should be inner-join-and-map A'B optimization since A and Q_i are not
- // identically partitioned anymore.
- drmBt = (drmA.t %*% drmQ).checkpoint()
-
- s_b = (drmBt.t %*% xi).collect(::, 0)
- bcastVarS_b = drmBroadcast(s_b)
- }
-
- val c = s_q cross s_b
- val inCoreBBt = (drmBt.t %*% drmBt).checkpoint(CacheHint.NONE).collect -
- c - c.t + (s_q cross s_q) * (xi dot xi)
- val (inCoreUHat, d) = eigen(inCoreBBt)
- val s = d.sqrt
-
- // Since neither drmU nor drmV are actually computed until actually used, we don't need the flags
- // instructing compute (or not compute) either of the U,V outputs anymore. Neat, isn't it?
- val drmU = drmQ %*% inCoreUHat
- val drmV = drmBt %*% (inCoreUHat %*%: diagv(1 /: s))
-
- (drmU(::, 0 until k), drmV(::, 0 until k), s(0 until k))
- }
-
-}
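A brief note on the BB' correction used both by the in-core spca earlier in this diff and by dspca above (a sketch of the algebra, notation matching the code): writing A_c = A - 1*xi' for the mean-centered input, s_q = Q'1 (the colSums of Q) and s_b = (A'Q)'xi, the Gram matrix of the centered projection expands as

  B B' = Q' A_c A_c' Q
       = (A'Q)'(A'Q) - s_q s_b' - s_b s_q' + (xi . xi) (s_q s_q')

which is exactly bt.t %*% bt - c - c.t + (s_q cross s_q) * (xi dot xi) with c = s_q cross s_b, so the dense centered matrix A_c never needs to be formed.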
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DSSVD.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DSSVD.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DSSVD.scala
deleted file mode 100644
index de15d2b..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/decompositions/DSSVD.scala
+++ /dev/null
@@ -1,83 +0,0 @@
-package org.apache.mahout.sparkbindings.drm.decompositions
-
-import scala.reflect.ClassTag
-import org.apache.mahout.math.{Matrices, Vector}
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-import org.apache.mahout.sparkbindings.drm._
-import RLikeDrmOps._
-import org.apache.mahout.common.RandomUtils
-
-object DSSVD {
-
- /**
- * Distributed Stochastic Singular Value decomposition algorithm.
- *
- * @param A input matrix A
- * @param k request SSVD rank
- * @param p oversampling parameter
- * @param q number of power iterations
- * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them
- * e.g. save them to hdfs in order to trigger their computation.
- */
- def dssvd[K: ClassTag](A: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
- (DrmLike[K], DrmLike[Int], Vector) = {
-
- val drmA = A.checkpoint()
-
- val m = drmA.nrow
- val n = drmA.ncol
- assert(k <= (m min n), "k cannot be greater than smaller of m, n.")
- val pfxed = safeToNonNegInt((m min n) - k min p)
-
- // Actual decomposition rank
- val r = k + pfxed
-
- // We represent Omega by its seed.
- val omegaSeed = RandomUtils.getRandom().nextInt()
-
- // Compute Y = A*Omega. Instead of redistributing view, we redistribute the Omega seed only and
- // instantiate the Omega random matrix view in the backend instead. That way serialized closure
- // is much more compact.
- var drmY = drmA.mapBlock(ncol = r) {
- case (keys, blockA) =>
- val blockY = blockA %*% Matrices.symmetricUniformView(n, r, omegaSeed)
- keys -> blockY
- }
-
- var drmQ = dqrThin(drmY.checkpoint())._1
- // Checkpoint Q if last iteration
- if (q == 0) drmQ = drmQ.checkpoint()
-
- // This actually should be optimized as identically partitioned map-side A'B since A and Q should
- // still be identically partitioned.
- var drmBt = drmA.t %*% drmQ
- // Checkpoint B' if last iteration
- if (q == 0) drmBt = drmBt.checkpoint()
-
- for (i <- 0 until q) {
- drmY = drmA %*% drmBt
- drmQ = dqrThin(drmY.checkpoint())._1
- // Checkpoint Q if last iteration
- if (i == q - 1) drmQ = drmQ.checkpoint()
-
- // This on the other hand should be inner-join-and-map A'B optimization since A and Q_i are not
- // identically partitioned anymore.
- drmBt = drmA.t %*% drmQ
- // Checkpoint B' if last iteration
- if (i == q - 1) drmBt = drmBt.checkpoint()
- }
-
- val inCoreBBt = (drmBt.t %*% drmBt).checkpoint(CacheHint.NONE).collect
- val (inCoreUHat, d) = eigen(inCoreBBt)
- val s = d.sqrt
-
- // Since neither drmU nor drmV is actually computed until used, we no longer need the flags
- // instructing whether to compute the U and V outputs. Neat, isn't it?
- val drmU = drmQ %*% inCoreUHat
- val drmV = drmBt %*% (inCoreUHat %*%: diagv(1 /: s))
-
- (drmU(::, 0 until k), drmV(::, 0 until k), s(0 until k))
- }
-
-}
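
For orientation while this code moves, here is a minimal, hedged sketch of how the dssvd entry point defined in the file above is invoked (shown as the API stood before removal). The context factory mahoutSparkContext and the dense/drmParallelize helpers are assumptions about the surrounding Mahout Scala DSL, not part of this patch, and their exact packages may differ between versions.

    import org.apache.mahout.math.scalabindings._
    import RLikeOps._
    import org.apache.mahout.sparkbindings._
    import org.apache.mahout.sparkbindings.drm._
    import RLikeDrmOps._
    import org.apache.mahout.sparkbindings.drm.decompositions.DSSVD

    object DssvdSketch extends App {

      // Assumption: mahoutSparkContext and drmParallelize come from the Mahout
      // Spark bindings package objects; names may differ across versions.
      implicit val ctx = mahoutSparkContext(masterUrl = "local", appName = "dssvd-sketch")

      // Small dense input distributed over 2 partitions.
      val inCoreA = dense((1, 2, 3), (3, 4, 5), (-1, 0, 2), (2, 2, 2))
      val drmA = drmParallelize(inCoreA, numPartitions = 2)

      // Rank-2 decomposition with a small oversampling and one power iteration.
      val (drmU, drmV, s) = DSSVD.dssvd(drmA, k = 2, p = 1, q = 1)

      // U and V are lazy; collecting (or saving) them triggers the computation.
      val u = drmU.checkpoint().collect
      val v = drmV.checkpoint().collect
      println(s"singular values: $s")
    }
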
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CacheHint.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CacheHint.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CacheHint.scala
deleted file mode 100644
index faa89ef..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CacheHint.scala
+++ /dev/null
@@ -1,20 +0,0 @@
-package org.apache.mahout.sparkbindings.drm
-
-
-object CacheHint extends Enumeration {
-
- type CacheHint = Value
-
- val NONE,
- DISK_ONLY,
- DISK_ONLY_2,
- MEMORY_ONLY,
- MEMORY_ONLY_2,
- MEMORY_ONLY_SER,
- MEMORY_ONLY_SER_2,
- MEMORY_AND_DISK,
- MEMORY_AND_DISK_2,
- MEMORY_AND_DISK_SER,
- MEMORY_AND_DISK_SER_2 = Value
-
-}
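
The enumeration above is consumed by checkpoint(cacheHint). A hedged sketch of choosing hints, mirroring the CacheHint.NONE usage on the B'B product in dssvd above; drmA is an assumed, caller-supplied Int-keyed DRM and nothing here is part of the patch itself.

    import org.apache.mahout.sparkbindings.drm._
    import RLikeDrmOps._

    object CacheHintSketch {

      // Assumption: drmA is an already constructed Int-keyed DRM.
      def gramWithoutCaching(drmA: DrmLike[Int]): CheckpointedDrm[Int] = {
        // Default hint (MEMORY_ONLY): run the optimizer and cache the result.
        val cached = drmA.checkpoint()

        // Explicit hint: materialize the lineage but skip RDD caching, as done
        // for the B'B product in dssvd above.
        (cached.t %*% cached).checkpoint(CacheHint.NONE)
      }
    }
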
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrm.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrm.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrm.scala
deleted file mode 100644
index 0007477..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrm.scala
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.drm
-
-import org.apache.mahout.math.Matrix
-import org.apache.hadoop.io.Writable
-
-/**
- * Checkpointed DRM API. This is a matrix that has an optimized RDD lineage behind it and can
- * therefore be collected or saved.
- * @tparam K matrix key type (e.g. the keys of sequence files once persisted)
- */
-trait CheckpointedDrm[K] extends DrmLike[K] {
-
- def rdd: DrmRdd[K]
-
- def collect: Matrix
-
- def writeDRM(path: String)
-
- /** If this checkpoint is already declared cached, uncache. */
- def uncache()
-
-}
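
A hedged sketch of the life cycle this trait implies -- optimize, collect, save, uncache. The drmA input and the output path are assumptions supplied by the caller.

    import org.apache.mahout.math.scalabindings.RLikeOps._
    import org.apache.mahout.sparkbindings.drm._

    object CheckpointLifecycleSketch {

      // Assumption: drmA and outputPath are supplied by the caller.
      def saveAndInspect(drmA: DrmLike[Int], outputPath: String): Unit = {
        // checkpoint() runs the optimizer and yields an RDD-backed CheckpointedDrm.
        val cp: CheckpointedDrm[Int] = drmA.checkpoint()

        // Bring the matrix to the front end ...
        val inCore = cp.collect
        println(s"collected ${inCore.nrow} x ${inCore.ncol} matrix")

        // ... persist it as a sequence-file DRM ...
        cp.writeDRM(outputPath)

        // ... and release the cached RDD once done.
        cp.uncache()
      }
    }
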
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmBase.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmBase.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmBase.scala
deleted file mode 100644
index 8216881..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmBase.scala
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.drm
-
-import org.apache.mahout.math.{SparseMatrix, DenseMatrix, Matrix, Vector}
-import math._
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-import scala.collection.JavaConversions._
-import org.apache.spark.storage.StorageLevel
-import org.apache.spark.SparkContext._
-import reflect._
-import scala.util.Random
-import org.apache.hadoop.io.{LongWritable, Text, IntWritable, Writable}
-
-class CheckpointedDrmBase[K: ClassTag](
- val rdd: DrmRdd[K],
- private var _nrow: Long = -1L,
- private var _ncol: Int = -1,
- private val _cacheStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
- private[sparkbindings] val partitioningTag: Long = Random.nextLong()
-
- ) extends CheckpointedDrm[K] {
-
-
- lazy val nrow = if (_nrow >= 0) _nrow else computeNRow
- lazy val ncol = if (_ncol >= 0) _ncol else computeNCol
-
- private var cached: Boolean = false
-
-
- /**
- * Action operator -- does not necessarily mean a Spark action, but does mean running the BLAS
- * optimizer and writing down the Spark graph lineage since the last checkpointed DRM.
- */
- def checkpoint(cacheHint: CacheHint.CacheHint): CheckpointedDrm[K] =
- // We are already checkpointed in a sense that we already have Spark lineage. So just return self.
- this
-
- def cache() = {
- if (!cached) {
- rdd.persist(_cacheStorageLevel)
- cached = true
- }
- this
- }
-
-
- /**
- * If the matrix was previously persisted into cache,
- * delete the cached representation.
- */
- def uncache() = {
- if (cached) {
- rdd.unpersist(blocking = false)
- cached = false
- }
- this
- }
-
- def mapRows(mapfun: (K, Vector) => Vector): CheckpointedDrmBase[K] =
- new CheckpointedDrmBase[K](rdd.map(t => (t._1, mapfun(t._1, t._2))))
-
-
- /**
- * Collect the DRM into a front-end in-core Matrix.
- *
- * If the DRM key is Int, the matrix is collected using the key as the row index.
- * Otherwise, the order of rows in the result is undefined, but key.toString is applied
- * as the rowLabelBindings of the in-core matrix.
- *
- * Note that this pre-allocates the target matrix and then assigns the collected RDD to it,
- * so it will likely require about 2 times the RDD memory.
- * @return
- */
- def collect: Matrix = {
-
- val intRowIndices = implicitly[ClassTag[K]] == implicitly[ClassTag[Int]]
-
- val cols = rdd.map(_._2.length).fold(0)(max(_, _))
- val rows = if (intRowIndices) rdd.map(_._1.asInstanceOf[Int]).fold(-1)(max(_, _)) + 1 else rdd.count().toInt
-
- // since currently Spark #collect() requires Serializable support,
- // we serialize DRM vectors into byte arrays on backend and restore Vector
- // instances on the front end:
- val data = rdd.map(t => (t._1, t._2)).collect()
-
-
- val m = if (data.forall(_._2.isDense))
- new DenseMatrix(rows, cols)
-
- else
- new SparseMatrix(rows, cols)
-
- if (intRowIndices)
- data.foreach(t => m(t._1.asInstanceOf[Int], ::) := t._2)
- else {
-
- // assign all rows sequentially
- val d = data.zipWithIndex
- d.foreach(t => m(t._2, ::) := t._1._2)
-
- // row bindings
- val rowBindings = d.map(t => (t._1._1.toString, t._2: java.lang.Integer)).toMap
-
- m.setRowLabelBindings(rowBindings)
- }
-
- m
- }
-
-
- /**
- * Dump the matrix as a computed Mahout DRM into the specified (HD)FS path.
- * @param path
- */
- def writeDRM(path: String) = {
- val ktag = implicitly[ClassTag[K]]
-
- implicit val k2wFunc: (K) => Writable =
- if (ktag.runtimeClass == classOf[Int]) (x: K) => new IntWritable(x.asInstanceOf[Int])
- else if (ktag.runtimeClass == classOf[String]) (x: K) => new Text(x.asInstanceOf[String])
- else if (ktag.runtimeClass == classOf[Long]) (x: K) => new LongWritable(x.asInstanceOf[Long])
- else if (classOf[Writable].isAssignableFrom(ktag.runtimeClass)) (x: K) => x.asInstanceOf[Writable]
- else throw new IllegalArgumentException("Do not know how to convert class tag %s to Writable.".format(ktag))
-// implicit def any2w(k: Any): Writable = k2wFunc(k)
- rdd.saveAsSequenceFile(path)
- }
-
- protected def computeNRow = {
-
- val intRowIndex = classTag[K] == classTag[Int]
-
- if (intRowIndex)
- cache().rdd.map(_._1.asInstanceOf[Int]).fold(-1)(max(_, _)) + 1L
- else
- cache().rdd.count()
- }
-
- protected def computeNCol =
- cache().rdd.map(_._2.length).fold(-1)(max(_, _))
-
- protected def computeNNonZero =
- cache().rdd.map(_._2.getNumNonZeroElements.toLong).sum().toLong
-
-}
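
Because collect and computeNRow above branch on the key type, here is a hedged sketch of what a caller sees for non-Int keys; drmByName is an assumed String-keyed DRM supplied by the caller.

    import scala.collection.JavaConversions._
    import org.apache.mahout.sparkbindings.drm._

    object RowBindingsSketch {

      // Assumption: drmByName is an existing String-keyed DRM.
      def inspectNamedRows(drmByName: DrmLike[String]): Unit = {
        val m = drmByName.checkpoint().collect

        // For non-Int keys the row order is undefined, but key.toString is recorded
        // as the in-core matrix's row label bindings.
        for ((label, idx) <- m.getRowLabelBindings)
          println(s"row '$label' landed at index $idx")
      }
    }
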
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
new file mode 100644
index 0000000..2d80fe3
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.sparkbindings.drm
+
+import org.apache.mahout.math._
+import math._
+import org.apache.mahout.math.scalabindings._
+import RLikeOps._
+import scala.collection.JavaConversions._
+import org.apache.spark.storage.StorageLevel
+import reflect._
+import scala.util.Random
+import org.apache.hadoop.io.{LongWritable, Text, IntWritable, Writable}
+import org.apache.mahout.math.drm._
+import org.apache.mahout.sparkbindings._
+import org.apache.spark.SparkContext._
+
+/** Spark-specific optimizer-checkpointed DRM. */
+class CheckpointedDrmSpark[K: ClassTag](
+ val rdd: DrmRdd[K],
+ private var _nrow: Long = -1L,
+ private var _ncol: Int = -1,
+ private val _cacheStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
+ override protected[mahout] val partitioningTag: Long = Random.nextLong()
+ ) extends CheckpointedDrm[K] {
+
+ lazy val nrow = if (_nrow >= 0) _nrow else computeNRow
+ lazy val ncol = if (_ncol >= 0) _ncol else computeNCol
+
+ private var cached: Boolean = false
+ override protected[mahout] val context: DistributedContext = rdd.context
+
+
+ /**
+ * Action operator -- does not necessarily mean a Spark action, but does mean running the BLAS
+ * optimizer and writing down the Spark graph lineage since the last checkpointed DRM.
+ */
+ def checkpoint(cacheHint: CacheHint.CacheHint): CheckpointedDrm[K] = {
+ // We are already checkpointed in a sense that we already have Spark lineage. So just return self.
+ this
+ }
+
+ def cache() = {
+ if (!cached) {
+ rdd.persist(_cacheStorageLevel)
+ cached = true
+ }
+ this
+ }
+
+
+ /**
+ * If the matrix was previously persisted into cache,
+ * delete the cached representation.
+ */
+ def uncache() = {
+ if (cached) {
+ rdd.unpersist(blocking = false)
+ cached = false
+ }
+ this
+ }
+
+// def mapRows(mapfun: (K, Vector) => Vector): CheckpointedDrmSpark[K] =
+// new CheckpointedDrmSpark[K](rdd.map(t => (t._1, mapfun(t._1, t._2))))
+
+
+ /**
+ * Collect the DRM into a front-end in-core Matrix.
+ *
+ * If the DRM key is Int, the matrix is collected using the key as the row index.
+ * Otherwise, the order of rows in the result is undefined, but key.toString is applied
+ * as the rowLabelBindings of the in-core matrix.
+ *
+ * Note that this pre-allocates the target matrix and then assigns the collected RDD to it,
+ * so it will likely require about 2 times the RDD memory.
+ * @return
+ */
+ def collect: Matrix = {
+
+ val intRowIndices = implicitly[ClassTag[K]] == implicitly[ClassTag[Int]]
+
+ val cols = rdd.map(_._2.length).fold(0)(max(_, _))
+ val rows = if (intRowIndices) rdd.map(_._1.asInstanceOf[Int]).fold(-1)(max(_, _)) + 1 else rdd.count().toInt
+
+ // since currently Spark #collect() requires Serializable support,
+ // we serialize DRM vectors into byte arrays on backend and restore Vector
+ // instances on the front end:
+ val data = rdd.map(t => (t._1, t._2)).collect()
+
+
+ val m = if (data.forall(_._2.isDense))
+ new DenseMatrix(rows, cols)
+
+ else
+ new SparseMatrix(rows, cols)
+
+ if (intRowIndices)
+ data.foreach(t => m(t._1.asInstanceOf[Int], ::) := t._2)
+ else {
+
+ // assign all rows sequentially
+ val d = data.zipWithIndex
+ d.foreach(t => m(t._2, ::) := t._1._2)
+
+ // row bindings
+ val rowBindings = d.map(t => (t._1._1.toString, t._2: java.lang.Integer)).toMap
+
+ m.setRowLabelBindings(rowBindings)
+ }
+
+ m
+ }
+
+
+ /**
+ * Dump the matrix as a computed Mahout DRM into the specified (HD)FS path.
+ * @param path
+ */
+ def writeDRM(path: String) = {
+ val ktag = implicitly[ClassTag[K]]
+
+ implicit val k2wFunc: (K) => Writable =
+ if (ktag.runtimeClass == classOf[Int]) (x: K) => new IntWritable(x.asInstanceOf[Int])
+ else if (ktag.runtimeClass == classOf[String]) (x: K) => new Text(x.asInstanceOf[String])
+ else if (ktag.runtimeClass == classOf[Long]) (x: K) => new LongWritable(x.asInstanceOf[Long])
+ else if (classOf[Writable].isAssignableFrom(ktag.runtimeClass)) (x: K) => x.asInstanceOf[Writable]
+ else throw new IllegalArgumentException("Do not know how to convert class tag %s to Writable.".format(ktag))
+
+ rdd.saveAsSequenceFile(path)
+ }
+
+ protected def computeNRow = {
+
+ val intRowIndex = classTag[K] == classTag[Int]
+
+ if (intRowIndex)
+ cache().rdd.map(_._1.asInstanceOf[Int]).fold(-1)(max(_, _)) + 1L
+ else
+ cache().rdd.count()
+ }
+
+ protected def computeNCol =
+ cache().rdd.map(_._2.length).fold(-1)(max(_, _))
+
+ protected def computeNNonZero =
+ cache().rdd.map(_._2.getNumNonZeroElements.toLong).sum().toLong
+
+}
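
writeDRM above accepts Int, String, Long or Writable keys. A small hedged sketch of saving both an Int-keyed and a String-keyed matrix; the two DRMs and both paths are assumptions supplied by the caller.

    import org.apache.mahout.sparkbindings.drm._

    object WriteDrmSketch {

      // Assumption: both DRMs and both output paths are supplied by the caller.
      def persistBoth(drmInt: CheckpointedDrmSpark[Int],
                      drmText: CheckpointedDrmSpark[String],
                      intPath: String,
                      textPath: String): Unit = {
        // Int keys are written as IntWritable and String keys as Text (see writeDRM above);
        // a key type outside Int/String/Long/Writable raises IllegalArgumentException.
        drmInt.writeDRM(intPath)
        drmText.writeDRM(textPath)
      }
    }
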
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala
new file mode 100644
index 0000000..7cf6bd6
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSparkOps.scala
@@ -0,0 +1,16 @@
+package org.apache.mahout.sparkbindings.drm
+
+import org.apache.mahout.math.drm.CheckpointedDrm
+import scala.reflect.ClassTag
+
+/** Additional Spark-specific operations. Requires the underlying DRM to be running on the Spark backend. */
+class CheckpointedDrmSparkOps[K: ClassTag](drm: CheckpointedDrm[K]) {
+
+ assert(drm.isInstanceOf[CheckpointedDrmSpark[K]], "must be a Spark-backed matrix")
+
+ private[sparkbindings] val sparkDrm = drm.asInstanceOf[CheckpointedDrmSpark[K]]
+
+ /** Spark matrix customization exposure */
+ def rdd = sparkDrm.rdd
+
+}
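
A hedged sketch of dropping down to the raw Spark RDD through the ops class above; the decorator is constructed directly here rather than relying on any implicit conversion, which may or may not be provided elsewhere in the bindings.

    import org.apache.mahout.math.drm.CheckpointedDrm
    import org.apache.mahout.sparkbindings.drm._

    object SparkRddAccessSketch {

      // Assumption: cpDrm is a Spark-backed checkpointed DRM.
      def countRows(cpDrm: CheckpointedDrm[Int]): Long = {
        val sparkOps = new CheckpointedDrmSparkOps[Int](cpDrm)
        // rdd exposes the raw (key, row vector) pairs for custom Spark-side work.
        sparkOps.rdd.count()
      }
    }
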
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedOps.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedOps.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedOps.scala
deleted file mode 100644
index f891c1e..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedOps.scala
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.drm
-
-import scala.reflect.ClassTag
-import org.apache.mahout.math.{DenseVector, Vector}
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-import RLikeDrmOps._
-import org.apache.spark.SparkContext._
-
-
-/**
- * Additional experimental operations over the CheckpointedDrm implementation. I will possibly move them up to
- * the DRMBase once they stabilize.
- *
- */
-class CheckpointedOps[K: ClassTag](val drm: CheckpointedDrm[K]) {
-
- /**
- * Reorganize every partition into a single in-core matrix
- * @return
- */
- def blockify(): BlockifiedDrmRdd[K] =
- org.apache.mahout.sparkbindings.drm.blockify(rdd = drm.rdd, blockncol = drm.ncol)
-
- /** Column sums. At this point this runs on the checkpoint and collects an in-core vector. */
- def colSums(): Vector = {
- val n = drm.ncol
-
- drm.rdd
- // Throw away keys
- .map(_._2)
- // fold() still doesn't work with Kryo, so work around it.
- .mapPartitions(iter => {
- val acc = ((new DenseVector(n): Vector) /: iter)((acc, v) => acc += v)
- Iterator(acc)
- })
- // Since we preallocated a new accumulator vector per partition, this must not cause any side
- // effects now.
- .reduce(_ += _)
-
- }
-
- def colMeans(): Vector = if (drm.nrow == 0) drm.colSums() else drm.colSums() /= drm.nrow
-
-}
-
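
A hedged sketch of the column statistics defined above (in the API being removed here). The implicit decoration of CheckpointedDrm with CheckpointedOps is assumed to live in a package object, so the wrapper is constructed explicitly to stay self-contained.

    import org.apache.mahout.math.Vector
    import org.apache.mahout.sparkbindings.drm._

    object ColumnStatsSketch {

      // Assumption: cpDrm is an already checkpointed, Spark-backed DRM.
      def columnStats(cpDrm: CheckpointedDrm[Int]): (Vector, Vector) = {
        val ops = new CheckpointedOps[Int](cpDrm)
        val sums = ops.colSums()    // single pass over the RDD, reduced on the front end
        val means = ops.colMeans()  // sums / nrow, or the sums themselves for an empty matrix
        (sums, means)
      }
    }
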
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmLike.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmLike.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmLike.scala
deleted file mode 100644
index de1f9bd..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmLike.scala
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.drm
-
-import org.apache.spark.storage.StorageLevel
-
-/**
- *
- * Basic spark DRM trait.
- *
- * Since we already call the package "sparkbindings", I will not use the stem "spark" in class names in
- * this package. Spark backing is already implied.
- *
- */
-trait DrmLike[K] {
-
- private[sparkbindings] def partitioningTag:Long
-
- /** R-like syntax for number of rows. */
- def nrow: Long
-
- /** R-like syntax for number of columns */
- def ncol: Int
-
- /**
- * Action operator -- does not necessarily mean a Spark action, but does mean running the BLAS
- * optimizer and writing down the Spark graph lineage since the last checkpointed DRM.
- */
- def checkpoint(cacheHint: CacheHint.CacheHint = CacheHint.MEMORY_ONLY): CheckpointedDrm[K]
-
-}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmLikeOps.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmLikeOps.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmLikeOps.scala
deleted file mode 100644
index ce7b867..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmLikeOps.scala
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.drm
-
-import scala.reflect.ClassTag
-import org.apache.hadoop.io.Writable
-import org.apache.mahout.sparkbindings.drm.plan.{OpRowRange, OpMapBlock}
-import RLikeDrmOps._
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-
-/** Common Drm ops */
-class DrmLikeOps[K : ClassTag](protected[drm] val drm: DrmLike[K]) {
-
- /**
- * Map the matrix block-wise vertically. Blocks of the new matrix can be modified original block
- * matrices, or they can be completely new matrices with a new key set. In the latter case, the output
- * matrix width must be specified with the <code>ncol</code> parameter.<P>
- *
- * New blocks must have the same height (row count) as the original blocks.<P>
- *
- * @param ncol the new matrix's width (only needed if the width changes).
- * @param bmf
- * @tparam R
- * @return
- */
- def mapBlock[R : ClassTag](ncol: Int = -1)
- (bmf: BlockMapFunc[K, R]): DrmLike[R] =
- new OpMapBlock[K, R](A = drm, bmf = bmf, _ncol = ncol)
-
-
- /**
- * Slicing the DRM. Should eventually work just like in-core drm (e.g. A(0 until 5, 5 until 15)).<P>
- *
- * The all-range is denoted by '::', e.g.: A(::, 0 until 5).<P>
- *
- * Row range is currently unsupported except for the all-range. When it is fully supported, the
- * input must be Int-keyed, i.e. of DrmLike[Int] type, for non-all-range specifications.
- *
- * @param rowRange Row range. This must be '::' (all-range) unless matrix rows are keyed by Int key.
- * @param colRange col range. Must be a sub-range of <code>0 until ncol</code>. '::' denotes all-range.
- */
- def apply(rowRange: Range, colRange: Range): DrmLike[K] = {
-
-
- val rowSrc: DrmLike[K] = if (rowRange != ::) {
-
- if (implicitly[ClassTag[Int]] == implicitly[ClassTag[K]]) {
-
- assert(rowRange.head >= 0 && rowRange.last < drm.nrow, "row range out of range")
- val intKeyed = drm.asInstanceOf[DrmLike[Int]]
-
- new OpRowRange(A = intKeyed, rowRange = rowRange).asInstanceOf[DrmLike[K]]
-
- } else throw new IllegalArgumentException("non-all row range is only supported for Int-keyed DRMs.")
-
- } else drm
-
- if (colRange != ::) {
-
- assert(colRange.head >= 0 && colRange.last < drm.ncol, "col range out of range")
-
- // Use mapBlock operator to do in-core subranging.
- rowSrc.mapBlock(ncol = colRange.length)({
- case (keys, block) => keys -> block(::, colRange)
- })
-
- } else rowSrc
- }
-}
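
To round out the ops being removed here, a hedged sketch combining mapBlock with the slicing apply above; drmA is an assumed Int-keyed DRM with at least three columns, and the DrmLike-to-ops implicit conversions are assumed to come from RLikeDrmOps.

    import org.apache.mahout.math.scalabindings._
    import RLikeOps._
    import org.apache.mahout.sparkbindings.drm._
    import RLikeDrmOps._

    object MapBlockAndSliceSketch {

      // Assumption: drmA is an existing Int-keyed DRM with ncol >= 3.
      def scaleAndSlice(drmA: DrmLike[Int]): DrmLike[Int] = {
        // Block-wise map keeping the original geometry, so ncol stays at its default.
        val scaled = drmA.mapBlock() {
          case (keys, block) => keys -> (block * 2.0)
        }

        // All-range rows, first two columns; non-all row ranges require Int keys.
        scaled(::, 0 until 2)
      }
    }
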
http://git-wip-us.apache.org/repos/asf/mahout/blob/8714a0f7/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmRddInput.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmRddInput.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmRddInput.scala
index 47cfa26..3801c77 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmRddInput.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/DrmRddInput.scala
@@ -20,6 +20,7 @@ package org.apache.mahout.sparkbindings.drm
import scala.reflect.ClassTag
import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel
+import org.apache.mahout.sparkbindings._
/** Encapsulates either DrmRdd[K] or BlockifiedDrmRdd[K] */
class DrmRddInput[K: ClassTag](