Posted to commits@mahout.apache.org by ra...@apache.org on 2018/06/27 14:51:49 UTC
[21/51] [partial] mahout git commit: MAHOUT-2042 and MAHOUT-2045
Delete directories which were moved/no longer in use
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala b/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala
deleted file mode 100644
index f69bf81..0000000
--- a/math-scala/src/main/scala/org/apache/mahout/math/cf/SimilarityAnalysis.scala
+++ /dev/null
@@ -1,453 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.math.cf
-
-import org.apache.mahout.math._
-import org.apache.mahout.math.indexeddataset.IndexedDataset
-import scalabindings._
-import RLikeOps._
-import drm._
-import RLikeDrmOps._
-import scala.collection.JavaConversions._
-import org.apache.mahout.math.stats.LogLikelihood
-import collection._
-import org.apache.mahout.math.function.{VectorFunction, Functions}
-
-import scala.util.Random
-
-
-/**
- * Based on "Ted Dunning & Ellen Friedman: Practical Machine Learning, Innovations in Recommendation",
- * available at http://www.mapr.com/practical-machine-learning
- *
- * see also "Sebastian Schelter, Christoph Boden, Volker Markl:
- * Scalable Similarity-Based Neighborhood Methods with MapReduce
- * ACM Conference on Recommender Systems 2012"
- */
-object SimilarityAnalysis extends Serializable {
-
- /** Compares (Int,Double) pairs by the second value */
- private val orderByScore = Ordering.fromLessThan[(Int, Double)] { case ((_, score1), (_, score2)) => score1 > score2}
-
- lazy val defaultParOpts = ParOpts()
-
- /**
- * Calculates item (column-wise) similarity using the log-likelihood ratio on A'A, A'B, A'C, ...
- * and returns a list of similarity and cross-similarity matrices
- *
- * @param drmARaw Primary interaction matrix
- * @param randomSeed when kept constant, makes the downsampling repeatable
- * @param maxInterestingItemsPerThing number of similar items to return per item, default: 50
- * @param maxNumInteractions max number of interactions after downsampling, default: 500
- * @param parOpts partitioning params for drm.par(...)
- * @return a list of [[org.apache.mahout.math.drm.DrmLike]] containing downsampled DRMs for cooccurrence and
- * cross-cooccurrence
- */
- def cooccurrences(
- drmARaw: DrmLike[Int],
- randomSeed: Int = 0xdeadbeef,
- maxInterestingItemsPerThing: Int = 50,
- maxNumInteractions: Int = 500,
- drmBs: Array[DrmLike[Int]] = Array(),
- parOpts: ParOpts = defaultParOpts)
- : List[DrmLike[Int]] = {
-
- implicit val distributedContext = drmARaw.context
-
- // backend partitioning defaults to 'auto', which is often better decided by the calling function
- // todo: this should ideally be different per drm
- drmARaw.par( min = parOpts.minPar, exact = parOpts.exactPar, auto = parOpts.autoPar)
-
- // Apply selective downsampling, pin resulting matrix
- val drmA = sampleDownAndBinarize(drmARaw, randomSeed, maxNumInteractions)
-
- // num users, which equals the maximum number of interactions per item
- val numUsers = drmA.nrow.toInt
-
- // Compute & broadcast the number of interactions per thing in A
- val bcastInteractionsPerItemA = drmBroadcast(drmA.numNonZeroElementsPerColumn)
-
- // Compute cooccurrence matrix A'A
- val drmAtA = drmA.t %*% drmA
-
- // Compute loglikelihood scores and sparsify the resulting matrix to get the similarity matrix
- val drmSimilarityAtA = computeSimilarities(drmAtA, numUsers, maxInterestingItemsPerThing,
- bcastInteractionsPerItemA, bcastInteractionsPerItemA, crossCooccurrence = false)
-
- var similarityMatrices = List(drmSimilarityAtA)
-
- // Now look at cross cooccurrences
- for (drmBRaw <- drmBs) {
- // backend partitioning defaults to 'auto', which is often better decided by the calling function
- // todo: this should ideally be different per drm
- drmBRaw.par( min = parOpts.minPar, exact = parOpts.exactPar, auto = parOpts.autoPar)
-
- // Down-sample and pin other interaction matrix
- val drmB = sampleDownAndBinarize(drmBRaw, randomSeed, maxNumInteractions).checkpoint()
-
- // Compute & broadcast the number of interactions per thing in B
- val bcastInteractionsPerThingB = drmBroadcast(drmB.numNonZeroElementsPerColumn)
-
- // Compute cross-cooccurrence matrix A'B
- val drmAtB = drmA.t %*% drmB
-
- val drmSimilarityAtB = computeSimilarities(drmAtB, numUsers, maxInterestingItemsPerThing,
- bcastInteractionsPerItemA, bcastInteractionsPerThingB)
-
- similarityMatrices = similarityMatrices :+ drmSimilarityAtB
-
- drmB.uncache()
- }
-
- // Unpin downsampled interaction matrix
- drmA.uncache()
-
- // Return list of similarity matrices
- similarityMatrices
- }
-
- /**
- * Calculates item (column-wise) similarity using the log-likelihood ratio on A'A, A'B, A'C, ... and returns
- * a list of similarity and cross-similarity matrices. A somewhat easier-to-use method that handles the ID
- * dictionaries correctly.
- *
- * @param indexedDatasets first in array is the primary/A matrix; all others are treated as secondary
- * @param randomSeed use default to make repeatable, otherwise pass in system time or some randomizing seed
- * @param maxInterestingItemsPerThing max similarities per item
- * @param maxNumInteractions max number of input items per item
- * @param parOpts partitioning params for drm.par(...)
- * @return a list of [[org.apache.mahout.math.indexeddataset.IndexedDataset]] containing downsampled
- * IndexedDatasets for cooccurrence and cross-cooccurrence
- */
- def cooccurrencesIDSs(
- indexedDatasets: Array[IndexedDataset],
- randomSeed: Int = 0xdeadbeef,
- maxInterestingItemsPerThing: Int = 50,
- maxNumInteractions: Int = 500,
- parOpts: ParOpts = defaultParOpts):
- List[IndexedDataset] = {
- val drms = indexedDatasets.map(_.matrix.asInstanceOf[DrmLike[Int]])
- val primaryDrm = drms(0)
- val secondaryDrms = drms.drop(1)
- val coocMatrices = cooccurrences(primaryDrm, randomSeed, maxInterestingItemsPerThing,
- maxNumInteractions, secondaryDrms, parOpts)
- val retIDSs = coocMatrices.iterator.zipWithIndex.map {
- case( drm, i ) =>
- indexedDatasets(0).create(drm, indexedDatasets(0).columnIDs, indexedDatasets(i).columnIDs)
- }
- retIDSs.toList
- }
-
- /**
- * Calculates item (column-wise) similarity using the log-likelihood ratio on A'A, A'B, A'C, ... and returns
- * a list of similarity and cross-occurrence matrices. A somewhat easier-to-use method that handles the ID
- * dictionaries correctly and carries downsampling info for each model calculation.
- *
- * @param datasets first in array is the primary/A matrix; all others are treated as secondary. Includes information
- * used to downsample the input drm as well as the output llr(A'A), llr(A'B). The information
- * is contained in each dataset in the array and applies to the model calculation of A' with
- * the dataset. Todo: ignoring absolute threshold for now.
- * @param randomSeed use default to make repeatable, otherwise pass in system time or some randomizing seed
- * @param parOpts partitioning params for drm.par(...)
- * @return a list of [[org.apache.mahout.math.indexeddataset.IndexedDataset]] containing downsampled
- * IndexedDatasets for cooccurrence and cross-cooccurrence
- */
- def crossOccurrenceDownsampled(
- datasets: List[DownsamplableCrossOccurrenceDataset],
- randomSeed: Int = 0xdeadbeef):
- List[IndexedDataset] = {
-
-
- val crossDatasets = datasets.drop(1) // drop A
- val primaryDataset = datasets.head // use A throughout
- val drmARaw = primaryDataset.iD.matrix
-
- implicit val distributedContext = primaryDataset.iD.matrix.context
-
- // backend partitioning defaults to 'auto', which is often better decided by the calling function
- val parOptsA = primaryDataset.parOpts.getOrElse(defaultParOpts)
- drmARaw.par( min = parOptsA.minPar, exact = parOptsA.exactPar, auto = parOptsA.autoPar)
-
- // Apply selective downsampling, pin resulting matrix
- val drmA = sampleDownAndBinarize(drmARaw, randomSeed, primaryDataset.maxElementsPerRow)
-
- // num users, which equals the maximum number of interactions per item
- val numUsers = drmA.nrow.toInt
-
- // Compute & broadcast the number of interactions per thing in A
- val bcastInteractionsPerItemA = drmBroadcast(drmA.numNonZeroElementsPerColumn)
-
- // Compute cooccurrence matrix A'A
- val drmAtA = drmA.t %*% drmA
-
- // Compute loglikelihood scores and sparsify the resulting matrix to get the similarity matrix
- val drmSimilarityAtA = computeSimilarities(drmAtA, numUsers, primaryDataset.maxInterestingElements,
- bcastInteractionsPerItemA, bcastInteractionsPerItemA, crossCooccurrence = false,
- minLLROpt = primaryDataset.minLLROpt)
-
- var similarityMatrices = List(drmSimilarityAtA)
-
- // Now look at cross cooccurrences
- for (dataset <- crossDatasets) {
- // backend partitioning defaults to 'auto', which is often better decided by the calling function
- val parOptsB = dataset.parOpts.getOrElse(defaultParOpts)
- dataset.iD.matrix.par(min = parOptsB.minPar, exact = parOptsB.exactPar, auto = parOptsB.autoPar)
-
- // Downsample and pin other interaction matrix
- val drmB = sampleDownAndBinarize(dataset.iD.matrix, randomSeed, dataset.maxElementsPerRow).checkpoint()
-
- // Compute & broadcast the number of interactions per thing in B
- val bcastInteractionsPerThingB = drmBroadcast(drmB.numNonZeroElementsPerColumn)
-
- // Compute cross-cooccurrence matrix A'B
- val drmAtB = drmA.t %*% drmB
-
- val drmSimilarityAtB = computeSimilarities(drmAtB, numUsers, dataset.maxInterestingElements,
- bcastInteractionsPerItemA, bcastInteractionsPerThingB, minLLROpt = dataset.minLLROpt)
-
- similarityMatrices = similarityMatrices :+ drmSimilarityAtB
-
- drmB.uncache()
- }
-
- // Unpin downsampled interaction matrix
- drmA.uncache()
-
- // Return list of datasets
- val retIDSs = similarityMatrices.iterator.zipWithIndex.map {
- case( drm, i ) =>
- datasets(0).iD.create(drm, datasets(0).iD.columnIDs, datasets(i).iD.columnIDs)
- }
- retIDSs.toList
-
- }
-
- /**
- * Calculates row-wise similarity using the log-likelihood ratio on AA' and returns a DRM of rows and similar rows
- *
- * @param drmARaw Primary interaction matrix
- * @param randomSeed when kept constant, makes the downsampling repeatable
- * @param maxInterestingSimilaritiesPerRow number of similar items to return per item, default: 50
- * @param maxNumInteractions max number of interactions after downsampling, default: 500
- * @param parOpts partitioning options used for drm.par(...)
- */
- def rowSimilarity(
- drmARaw: DrmLike[Int],
- randomSeed: Int = 0xdeadbeef,
- maxInterestingSimilaritiesPerRow: Int = 50,
- maxNumInteractions: Int = 500,
- parOpts: ParOpts = defaultParOpts): DrmLike[Int] = {
-
- implicit val distributedContext = drmARaw.context
-
- // backend partitioning defaults to 'auto', which is often better decided by the calling function
- // todo: should this ideally be different per drm?
- drmARaw.par(min = parOpts.minPar, exact = parOpts.exactPar, auto = parOpts.autoPar)
-
- // Apply selective downsampling, pin resulting matrix
- val drmA = sampleDownAndBinarize(drmARaw, randomSeed, maxNumInteractions)
-
- // num columns, which equals the maximum number of interactions per row
- val numCols = drmA.ncol
-
- // Compute & broadcast the number of interactions per row in A
- val bcastInteractionsPerItemA = drmBroadcast(drmA.numNonZeroElementsPerRow)
-
- // Compute row similarity cooccurrence matrix AA'
- val drmAAt = drmA %*% drmA.t
-
- // Compute loglikelihood scores and sparsify the resulting matrix to get the similarity matrix
- val drmSimilaritiesAAt = computeSimilarities(drmAAt, numCols, maxInterestingSimilaritiesPerRow,
- bcastInteractionsPerItemA, bcastInteractionsPerItemA, crossCooccurrence = false)
-
- drmSimilaritiesAAt
- }
-
- /**
- * Calculates row-wise similarity using the log-likelihood ratio on AA' and returns a drm of rows and similar rows.
- * Uses IndexedDatasets, which handle external ID dictionaries properly
- *
- * @param indexedDataset compare each row to every other
- * @param randomSeed use default to make repeatable, otherwise pass in system time or some randomizing seed
- * @param maxInterestingSimilaritiesPerRow max elements returned in each row
- * @param maxObservationsPerRow max number of input elements to use
- */
- def rowSimilarityIDS(indexedDataset: IndexedDataset, randomSeed: Int = 0xdeadbeef,
- maxInterestingSimilaritiesPerRow: Int = 50,
- maxObservationsPerRow: Int = 500):
- IndexedDataset = {
- val coocMatrix = rowSimilarity(indexedDataset.matrix, randomSeed, maxInterestingSimilaritiesPerRow,
- maxObservationsPerRow)
- indexedDataset.create(coocMatrix, indexedDataset.rowIDs, indexedDataset.rowIDs)
- }
-
- /** Computes the log-likelihood ratio; see http://tdunning.blogspot.de/2008/03/surprise-and-coincidence.html for details */
- def logLikelihoodRatio(numInteractionsWithA: Long, numInteractionsWithB: Long,
- numInteractionsWithAandB: Long, numInteractions: Long) = {
-
- val k11 = numInteractionsWithAandB
- val k12 = numInteractionsWithA - numInteractionsWithAandB
- val k21 = numInteractionsWithB - numInteractionsWithAandB
- val k22 = numInteractions - numInteractionsWithA - numInteractionsWithB + numInteractionsWithAandB
-
- LogLikelihood.logLikelihoodRatio(k11, k12, k21, k22)
-
- }
-
- def computeSimilarities(
- drm: DrmLike[Int],
- numUsers: Int,
- maxInterestingItemsPerThing: Int,
- bcastNumInteractionsB: BCast[Vector],
- bcastNumInteractionsA: BCast[Vector],
- crossCooccurrence: Boolean = true,
- minLLROpt: Option[Double] = None) = {
-
- //val minLLR = minLLROpt.getOrElse(0.0d) // accept all values if not specified
-
- val minLLR = minLLROpt
-
- drm.mapBlock() {
- case (keys, block) =>
-
- val llrBlock = block.like()
- val numInteractionsB: Vector = bcastNumInteractionsB
- val numInteractionsA: Vector = bcastNumInteractionsA
-
- for (index <- 0 until keys.size) {
-
- val thingB = keys(index)
-
- // PriorityQueue to select the top-k items
- val topItemsPerThing = new mutable.PriorityQueue[(Int, Double)]()(orderByScore)
-
- block(index, ::).nonZeroes().foreach { elem =>
- val thingA = elem.index
- val cooccurrences = elem.get
-
- // exclude co-occurrences of the item with itself
- if (crossCooccurrence || thingB != thingA) {
- // Compute loglikelihood ratio
- val llr = logLikelihoodRatio(numInteractionsB(thingB).toLong, numInteractionsA(thingA).toLong,
- cooccurrences.toLong, numUsers)
-
- val candidate = thingA -> llr
-
- // legacy hadoop code maps values to range (0..1) via
- // val normalizedLLR = 1.0 - (1.0 / (1.0 + llr))
- // val candidate = thingA -> normalizedLLR
-
- // Enqueue item with score, if belonging to the top-k
- if(minLLR.isEmpty || llr >= minLLR.get) { // llr threshold takes precedence over max per row
- if (topItemsPerThing.size < maxInterestingItemsPerThing) {
- topItemsPerThing.enqueue(candidate)
- } else if (orderByScore.lt(candidate, topItemsPerThing.head)) {
- topItemsPerThing.dequeue()
- topItemsPerThing.enqueue(candidate)
- }
- }
- }
- }
-
- // Add top-k interesting items to the output matrix
- topItemsPerThing.dequeueAll.foreach {
- case (otherThing, llrScore) =>
- llrBlock(index, otherThing) = llrScore
- }
- }
-
- keys -> llrBlock
- }
- }
-
- /**
- * Selectively downsample rows and items with an anomalous amount of interactions, inspired by
- * https://github.com/tdunning/in-memory-cooccurrence/blob/master/src/main/java/com/tdunning/cooc/Analyze.java
- *
- * Additionally binarizes the input matrix, as we're only interested in knowing whether interactions happened or not.
- *
- * @param drmM matrix to downsample
- * @param seed random number generator seed; keep it constant if repeatability is necessary
- * @param maxNumInteractions maximum number of elements in a row of the returned matrix
- * @return the downsampled DRM
- */
- def sampleDownAndBinarize(drmM: DrmLike[Int], seed: Int, maxNumInteractions: Int) = {
-
- implicit val distributedContext = drmM.context
-
- // Pin raw interaction matrix
- val drmI = drmM.checkpoint()
-
- // Broadcast vector containing the number of interactions with each thing
- val bcastNumInteractions = drmBroadcast(drmI.numNonZeroElementsPerColumn)
-
- val downSampledDrmI = drmI.mapBlock() {
- case (keys, block) =>
- val numInteractions: Vector = bcastNumInteractions
-
- // Use a hash of the unique first key to seed the RNG; this makes the computation repeatable in case
- // of failures
- val random = new Random(MurmurHash.hash(keys(0), seed))
-
- val downsampledBlock = block.like()
-
- // Downsample the interaction vector of each row
- for (rowIndex <- 0 until keys.size) {
-
- val interactionsInRow = block(rowIndex, ::)
-
- val numInteractionsPerRow = interactionsInRow.getNumNonZeroElements()
-
- val perRowSampleRate = math.min(maxNumInteractions, numInteractionsPerRow).toDouble / numInteractionsPerRow
-
- interactionsInRow.nonZeroes().foreach { elem =>
- val numInteractionsWithThing = numInteractions(elem.index)
- val perThingSampleRate = math.min(maxNumInteractions, numInteractionsWithThing) / numInteractionsWithThing
-
- if (random.nextDouble() <= math.min(perRowSampleRate, perThingSampleRate)) {
- // We ignore the original interaction value and create a binary 0-1 matrix
- // as we only consider whether interactions happened or did not happen
- downsampledBlock(rowIndex, elem.index) = 1
- }
- }
- }
-
- keys -> downsampledBlock
- }
-
- // Unpin raw interaction matrix
- drmI.uncache()
-
- downSampledDrmI
- }
-}
-
-case class ParOpts( // this will contain the default `par` params except for auto = true
- minPar: Int = -1,
- exactPar: Int = -1,
- autoPar: Boolean = true)
-
-/* Used to pass in data and params for downsampling the input data as well as output A'A, A'B, etc. */
-case class DownsamplableCrossOccurrenceDataset(
- iD: IndexedDataset,
- maxElementsPerRow: Int = 500, // usually items per user in the input dataset, used to randomly downsample
- maxInterestingElements: Int = 50, // number of items/columns to keep in the A'A, A'B etc. where iD == A, B, C ...
- minLLROpt: Option[Double] = None, // absolute threshold, takes precedence over maxInterestingElements if present
- parOpts: Option[ParOpts] = None) // these can be set per dataset and are applied to each of the drms
- // in crossOccurrenceDownsampled
-
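
For reference, the logLikelihoodRatio() helper above reduces two event streams to a 2x2
contingency table before delegating to mahout-math. A minimal in-core sketch of that
mapping, using only the LogLikelihood class the deleted file already imports (the counts
are made-up toy numbers):

    import org.apache.mahout.math.stats.LogLikelihood

    // Toy counts: 1000 users total, 100 interacted with A, 80 with B, 20 with both.
    val numInteractions = 1000L
    val withA = 100L
    val withB = 80L
    val withAandB = 20L

    // The same 2x2 contingency table SimilarityAnalysis.logLikelihoodRatio builds:
    val k11 = withAandB                                    // saw both A and B
    val k12 = withA - withAandB                            // saw A but not B
    val k21 = withB - withAandB                            // saw B but not A
    val k22 = numInteractions - withA - withB + withAandB  // saw neither

    val llr = LogLikelihood.logLikelihoodRatio(k11, k12, k21, k22)
    // a larger llr means the A/B cooccurrence is less likely to be chance
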
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/math-scala/src/main/scala/org/apache/mahout/math/decompositions/ALS.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/ALS.scala b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/ALS.scala
deleted file mode 100644
index 8ced112..0000000
--- a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/ALS.scala
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.math.decompositions
-
-import org.apache.mahout.math._
-import drm._
-import scalabindings._
-import RLikeDrmOps._
-import RLikeOps._
-import org.apache.log4j.Logger
-import math._
-import org.apache.mahout.common.RandomUtils
-
-/** Simple ALS factorization algorithm. To solve, use the dals() method. */
-private[math] object ALS {
-
- private val log = Logger.getLogger(ALS.getClass)
-
- /**
- * ALS training result. <P>
- *
- * <code>drmU %*% drmV.t</code> is supposed to approximate the input.
- *
- * @param drmU U matrix
- * @param drmV V matrix
- * @param iterationsRMSE RMSE values after each iteration performed
- */
- class Result[K](val drmU: DrmLike[K], val drmV: DrmLike[Int], val iterationsRMSE: Iterable[Double]) {
- def toTuple = (drmU, drmV, iterationsRMSE)
- }
-
- /** Result class for in-core results */
- class InCoreResult(val inCoreU: Matrix, inCoreV: Matrix, val iterationsRMSE: Iterable[Double]) {
- def toTuple = (inCoreU, inCoreV, iterationsRMSE)
- }
-
- /**
- * Run Distributed ALS.
- * <P>
- *
- * Example:
- *
- * <pre>
- * val (u,v,errors) = als(input, k).toTuple
- * </pre>
- *
- * ALS runs until (rmse[i-1]-rmse[i])/rmse[i-1] < convergenceThreshold, or i==maxIterations,
- * whichever comes first.
- * <P>
- *
- * @param drmA The input matrix
- * @param k required rank of decomposition (number of cols in U and V results; 100 is probably more than enough)
- * @param convergenceThreshold stop sooner if (rmse[i-1] - rmse[i])/rmse[i - 1] is less than this
- * value. If <= 0 then RMSE is not computed and no convergence test is performed.
- * @param lambda regularization rate
- * @param maxIterations maximum iterations to run regardless of convergence
- * @tparam K row key type of the input
- * @return {@link org.apache.mahout.math.drm.decompositions.ALS.Result}
- */
- def dals[K](
- drmA: DrmLike[K],
- k: Int = 50,
- lambda: Double = 0.0,
- maxIterations: Int = 10,
- convergenceThreshold: Double = 0.10
- ): Result[K] = {
-
- assert(convergenceThreshold < 1.0, "convergenceThreshold")
- assert(maxIterations >= 1, "maxIterations")
-
- // Some mapBlock() usage may require the ClassTag[K] bound
- implicit val ktag = drmA.keyClassTag
-
- val drmAt = drmA.t
-
- // Initialize U and V so that they are identically distributed to A or A'
- var drmU = drmA.mapBlock(ncol = k) {
- case (keys, block) =>
- val rnd = RandomUtils.getRandom()
- val uBlock = Matrices.symmetricUniformView(block.nrow, k, rnd.nextInt()) * 0.01
- keys -> uBlock
- }
-
- var drmV: DrmLike[Int] = null
- var rmseIterations: List[Double] = Nil
-
- // ALS iterator
- var stop = false
- var i = 0
- while (!stop && i < maxIterations) {
-
- // Alternate. This is really what ALS is.
- if (drmV != null) drmV.uncache()
- drmV = (drmAt %*% drmU %*% solve(drmU.t %*% drmU -: diag(lambda, k))).checkpoint()
-
- drmU.uncache()
- drmU = (drmA %*% drmV %*% solve(drmV.t %*% drmV -: diag(lambda, k))).checkpoint()
-
- // Check if we are requested to do a convergence test; and do it if yes.
- if (convergenceThreshold > 0) {
-
- val rmse = (drmA - drmU %*% drmV.t).norm / sqrt(drmA.ncol * drmA.nrow)
-
- if (i > 0) {
- val rmsePrev = rmseIterations.last
- val convergence = (rmsePrev - rmse) / rmsePrev
-
- if (convergence < 0) {
- log.warn("RMSE increase of %f. Should not happen.".format(convergence))
- // I guess error growth can happen in ideal data case?
- stop = true
- } else if (convergence < convergenceThreshold) {
- stop = true
- }
- }
- rmseIterations :+= rmse
- }
-
- i += 1
- }
-
- new Result(drmU, drmV, rmseIterations)
- }
-
-
-}
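
For orientation, a hedged usage sketch of the dals() method deleted above. It assumes an
implicit DistributedContext from one of the engine bindings is in scope; the toy matrix
and parameter values are illustrative only:

    import org.apache.mahout.math.scalabindings._
    import RLikeOps._
    import org.apache.mahout.math.drm._
    import RLikeDrmOps._
    import org.apache.mahout.math.decompositions._

    // assumes: implicit val ctx: DistributedContext = ... (engine-specific)
    val drmA = drmParallelize(dense((1, 2, 3), (2, 3, 4), (3, 4, 5), (4, 5, 6)), numPartitions = 2)

    // factorize A ~= U %*% V.t with rank 2 and light regularization
    val (drmU, drmV, rmseTrace) = dals(drmA, k = 2, lambda = 0.01, maxIterations = 10).toTuple

    // relative reconstruction error, in the spirit of the convergence test above
    val relErr = (drmA - drmU %*% drmV.t).norm / drmA.norm
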
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DQR.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DQR.scala b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DQR.scala
deleted file mode 100644
index 389eba0..0000000
--- a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DQR.scala
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.math.decompositions
-
-import org.apache.mahout.logging._
-import org.apache.mahout.math.Matrix
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-import org.apache.mahout.math.drm._
-import RLikeDrmOps._
-
-object DQR {
-
- private final implicit val log = getLog(DQR.getClass)
-
- /**
- * Distributed _thin_ QR. A'A must fit in memory, i.e. if A is m x n, then n should be pretty
- * controlled (<5000 or so). <P>
- *
- * It is recommended to checkpoint A since it does two passes over it. <P>
- *
- * It also guarantees that Q is partitioned exactly the same way (and in same key-order) as A, so
- * their RDD should be able to zip successfully.
- */
- def dqrThin[K](drmA: DrmLike[K],
- checkRankDeficiency: Boolean = true,
- cacheHint: CacheHint.CacheHint = CacheHint.MEMORY_ONLY): (DrmLike[K], Matrix) = {
-
- // Some mapBlock() calls need it
- implicit val ktag = drmA.keyClassTag
-
- if (drmA.ncol > 5000)
- warn("A is too fat. A'A must fit in memory and be easily broadcast.")
-
- implicit val ctx = drmA.context
-
- val AtA = (drmA.t %*% drmA).checkpoint(cacheHint)
- val inCoreAtA = AtA.collect
-
- trace("A'A=\n%s\n".format(inCoreAtA))
-
- val ch = chol(inCoreAtA)
- val inCoreR = (ch.getL cloned) t
-
- trace("R=\n%s\n".format(inCoreR))
-
- if (checkRankDeficiency && !ch.isPositiveDefinite)
- throw new IllegalArgumentException("R is rank-deficient.")
-
- val bcastAtA = drmBroadcast(inCoreAtA)
-
- // Unfortunately, I don't think Cholesky decomposition is serializable to the backend. So we re-
- // decompose A'A in the backend again.
-
- // Compute Q = A*inv(L') -- we can do it blockwise.
- val Q = drmA.mapBlock() {
- case (keys, block) => keys -> chol(bcastAtA).solveRight(block)
- }
-
- Q -> inCoreR
- }
-
-}
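
The Cholesky trick behind dqrThin can be checked in-core with the same scalabindings
operators the deleted file uses: R is the transposed Cholesky factor of A'A, and
Q = A * inv(R) falls out of solveRight. A small sketch on a made-up 3x2 matrix:

    import org.apache.mahout.math.scalabindings._
    import RLikeOps._

    val a = dense((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))

    val ch = chol(a.t %*% a)     // A'A = L * L'
    val r = (ch.getL cloned).t   // R of the thin QR (upper-triangular)
    val q = ch.solveRight(a)     // Q = A * inv(L') = A * inv(R)

    // sanity checks: q.t %*% q is ~identity (2x2), and q %*% r reconstructs a
    val qtq = q.t %*% q
    val aBack = q %*% r
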
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSPCA.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSPCA.scala b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSPCA.scala
deleted file mode 100644
index 2c010bb..0000000
--- a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSPCA.scala
+++ /dev/null
@@ -1,162 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.math.decompositions
-
-import org.apache.mahout.math.{Matrices, Vector}
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-import org.apache.mahout.math.drm._
-import RLikeDrmOps._
-import org.apache.mahout.common.RandomUtils
-
-object DSPCA {
-
- /**
- * Distributed Stochastic PCA decomposition algorithm. A logical reflow of the "SSVD-PCA options.pdf"
- * document of MAHOUT-817.
- *
- * @param drmA input matrix A
- * @param k request SSVD rank
- * @param p oversampling parameter
- * @param q number of power iterations (hint: use either 0 or 1)
- * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them,
- * e.g. save them to hdfs, in order to trigger their computation).
- */
- def dspca[K](drmA: DrmLike[K],
- k: Int,
- p: Int = 15,
- q: Int = 0,
- cacheHint: CacheHint.CacheHint = CacheHint.MEMORY_ONLY):
- (DrmLike[K], DrmLike[Int], Vector) = {
-
- // Some mapBlock() calls need it
- implicit val ktag = drmA.keyClassTag
-
- val drmAcp = drmA.checkpoint(cacheHint)
- implicit val ctx = drmAcp.context
-
- val m = drmAcp.nrow
- val n = drmAcp.ncol
- assert(k <= (m min n), "k cannot be greater than smaller of m, n.")
- val pfxed = safeToNonNegInt((m min n) - k min p)
-
- // Actual decomposition rank
- val r = k + pfxed
-
- // Dataset mean
- val mu = drmAcp.colMeans
-
- val mtm = mu dot mu
-
- // We represent Omega by its seed.
- val omegaSeed = RandomUtils.getRandom().nextInt()
- val omega = Matrices.symmetricUniformView(n, r, omegaSeed)
-
- // This is done up front in a single-threaded fashion for now. Even though it doesn't require any
- // memory beyond what is required to keep xi around, it still might be parallelized to backends
- // for significantly big n and r. TODO
- val s_o = omega.t %*% mu
-
- val bcastS_o = drmBroadcast(s_o)
- val bcastMu = drmBroadcast(mu)
-
- var drmY = drmAcp.mapBlock(ncol = r) {
- case (keys, blockA) ⇒
- val s_o:Vector = bcastS_o
- val blockY = blockA %*% Matrices.symmetricUniformView(n, r, omegaSeed)
- for (row ← 0 until blockY.nrow) blockY(row, ::) -= s_o
- keys → blockY
- }
- // Checkpoint Y
- .checkpoint(cacheHint)
-
- var drmQ = dqrThin(drmY, checkRankDeficiency = false)._1.checkpoint(cacheHint)
-
- var s_q = drmQ.colSums()
- var bcastVarS_q = drmBroadcast(s_q)
-
- // This actually should be optimized as identically partitioned map-side A'B since A and Q should
- // still be identically partitioned.
- var drmBt = (drmAcp.t %*% drmQ).checkpoint(cacheHint)
-
- var s_b = (drmBt.t %*% mu).collect(::, 0)
- var bcastVarS_b = drmBroadcast(s_b)
-
- for (i ← 0 until q) {
-
- // These closures don't seem to live well with outside-scope vars. This doesn't record closure
- // attributes correctly. So we create an additional set of vals for broadcast vars to properly
- // create read-only closure attributes in this very scope.
- val bcastS_q = bcastVarS_q
- val bcastMuInner = bcastMu
-
- // Fix Bt as B' -= xi cross s_q
- drmBt = drmBt.mapBlock() {
- case (keys, block) ⇒
- val s_q: Vector = bcastS_q
- val mu: Vector = bcastMuInner
- keys.zipWithIndex.foreach {
- case (key, idx) ⇒ block(idx, ::) -= s_q * mu(key)
- }
- keys → block
- }
-
- drmY.uncache()
- drmQ.uncache()
-
- val bCastSt_b = drmBroadcast(s_b -=: mtm * s_q)
-
- drmY = (drmAcp %*% drmBt)
- // Fix Y by subtracting st_b from each row of the AB'
- .mapBlock() {
- case (keys, block) ⇒
- val st_b: Vector = bCastSt_b
- block := { (_, c, v) ⇒ v - st_b(c) }
- keys → block
- }
- // Checkpoint Y
- .checkpoint(cacheHint)
-
- drmQ = dqrThin(drmY, checkRankDeficiency = false)._1.checkpoint(cacheHint)
-
- s_q = drmQ.colSums()
- bcastVarS_q = drmBroadcast(s_q)
-
- // This on the other hand should be inner-join-and-map A'B optimization since A and Q_i are not
- // identically partitioned anymore.
- drmBt = (drmAcp.t %*% drmQ).checkpoint(cacheHint)
-
- s_b = (drmBt.t %*% mu).collect(::, 0)
- bcastVarS_b = drmBroadcast(s_b)
- }
-
- val c = s_q cross s_b
- val inCoreBBt = (drmBt.t %*% drmBt).checkpoint(cacheHint).collect -=:
- c -=: c.t +=: mtm *=: (s_q cross s_q)
- val (inCoreUHat, d) = eigen(inCoreBBt)
- val s = d.sqrt
-
- // Since neither drmU nor drmV are actually computed until actually used, we don't need the flags
- // instructing compute (or not compute) either of the U,V outputs anymore. Neat, isn't it?
- val drmU = drmQ %*% inCoreUHat
- val drmV = drmBt %*% (inCoreUHat %*% diagv(1 / s))
-
- (drmU(::, 0 until k), drmV(::, 0 until k), s(0 until k))
- }
-
-}
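
A hedged usage sketch for the dspca() method above. Here drmA stands for any Int-keyed
DRM with data points as rows (an assumption of this sketch), and the k/p/q values are
illustrative:

    import org.apache.mahout.math.drm._
    import org.apache.mahout.math.decompositions._

    // drmA: DrmLike[Int], data points as rows, assumed already loaded
    val (drmU, drmV, s) = dspca(drmA, k = 10, p = 15, q = 1)

    // U and V are lazy, as noted above; collecting (or saving) triggers computation
    val inCoreU = drmU.collect   // m x 10, the normalized PCA space
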
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSSVD.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSSVD.scala b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSSVD.scala
deleted file mode 100644
index d917d11..0000000
--- a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/DSSVD.scala
+++ /dev/null
@@ -1,100 +0,0 @@
-package org.apache.mahout.math.decompositions
-
-import org.apache.mahout.math.{Matrices, Matrix, Vector}
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-import org.apache.mahout.math.drm._
-import RLikeDrmOps._
-import org.apache.mahout.common.RandomUtils
-import org.apache.mahout.logging._
-
-object DSSVD {
-
- private final implicit val log = getLog(DSSVD.getClass)
-
- /**
- * Distributed Stochastic Singular Value decomposition algorithm.
- *
- * @param drmA input matrix A
- * @param k request SSVD rank
- * @param p oversampling parameter
- * @param q number of power iterations
- * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them,
- * e.g. save them to hdfs, in order to trigger their computation).
- */
- def dssvd[K](drmA: DrmLike[K],
- k: Int,
- p: Int = 15,
- q: Int = 0,
- cacheHint: CacheHint.CacheHint = CacheHint.MEMORY_ONLY):
-
- (DrmLike[K], DrmLike[Int], Vector) = {
-
- // Some mapBlock() calls need it
- implicit val ktag = drmA.keyClassTag
-
- val drmAcp = drmA.checkpoint(cacheHint)
-
- val m = drmAcp.nrow
- val n = drmAcp.ncol
- assert(k <= (m min n), "k cannot be greater than smaller of m, n.")
- val pfxed = safeToNonNegInt((m min n) - k min p)
-
- // Actual decomposition rank
- val r = k + pfxed
-
- // We represent Omega by its seed.
- val omegaSeed = RandomUtils.getRandom().nextInt()
-
- // Compute Y = A*Omega. Instead of redistributing view, we redistribute the Omega seed only and
- // instantiate the Omega random matrix view in the backend instead. That way serialized closure
- // is much more compact.
- var drmY = drmAcp.mapBlock(ncol = r) {
- case (keys, blockA) ⇒
- val blockY = blockA %*% Matrices.symmetricUniformView(n, r, omegaSeed)
- keys → blockY
- }.checkpoint(cacheHint)
-
- var drmQ = dqrThin(drmY)._1
- // Checkpoint Q if last iteration
- if (q == 0) drmQ = drmQ.checkpoint(cacheHint)
-
- trace(s"dssvd:drmQ=${drmQ.collect}.")
-
- // This actually should be optimized as identically partitioned map-side A'B since A and Q should
- // still be identically partitioned.
- var drmBt = drmAcp.t %*% drmQ
- // Checkpoint B' if last iteration
- if (q == 0) drmBt = drmBt.checkpoint(cacheHint)
-
- trace(s"dssvd:drmB'=${drmBt.collect}.")
-
- for (i ← 0 until q) {
- drmY = drmAcp %*% drmBt
- drmQ = dqrThin(drmY.checkpoint(cacheHint))._1
- // Checkpoint Q if last iteration
- if (i == q - 1) drmQ = drmQ.checkpoint(cacheHint)
-
- // This on the other hand should be inner-join-and-map A'B optimization since A and Q_i are not
- // identically partitioned anymore.
- drmBt = drmAcp.t %*% drmQ
- // Checkpoint B' if last iteration
- if (i == q - 1) drmBt = drmBt.checkpoint(cacheHint)
- }
-
- val mxBBt:Matrix = drmBt.t %*% drmBt
-
- trace(s"dssvd: BB'=$mxBBt.")
-
- val (inCoreUHat, d) = eigen(mxBBt)
- val s = d.sqrt
-
- // Since neither drmU nor drmV are actually computed until actually used, we don't need the flags
- // instructing compute (or not compute) either of the U,V outputs anymore. Neat, isn't it?
- val drmU = drmQ %*% inCoreUHat
- val drmV = drmBt %*% (inCoreUHat %*%: diagv(1 /: s))
-
- (drmU(::, 0 until k), drmV(::, 0 until k), s(0 until k))
- }
-
-}
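
As with dspca, the U and V returned by dssvd() above are lazy until used. A brief sketch
under the same assumptions (a DistributedContext and an m x n input DRM already in scope):

    import org.apache.mahout.math.drm._
    import org.apache.mahout.math.decompositions._

    // drmA: DrmLike[Int], m x n, assumed already loaded
    val (drmU, drmV, s) = dssvd(drmA, k = 20, p = 15, q = 1)

    // s holds the top-20 singular values in-core; U (m x 20) and V (n x 20)
    // are only computed when collected, saved, or otherwise used
    val sigma1 = s(0)
    val inCoreV = drmV.collect
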
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/math-scala/src/main/scala/org/apache/mahout/math/decompositions/SSVD.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/SSVD.scala b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/SSVD.scala
deleted file mode 100644
index fba9517..0000000
--- a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/SSVD.scala
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.math.decompositions
-
-import scala.math._
-import org.apache.mahout.math.{Matrices, Matrix}
-import org.apache.mahout.common.RandomUtils
-import org.apache.log4j.Logger
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-
-private[math] object SSVD {
-
- private val log = Logger.getLogger(SSVD.getClass)
-
- /**
- * In-core SSVD algorithm.
- *
- * @param a input matrix A
- * @param k request SSVD rank
- * @param p oversampling parameter
- * @param q number of power iterations
- * @return (U,V,s)
- */
- def ssvd(a: Matrix, k: Int, p: Int = 15, q: Int = 0) = {
- val m = a.nrow
- val n = a.ncol
- if (k > min(m, n))
- throw new IllegalArgumentException(
- "k cannot be greater than smaller of m,n")
- val pfxed = min(p, min(m, n) - k)
-
- // Actual decomposition rank
- val r = k + pfxed
-
- val rnd = RandomUtils.getRandom
- val omega = Matrices.symmetricUniformView(n, r, rnd.nextInt)
-
- var y = a %*% omega
- var yty = y.t %*% y
- val at = a.t
- var ch = chol(yty)
- assert(ch.isPositiveDefinite, "Rank-deficiency detected during s-SVD")
- var bt = ch.solveRight(at %*% y)
-
- // Power iterations
- for (i ← 0 until q) {
- y = a %*% bt
- yty = y.t %*% y
- ch = chol(yty)
- bt = ch.solveRight(at %*% y)
- }
-
- val bbt = bt.t %*% bt
- val (uhat, d) = eigen(bbt)
-
- val s = d.sqrt
- val u = ch.solveRight(y) %*% uhat
- val v = bt %*% (uhat %*% diagv(1 /: s))
-
- (u(::, 0 until k), v(::, 0 until k), s(0 until k))
- }
-
- /**
- * PCA based on SSVD that runs without forming an always-dense A-(colMeans(A)) input for SVD. This
- * follows the solution outlined in MAHOUT-817. The in-core version is, for the most part, supposed
- * to save some memory for sparse inputs by removing direct mean subtraction.<P>
- *
- * Hint: Usually one wants to use AV, which is approximately USigma, i.e. <code>u %*%: diagv(s)</code>.
- * If retaining distances and original scaled variances is not that important, the normalized PCA space
- * is just U.
- *
- * Important: data points are considered to be rows.
- *
- * @param a input matrix A
- * @param k request SSVD rank
- * @param p oversampling parameter
- * @param q number of power iterations
- * @return (U,V,s)
- */
- def spca(a:Matrix, k: Int, p: Int = 15, q: Int = 0) = {
- val m = a.nrow
- val n = a.ncol
- if (k > min(m, n))
- throw new IllegalArgumentException(
- "k cannot be greater than smaller of m,n")
- val pfxed = min(p, min(m, n) - k)
-
- // Actual decomposition rank
- val r = k + pfxed
-
- val rnd = RandomUtils.getRandom
- val omega = Matrices.symmetricUniformView(n, r, rnd.nextInt)
-
- // Dataset mean
- val mu = a.colMeans()
- val mtm = mu dot mu
-
- if (log.isDebugEnabled) log.debug("xi=%s".format(mu))
-
- var y = a %*% omega
-
- // Fixing y
- val s_o = omega.t %*% mu
- y := ((r,c,v) ⇒ v - s_o(c))
-
- var yty = y.t %*% y
- var ch = chol(yty)
-// assert(ch.isPositiveDefinite, "Rank-deficiency detected during s-SVD")
-
- // This is implicit Q of QR(Y)
- var qm = ch.solveRight(y)
- var bt = a.t %*% qm
- var s_q = qm.colSums()
- var s_b = bt.t %*% mu
-
- // Power iterations
- for (i ← 0 until q) {
-
- // Fix bt
- bt -= mu cross s_q
-
- y = a %*% bt
-
- // Fix Y again.
- val st_b = s_b -=: mtm * s_q
- y := ((r,c,v) ⇒ v - st_b(c))
-
- yty = y.t %*% y
- ch = chol(yty)
- qm = ch.solveRight(y)
- bt = a.t %*% qm
- s_q = qm.colSums()
- s_b = bt.t %*% mu
- }
-
- val c = s_q cross s_b
-
- // BB' computation becomes
- val bbt = bt.t %*% bt -= c -= c.t += (mtm * s_q cross s_q)
-
- val (uhat, d) = eigen(bbt)
-
- val s = d.sqrt
- val u = qm %*% uhat
- val v = bt %*% (uhat %*%: diagv(1 /: s))
-
- (u(::, 0 until k), v(::, 0 until k), s(0 until k))
-
- }
-
-}
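
A quick in-core sketch of the ssvd() method above (via the public facade in the
decompositions package object), reconstructing a low-rank approximation from the
returned factors; the input matrix and rank are toy choices:

    import org.apache.mahout.math.Matrices
    import org.apache.mahout.math.scalabindings._
    import RLikeOps._
    import org.apache.mahout.math.decompositions._
    import org.apache.mahout.common.RandomUtils

    val rnd = RandomUtils.getRandom
    // 300 x 40 random input; cloned to materialize the read-only view
    val a = Matrices.symmetricUniformView(300, 40, rnd.nextInt()).cloned

    val (u, v, s) = ssvd(a, k = 10, p = 15, q = 1)

    // u: 300 x 10, v: 40 x 10, s: the 10 largest singular values
    val aHat = u %*% diagv(s) %*% v.t   // rank-10 approximation of a
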
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/math-scala/src/main/scala/org/apache/mahout/math/decompositions/package.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/package.scala b/math-scala/src/main/scala/org/apache/mahout/math/decompositions/package.scala
deleted file mode 100644
index a7b829f..0000000
--- a/math-scala/src/main/scala/org/apache/mahout/math/decompositions/package.scala
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.math
-
-import scala.reflect.ClassTag
-import org.apache.mahout.math.drm.DrmLike
-
-/**
- * This package holds all decomposition and factorization-like methods, all that we were able to make
- * distributed engine-independent so far, anyway.
- */
-package object decompositions {
-
- // ================ In-core decompositions ===================
-
- /**
- * In-core SSVD algorithm.
- *
- * @param a input matrix A
- * @param k request SSVD rank
- * @param p oversampling parameter
- * @param q number of power iterations
- * @return (U,V,s)
- */
- def ssvd(a: Matrix, k: Int, p: Int = 15, q: Int = 0) = SSVD.ssvd(a, k, p, q)
-
- /**
- * PCA based on SSVD that runs without forming an always-dense A-(colMeans(A)) input for SVD. This
- * follows the solution outlined in MAHOUT-817. The in-core version is, for the most part, supposed
- * to save some memory for sparse inputs by removing direct mean subtraction.<P>
- *
- * Hint: Usually one wants to use AV, which is approximately USigma, i.e. <code>u %*%: diagv(s)</code>.
- * If retaining distances and original scaled variances is not that important, the normalized PCA space
- * is just U.
- *
- * Important: data points are considered to be rows.
- *
- * @param a input matrix A
- * @param k request SSVD rank
- * @param p oversampling parameter
- * @param q number of power iterations
- * @return (U,V,s)
- */
- def spca(a: Matrix, k: Int, p: Int = 15, q: Int = 0) =
- SSVD.spca(a = a, k = k, p = p, q = q)
-
- // ============== Distributed decompositions ===================
-
- /**
- * Distributed _thin_ QR. A'A must fit in memory, i.e. if A is m x n, then n should be pretty
- * controlled (<5000 or so). <P>
- *
- * It is recommended to checkpoint A since it does two passes over it. <P>
- *
- * It also guarantees that Q is partitioned exactly the same way (and in same key-order) as A, so
- * their RDD should be able to zip successfully.
- */
- def dqrThin[K: ClassTag](drmA: DrmLike[K], checkRankDeficiency: Boolean = true): (DrmLike[K], Matrix) =
- DQR.dqrThin(drmA, checkRankDeficiency)
-
- /**
- * Distributed Stochastic Singular Value decomposition algorithm.
- *
- * @param drmA input matrix A
- * @param k request SSVD rank
- * @param p oversampling parameter
- * @param q number of power iterations
- * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them,
- * e.g. save them to hdfs, in order to trigger their computation).
- */
- def dssvd[K: ClassTag](drmA: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
- (DrmLike[K], DrmLike[Int], Vector) = DSSVD.dssvd(drmA, k, p, q)
-
- /**
- * Distributed Stochastic PCA decomposition algorithm. A logical reflow of the "SSVD-PCA options.pdf"
- * document of MAHOUT-817.
- *
- * @param drmA input matrix A
- * @param k request SSVD rank
- * @param p oversampling parameter
- * @param q number of power iterations (hint: use either 0 or 1)
- * @return (U,V,s). Note that U, V are non-checkpointed matrices (i.e. one needs to actually use them,
- * e.g. save them to hdfs, in order to trigger their computation).
- */
- def dspca[K: ClassTag](drmA: DrmLike[K], k: Int, p: Int = 15, q: Int = 0):
- (DrmLike[K], DrmLike[Int], Vector) = DSPCA.dspca(drmA, k, p, q)
-
- /** Result for distributed ALS-type two-component factorization algorithms */
- type FactorizationResult[K] = ALS.Result[K]
-
- /** Result for distributed ALS-type two-component factorization algorithms, in-core matrices */
- type FactorizationResultInCore = ALS.InCoreResult
-
- /**
- * Run ALS.
- * <P>
- *
- * Example:
- *
- * <pre>
- * val (u,v,errors) = als(input, k).toTuple
- * </pre>
- *
- * ALS runs until (rmse[i-1]-rmse[i])/rmse[i-1] < convergenceThreshold, or i==maxIterations,
- * whichever comes first.
- * <P>
- *
- * @param drmA The input matrix
- * @param k required rank of decomposition (number of cols in U and V results; 100 is probably more than enough)
- * @param convergenceThreshold stop sooner if (rmse[i-1] - rmse[i])/rmse[i - 1] is less than this
- * value. If <= 0 then RMSE is not computed and no convergence test is performed.
- * @param lambda regularization rate
- * @param maxIterations maximum iterations to run regardless of convergence
- * @tparam K row key type of the input
- * @return {@link org.apache.mahout.math.drm.decompositions.ALS.Result}
- */
- def dals[K: ClassTag](
- drmA: DrmLike[K],
- k: Int = 50,
- lambda: Double = 0.0,
- maxIterations: Int = 10,
- convergenceThreshold: Double = 0.10
- ): FactorizationResult[K] =
- ALS.dals(drmA, k, lambda, maxIterations, convergenceThreshold)
-
-}
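
The spca() facade above (PCA without materializing the dense A - colMeans(A)) can be
spot-checked in-core against plain ssvd on an explicitly centered copy. A sketch with
toy sizes; the singular values should agree up to stochastic projection error:

    import org.apache.mahout.math.Matrices
    import org.apache.mahout.math.scalabindings._
    import RLikeOps._
    import org.apache.mahout.math.decompositions._
    import org.apache.mahout.common.RandomUtils

    val rnd = RandomUtils.getRandom
    val a = Matrices.symmetricUniformView(200, 30, rnd.nextInt()).cloned

    // the explicit centering that spca avoids forming
    val mu = a.colMeans()
    val aCentered = a.cloned
    for (row <- 0 until aCentered.nrow) aCentered(row, ::) -= mu

    val (_, _, sSvd) = ssvd(aCentered, k = 5, p = 10, q = 1)
    val (_, _, sPca) = spca(a, k = 5, p = 10, q = 1)
    // sSvd and sPca should roughly match
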
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/math-scala/src/main/scala/org/apache/mahout/math/drm/BCast.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/BCast.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/BCast.scala
deleted file mode 100644
index b86e286..0000000
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/BCast.scala
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.math.drm
-
-/** Broadcast variable abstraction */
-trait BCast[T] extends java.io.Closeable {
- def value:T
-
-}
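
The BCast trait above is the entire contract an engine implements for broadcast support.
A minimal hypothetical in-memory implementation (the LocalBCast name is illustrative,
not part of Mahout):

    import org.apache.mahout.math.drm.BCast

    // trivial broadcast for a single-JVM engine: just hold the value
    class LocalBCast[T](v: T) extends BCast[T] {
      def value: T = v
      def close(): Unit = {}   // nothing to release locally
    }
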
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/math-scala/src/main/scala/org/apache/mahout/math/drm/CacheHint.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/CacheHint.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/CacheHint.scala
deleted file mode 100644
index 3755f31..0000000
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/CacheHint.scala
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.math.drm
-
-object CacheHint extends Enumeration {
-
- type CacheHint = Value
-
- val NONE,
- DISK_ONLY,
- DISK_ONLY_2,
- MEMORY_ONLY,
- MEMORY_ONLY_2,
- MEMORY_ONLY_SER,
- MEMORY_ONLY_SER_2,
- MEMORY_AND_DISK,
- MEMORY_AND_DISK_2,
- MEMORY_AND_DISK_SER,
- MEMORY_AND_DISK_SER_2 = Value
-
-}
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
deleted file mode 100644
index 31f8097..0000000
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.math.drm
-
-import org.apache.mahout.math.Matrix
-import org.apache.mahout.math.drm.CacheHint.CacheHint
-
-/**
- * Checkpointed DRM API. This is a matrix that has an optimized RDD lineage behind it and can
- * therefore be collected or saved.
- *
- * @tparam K matrix key type (e.g. the keys of sequence files once persisted)
- */
-trait CheckpointedDrm[K] extends DrmLike[K] {
-
- def collect: Matrix
-
- def dfsWrite(path: String)
-
- val cacheHint: CacheHint
-
- /** If this checkpoint is already declared cached, uncache. */
- def uncache(): this.type
-
- /** changes the number of rows without touching the underlying data */
- def newRowCardinality(n: Int): CheckpointedDrm[K]
-
-}
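
A typical lifecycle for the CheckpointedDrm contract above, sketched under the assumption
of an Int-keyed DRM and an engine context already in scope; the DFS path is a placeholder:

    import org.apache.mahout.math.drm._

    // pin the optimized lineage with an explicit cache hint
    val drmAcp: CheckpointedDrm[Int] = drmA.checkpoint(CacheHint.MEMORY_ONLY)

    val inCoreA = drmAcp.collect      // materialize in-core
    drmAcp.dfsWrite("/tmp/a.drm")     // save in Mahout DRM format (placeholder path)

    drmAcp.uncache()                  // release the cache when done
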
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala
deleted file mode 100644
index 37cd981..0000000
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedOps.scala
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.math.drm
-
-import org.apache.mahout.math._
-
-import org.apache.mahout.math.scalabindings.RLikeOps._
-
-/**
- * Additional experimental operations over the CheckpointedDrm implementation. These may possibly move up to
- * the DRM base once they stabilize.
- *
- */
-class CheckpointedOps[K](val drm: CheckpointedDrm[K]) {
-
-
- /** Column sums. At this point this runs on checkpoint and collects in-core vector. */
- def colSums(): Vector = drm.context.colSums(drm)
-
- /** Column counts. Counts the non-zero values. At this point this runs on checkpoint and collects in-core vector. */
- def numNonZeroElementsPerColumn(): Vector = drm.context.numNonZeroElementsPerColumn(drm)
-
- /** Column Means */
- def colMeans(): Vector = drm.context.colMeans(drm)
-
- /** Optional engine-specific all reduce tensor operation. */
- def allreduceBlock(bmf: BlockMapFunc2[K], rf: BlockReduceFunc = _ += _): Matrix =
-
- drm.context.allreduceBlock(drm, bmf, rf)
-
- /** Second norm */
- def norm():Double = drm.context.norm(drm)
-}
-
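
These decorator ops surface through implicit conversions in the drm package, so on a
checkpointed DRM they read like ordinary methods. A brief sketch, assuming a DRM drmA
is in scope:

    import org.apache.mahout.math.drm._
    import RLikeDrmOps._

    val drmAcp = drmA.checkpoint()

    val cs = drmAcp.colSums()                        // in-core Vector of column sums
    val nnz = drmAcp.numNonZeroElementsPerColumn()   // per-column non-zero counts
    val mu = drmAcp.colMeans()                       // per-column means
    val frob = drmAcp.norm()                         // second (Frobenius) norm
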
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedContext.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedContext.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedContext.scala
deleted file mode 100644
index e1833d8..0000000
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedContext.scala
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.math.drm
-
-import java.io.Closeable
-
-/** Distributed context (a.k.a. distributed session handle) */
-trait DistributedContext extends Closeable {
-
- val engine: DistributedEngine
-
-}
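
A short sketch of how this session handle is typically obtained and used. Note that `mahoutSparkContext` lives in the separate sparkbindings module and is shown here only as one possible engine-specific factory:

  import org.apache.mahout.math.drm._
  import org.apache.mahout.math.scalabindings._
  import org.apache.mahout.sparkbindings._

  // Engine-specific factory; the Spark bindings provide mahoutSparkContext.
  implicit val dc: DistributedContext =
    mahoutSparkContext(masterUrl = "local[2]", appName = "drm-session")

  // With the context in implicit scope, the drmXxx entry points just work:
  val drmA = drmParallelize(dense((1, 2), (3, 4)))
  println(s"${drmA.nrow} x ${drmA.ncol}")

  dc.close() // Closeable: tears down the distributed session
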
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
deleted file mode 100644
index c27e8dd..0000000
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
+++ /dev/null
@@ -1,268 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.math.drm
-
-import org.apache.mahout.math.indexeddataset._
-
-import logical._
-import org.apache.mahout.math._
-import scalabindings._
-import RLikeOps._
-import DistributedEngine._
-import org.apache.log4j.Logger
-
-import scala.reflect.ClassTag
-
-/** Abstraction of optimizer/distributed engine */
-trait DistributedEngine {
-
- /**
- * First optimization pass. Returns a rewritten logical plan that can then be passed to exec().
- * This rewrite may introduce logical constructs (including engine-specific ones) that the user
- * DSL cannot even produce per se.
- * <P>
- *
- * A particular physical engine implementation may choose either to use the default rewrites or
- * to build its own rewriting rules.
- * <P>
- */
- def optimizerRewrite[K: ClassTag](action: DrmLike[K]): DrmLike[K] = pass3(pass2(pass1(action)))
-
- /** Second optimizer pass. Translates the previously rewritten logical pipeline into a physical engine plan. */
- def toPhysical[K: ClassTag](plan: DrmLike[K], ch: CacheHint.CacheHint): CheckpointedDrm[K]
-
- /** Engine-specific colSums implementation based on a checkpoint. */
- def colSums[K](drm: CheckpointedDrm[K]): Vector
-
- /** Optional engine-specific all reduce tensor operation. */
- def allreduceBlock[K](drm: CheckpointedDrm[K], bmf: BlockMapFunc2[K], rf: BlockReduceFunc): Matrix
-
- /** Engine-specific numNonZeroElementsPerColumn implementation based on a checkpoint. */
- def numNonZeroElementsPerColumn[K](drm: CheckpointedDrm[K]): Vector
-
- /** Engine-specific colMeans implementation based on a checkpoint. */
- def colMeans[K](drm: CheckpointedDrm[K]): Vector
-
- def norm[K](drm: CheckpointedDrm[K]): Double
-
- /** Broadcast support */
- def drmBroadcast(v: Vector)(implicit dc: DistributedContext): BCast[Vector]
-
- /** Broadcast support */
- def drmBroadcast(m: Matrix)(implicit dc: DistributedContext): BCast[Matrix]
-
- /**
- * Loads a DRM from HDFS (in the Mahout DRM format).
- * <P/>
- * @param path the DFS path to load from
- * @param parMin minimum parallelism after load (equivalent to #par(min=...)).
- */
- def drmDfsRead(path: String, parMin: Int = 0)(implicit sc: DistributedContext): CheckpointedDrm[_]
-
- /** Parallelizes an in-core matrix as a backend-engine distributed matrix, using row ordinal indices as data set keys. */
- def drmParallelizeWithRowIndices(m: Matrix, numPartitions: Int = 1)(implicit sc: DistributedContext):
- CheckpointedDrm[Int]
-
- /** Parallelizes an in-core matrix as a backend-engine distributed matrix, using row labels as data set keys. */
- def drmParallelizeWithRowLabels(m: Matrix, numPartitions: Int = 1)(implicit sc: DistributedContext):
- CheckpointedDrm[String]
-
- /** Creates an empty DRM with the specified number of partitions and cardinality. */
- def drmParallelizeEmpty(nrow: Int, ncol: Int, numPartitions: Int = 10)(implicit sc: DistributedContext):
- CheckpointedDrm[Int]
-
- /** Creates an empty DRM with a non-trivial (Long-keyed) height. */
- def drmParallelizeEmptyLong(nrow: Long, ncol: Int, numPartitions: Int = 10)(implicit sc: DistributedContext):
- CheckpointedDrm[Long]
-
- /**
- * Converts a non-Int-keyed matrix to an Int-keyed one, optionally computing a mapping from old
- * keys to row indices in the new one. The mapping, if requested, is returned as a 1-column matrix.
- */
- def drm2IntKeyed[K](drmX: DrmLike[K], computeMap: Boolean = false): (DrmLike[Int], Option[DrmLike[K]])
-
- /**
- * (Optional) Sampling operation. Consistent with the Spark semantics of the same.
- * @param drmX matrix to sample rows from
- * @param fraction approximate fraction of rows to sample
- * @param replacement whether to sample with replacement
- * @tparam K matrix key type
- * @return sampled DRM
- */
- def drmSampleRows[K](drmX: DrmLike[K], fraction: Double, replacement: Boolean = false): DrmLike[K]
-
- def drmSampleKRows[K](drmX: DrmLike[K], numSamples:Int, replacement:Boolean = false) : Matrix
-
- /**
- * Loads an IndexedDataset from text-delimited format.
- * @param src comma-delimited URIs to read from
- * @param schema defines the format of the file(s)
- */
- def indexedDatasetDFSRead(src: String,
- schema: Schema = DefaultIndexedDatasetReadSchema,
- existingRowIDs: Option[BiDictionary] = None)
- (implicit sc: DistributedContext):
- IndexedDataset
-
- /**
- * Loads an IndexedDataset from text-delimited format, one element per line.
- * @param src comma-delimited URIs to read from
- * @param schema defines the format of the file(s)
- */
- def indexedDatasetDFSReadElements(src: String,
- schema: Schema = DefaultIndexedDatasetElementReadSchema,
- existingRowIDs: Option[BiDictionary] = None)
- (implicit sc: DistributedContext):
- IndexedDataset
-
-}
-
-object DistributedEngine {
-
- private val log = Logger.getLogger(DistributedEngine.getClass)
-
- /** This pass mostly rewrites multiplication operations. */
- private def pass1[K](action: DrmLike[K]): DrmLike[K] = {
-
- action match {
-
- // A logical operator that already has a checkpoint attached with some caching policy.
- case cpa: CheckpointAction[K] if cpa.cp.exists(_.cacheHint != CacheHint.NONE) ⇒ cpa.cp.get
-
- // self element-wise rewrite
- case OpAewB(a, b, op) if a == b => {
- op match {
- case "*" ⇒ OpAewUnaryFunc(pass1(a), (x) ⇒ x * x)
- case "/" ⇒ OpAewUnaryFunc(pass1(a), (x) ⇒ x / x)
- // Self "+" and "-" don't make a lot of sense, but we do include it for completeness.
- case "+" ⇒ OpAewUnaryFunc(pass1(a), 2.0 * _)
- case "-" ⇒ OpAewUnaryFunc(pass1(a), (_) ⇒ 0.0)
- case _ ⇒
- require(false, s"Unsupported operator $op")
- null
- }
- }
- case OpAB(OpAt(a), b) if a == b ⇒ OpAtA(pass1(a))
- case OpABAnyKey(OpAtAnyKey(a), b) if a == b ⇒ OpAtA(pass1(a))
-
- // A small rule change: now that we have removed the ClassTag from the %*% operation, it no
- // longer matches the b[Int] case automatically. So we need to check and rewrite it dynamically,
- // then re-run pass1 on the obtained tree.
- case OpABAnyKey(a, b) if b.keyClassTag == ClassTag.Int ⇒ pass1(OpAB(a, b.asInstanceOf[DrmLike[Int]]))
- case OpAtAnyKey(a) if a.keyClassTag == ClassTag.Int ⇒ pass1(OpAt(a.asInstanceOf[DrmLike[Int]]))
-
- // For now, rewrite left-multiply via transpositions, i.e.
- // inCoreA %*% B = (B' %*% inCoreA')'
- case op@OpTimesLeftMatrix(a, b) ⇒
- OpAt(OpTimesRightMatrix(A = OpAt(pass1(b)), right = a.t))
-
- // Add vertical row index concatenation for rbind() on DrmLike[Int] fragments
- case op@OpRbind(a, b) if op.keyClassTag == ClassTag.Int ⇒
-
- // Make sure the closure sees only local vals, not attributes. We need these ugly casts
- // because the compiler cannot infer that K is the same as Int from the if() above.
- val ma = safeToNonNegInt(a.nrow)
- val bAdjusted = new OpMapBlock[Int, Int](A = pass1(b.asInstanceOf[DrmLike[Int]]), bmf = {
- case (keys, block) ⇒ keys.map(_ + ma) → block
- }, identicallyPartitioned = false)
- val aAdjusted = a.asInstanceOf[DrmLike[Int]]
- OpRbind(pass1(aAdjusted), bAdjusted).asInstanceOf[DrmLike[K]]
-
- // Stop at checkpoints
- case cd: CheckpointedDrm[_] ⇒ action
-
- // For everything else we just pass-thru the operator arguments to optimizer
- case uop: AbstractUnaryOp[_, K] ⇒
- uop.A = pass1(uop.A)
- uop
-
- case bop: AbstractBinaryOp[_, _, K] ⇒
- bop.A = pass1(bop.A)
- bop.B = pass1(bop.B)
- bop
- }
- }
-
- /** This removes constructs like A.t.t that the previous step may have created. */
- private def pass2[K](action: DrmLike[K]): DrmLike[K] = {
- action match {
-
- // Fuse unary funcs into a single one, like 1 + x * x.
- // Since we repeat the pass over self after the rewrite, we don't need to descend into the
- // arguments recursively here.
- case op1@OpAewUnaryFunc(op2@OpAewUnaryFunc(a, _, _), _, _) ⇒
- pass2(OpAewUnaryFuncFusion(a, op1 :: op2 :: Nil))
-
- // Fuse one step further, like 1 + 2 * x * x. All of these should be rewritten as one
- // UnaryFuncFusion. Since we repeat the pass over self after the rewrite, we don't need to
- // descend into the arguments recursively here.
- case op@OpAewUnaryFuncFusion(op2@OpAewUnaryFunc(a, _, _), _) ⇒
- pass2(OpAewUnaryFuncFusion(a, op.ff :+ op2))
-
- // A.t.t => A
- case OpAt(top@OpAt(a)) ⇒ pass2(a)
-
- // Stop at checkpoints
- case cd: CheckpointedDrm[_] ⇒ action
-
- // For everything else we just pass-thru the operator arguments to optimizer
- case uop: AbstractUnaryOp[_, K] ⇒
- uop.A = pass2(uop.A)
- uop
- case bop: AbstractBinaryOp[_, _, K] ⇒
- bop.A = pass2(bop.A)
- bop.B = pass2(bop.B)
- bop
- }
- }
-
- /** Some further rewrites that are conditioned on A.t.t removal */
- private def pass3[K](action: DrmLike[K]): DrmLike[K] = {
- action match {
-
- // matrix products.
- case OpAB(a, OpAt(b)) ⇒ OpABt(pass3(a), pass3(b))
-
- // AtB cases that make sense.
- case OpAB(OpAt(a), b) if a.partitioningTag == b.partitioningTag ⇒ OpAtB(pass3(a), pass3(b))
- case OpABAnyKey(OpAtAnyKey(a), b) ⇒ OpAtB(pass3(a), pass3(b))
-
- // We need some cost model to choose between the following.
-
- case OpAB(OpAt(a), b) ⇒ OpAtB(pass3(a), pass3(b))
- // case OpAB(OpAt(a), b) => OpAt(OpABt(OpAt(pass1(b)), pass1(a)))
- case OpAB(a, b) ⇒ OpABt(pass3(a), OpAt(pass3(b)))
-
- // Rewrite A'x
- case op@OpAx(op1@OpAt(a), x) ⇒ OpAtx(pass3(a), x)
-
- // Stop at checkpoints
- case cd: CheckpointedDrm[_] ⇒ action
-
- // For everything else we just pass-thru the operator arguments to optimizer
- case uop: AbstractUnaryOp[_, K] ⇒
- uop.A = pass3(uop.A)
- uop
- case bop: AbstractBinaryOp[_, _, K] ⇒
- bop.A = pass3(bop.A)
- bop.B = pass3(bop.B)
- bop
- }
- }
-
-}
\ No newline at end of file
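
To make the shape of these rewrite passes concrete, here is a self-contained toy version of pass2's A.t.t elimination over a simplified expression tree. Expr, Leaf and At are illustrative names only, not Mahout classes:

  // Toy expression tree mirroring pass2's A.t.t elimination (illustrative only).
  sealed trait Expr
  case class Leaf(name: String) extends Expr
  case class At(a: Expr) extends Expr // transpose

  def rewrite(e: Expr): Expr = e match {
    case At(At(a))  => rewrite(a)     // A.t.t => A
    case At(a)      => At(rewrite(a))
    case leaf: Leaf => leaf
  }

  assert(rewrite(At(At(Leaf("A")))) == Leaf("A"))
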
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmDoubleScalarOps.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmDoubleScalarOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmDoubleScalarOps.scala
deleted file mode 100644
index de03776..0000000
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmDoubleScalarOps.scala
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.math.drm
-
-import org.apache.mahout.math.drm.RLikeDrmOps._
-import org.apache.mahout.math.drm.logical.OpCbindScalar
-
-import scala.reflect.ClassTag
-
-class DrmDoubleScalarOps(val x:Double) extends AnyVal{
-
- def +[K:ClassTag](that:DrmLike[K]) = that + x
-
- def *[K:ClassTag](that:DrmLike[K]) = that * x
-
- def -[K:ClassTag](that:DrmLike[K]) = x -: that
-
- def /[K:ClassTag](that:DrmLike[K]) = x /: that
-
- def cbind[K: ClassTag](that: DrmLike[K]) = OpCbindScalar(A = that, x = x, leftBind = true)
-
-}
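
For context, a sketch of what this value class enables: scalar-on-the-left arithmetic, via the implicit double2ScalarOps defined in the RLikeDrmOps companion further below. Assumes an implicit DistributedContext and the standard DSL imports:

  import org.apache.mahout.math.drm._
  import org.apache.mahout.math.drm.RLikeDrmOps._
  import org.apache.mahout.math.scalabindings._

  val drmA = drmParallelize(dense((1, 2), (3, 4)))

  val b = 1.0 + drmA      // element-wise 1.0 + a(i,j)
  val c = 2.0 * drmA      // element-wise scaling
  val d = 1.0 - drmA      // delegates to the DRM's right-associative -:
  val e = 1.0 / drmA      // delegates to /:
  val f = 5.0 cbind drmA  // prepends a constant column
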
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala
deleted file mode 100644
index 23f5fc6..0000000
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.math.drm
-
-import scala.reflect.ClassTag
-
-/**
- * Basic DRM trait.
- *
- * Since the package is already called "sparkbindings", the stem "spark" is not used in class
- * names within it; Spark backing is already implied.
- */
-trait DrmLike[K] {
-
- protected[mahout] def partitioningTag: Long
-
- protected[mahout] def canHaveMissingRows: Boolean
-
- /**
- * Distributed context; can be implicitly converted to operations on
- * [[org.apache.mahout.math.drm.DistributedEngine]].
- */
- val context:DistributedContext
-
- /** R-like syntax for number of rows. */
- def nrow: Long
-
- /** R-like syntax for number of columns */
- def ncol: Int
-
- /**
- * Explicit extraction of the key ClassTag, since traits don't support context-bound access;
- * the actual implementation knows it.
- */
- def keyClassTag: ClassTag[K]
-
- /**
- * Action operator -- does not necessarily mean a Spark action, but does mean running the BLAS
- * optimizer and writing down the Spark graph lineage since the last checkpointed DRM.
- */
- def checkpoint(cacheHint: CacheHint.CacheHint = CacheHint.MEMORY_ONLY): CheckpointedDrm[K]
-
-}
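
A small sketch of the laziness contract described above: operators only build a logical plan, and checkpoint() is what triggers the optimizer. Assumes an implicit DistributedContext and the standard DSL imports:

  import org.apache.mahout.math.drm._
  import org.apache.mahout.math.drm.RLikeDrmOps._
  import org.apache.mahout.math.scalabindings._

  val drmA = drmParallelize(dense((1, 2), (3, 4), (5, 6)))

  // Purely logical so far: nothing is computed by t and %*% themselves.
  val drmAtA = drmA.t %*% drmA

  // Runs the optimizer, executes the lineage, and pins the result per the hint.
  val cp = drmAtA.checkpoint(CacheHint.MEMORY_ONLY)
  println(s"${cp.nrow} x ${cp.ncol}") // 2 x 2
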
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala
deleted file mode 100644
index 43b4f56..0000000
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLikeOps.scala
+++ /dev/null
@@ -1,140 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.math.drm
-
-import scala.reflect.ClassTag
-import org.apache.mahout.math.scalabindings._
-import org.apache.mahout.math.drm.logical.{OpAewUnaryFunc, OpPar, OpMapBlock, OpRowRange}
-
-/** Common Drm ops */
-class DrmLikeOps[K](protected[drm] val drm: DrmLike[K]) {
-
- /**
- * Parallelism adjustments. <P/>
- *
- * Change only one of the parameters from its default value to choose a new parallelism adjustment strategy.
- * <P/>
- *
- * E.g. use
- * <pre>
- * drmA.par(auto = true)
- * </pre>
- * to use automatic parallelism adjustment.
- * <P/>
- *
- * Parallelism here is a fairly abstract API concept, and the actual value interpretation is left
- * to the particular backend strategy. However, it is usually equivalent to the number of map
- * tasks or data splits.
- * <P/>
- *
- * @param min If changed from default, ensures the product has at least that much parallelism.
- * @param exact if changed from default, ensures the pipeline product has exactly that much
- * parallelism.
- * @param auto If changed from default, engine-specific automatic parallelism adjustment strategy
- * is applied.
- */
- def par(min: Int = -1, exact: Int = -1, auto: Boolean = false) = {
- require(min > 0 || exact > 0 || auto, "Invalid argument")
- OpPar(drm, minSplits = min, exactSplits = exact)
- }
-
- /**
- * Map the matrix block-wise vertically. Blocks of the new matrix can be the original blocks,
- * modified in place, or completely new matrices with a new keyset. In the latter case, the output
- * matrix width must be specified with the <code>ncol</code> parameter.<P>
- *
- * New blocks must have the same height as the original blocks.<P>
- *
- * @param ncol the new matrix's width (only needed if the width changes).
- * @param bmf block map function
- * @tparam R new key type
- * @return mapped DRM
- */
- def mapBlock[R: ClassTag](ncol: Int = -1, identicallyPartitioned: Boolean = true)
- (bmf: BlockMapFunc[K, R]): DrmLike[R] =
- new OpMapBlock[K, R](
- A = drm,
- bmf = bmf,
- _ncol = ncol,
- identicallyPartitioned = identicallyPartitioned
- )
-
- /**
- * Slicing the DRM. Should eventually work just like in-core drm (e.g. A(0 until 5, 5 until 15)).<P>
- *
- * The all-range is denoted by '::', e.g.: A(::, 0 until 5).<P>
- *
- * Row range is currently unsupported except for the all-range. When it is fully supported, the
- * input must be Int-keyed, i.e. of type DrmLike[Int], for non-all-range specifications.
- *
- * @param rowRange Row range. This must be '::' (all-range) unless matrix rows are keyed by Int key.
- * @param colRange col range. Must be a sub-range of <code>0 until ncol</code>. '::' denotes all-range.
- */
- def apply(rowRange: Range, colRange: Range): DrmLike[K] = {
-
- import RLikeDrmOps._
- import RLikeOps._
-
- implicit val ktag = drm.keyClassTag
-
- val rowSrc: DrmLike[K] = if (rowRange != ::) {
-
- if (ClassTag.Int == ktag) {
-
- assert(rowRange.head >= 0 && rowRange.last < drm.nrow, "row range out of range")
- val intKeyed = drm.asInstanceOf[DrmLike[Int]]
-
- new OpRowRange(A = intKeyed, rowRange = rowRange).asInstanceOf[DrmLike[K]]
-
- } else throw new IllegalArgumentException("non-all row range is only supported for Int-keyed DRMs.")
-
- } else drm
-
- if (colRange != ::) {
-
- assert(colRange.head >= 0 && colRange.last < drm.ncol, "col range out of range")
-
- // Use mapBlock operator to do in-core subranging.
- rowSrc.mapBlock(ncol = colRange.length)({
- case (keys, block) => keys -> block(::, colRange)
- })
-
- } else rowSrc
- }
-
- /**
- * Apply a function element-wise.
- *
- * @param f element-wise function
- * @param evalZeros Do we have to process zero elements? true, false, auto: if auto, we will test
- * the supplied function for `f(0) != 0`, and depending on the result, will
- * decide if we want evaluation for zero elements. WARNING: the AUTO setting
- * may not always work correctly for functions that are meant to run in a specific
- * backend context, or non-deterministic functions, such as {-1,0,1} random
- * generators.
- * @return new DRM with the element-wise function applied.
- */
- def apply(f: Double ⇒ Double, evalZeros: AutoBooleanEnum.T = AutoBooleanEnum.AUTO) = {
- val ezeros = evalZeros match {
- case AutoBooleanEnum.TRUE ⇒ true
- case AutoBooleanEnum.FALSE ⇒ false
- case AutoBooleanEnum.AUTO ⇒ f(0) != 0
- }
- new OpAewUnaryFunc[K](drm, f, ezeros)
- }
-}
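
A usage sketch of the common ops above (par, mapBlock, slicing, and the element-wise apply), assuming an implicit DistributedContext and the standard DSL imports:

  import org.apache.mahout.math.drm._
  import org.apache.mahout.math.drm.RLikeDrmOps._
  import org.apache.mahout.math.scalabindings._
  import org.apache.mahout.math.scalabindings.RLikeOps._

  val drmA = drmParallelize(dense((1, 2, 3), (4, 5, 6), (7, 8, 9)), numPartitions = 2)

  // Exactly one of min/exact/auto may be changed from its default:
  val drmPar = drmA.par(auto = true)

  // Block-wise map that keeps geometry: scale each vertical block by 2.
  val drmScaled = drmA.mapBlock() { case (keys, block) => keys -> (block * 2) }

  // Slicing: all rows, first two columns.
  val drmSlice = drmA(::, 0 until 2)

  // Element-wise function; evalZeros = AUTO skips zeros here since f(0) == 0.
  val drmSquared = drmA(x => x * x)
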
http://git-wip-us.apache.org/repos/asf/mahout/blob/e0573de3/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
deleted file mode 100644
index 8bea741..0000000
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/RLikeDrmOps.scala
+++ /dev/null
@@ -1,172 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.math.drm
-
-import scala.reflect.ClassTag
-import collection._
-import JavaConversions._
-import org.apache.mahout.math.{Vector, Matrix}
-import org.apache.mahout.math.drm.logical._
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-
-class RLikeDrmOps[K](drm: DrmLike[K]) extends DrmLikeOps[K](drm) {
-
- import RLikeDrmOps._
- import org.apache.mahout.math.scalabindings._
-
- def +(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = "+")
-
- def -(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = "-")
-
- def *(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = "*")
-
- def /(that: DrmLike[K]): DrmLike[K] = OpAewB[K](A = this, B = that, op = "/")
-
- def +(that: Double): DrmLike[K] = OpAewUnaryFunc[K](A = this, f = _ + that, evalZeros = true)
-
- def +:(that: Double): DrmLike[K] = OpAewUnaryFunc[K](A = this, f = that + _, evalZeros = true)
-
- def -(that: Double): DrmLike[K] = OpAewUnaryFunc[K](A = this, f = _ - that, evalZeros = true)
-
- def -:(that: Double): DrmLike[K] = OpAewUnaryFunc[K](A = this, f = that - _, evalZeros = true)
-
- def *(that: Double): DrmLike[K] = OpAewUnaryFunc[K](A = this, f = _ * that)
-
- def *:(that: Double): DrmLike[K] = OpAewUnaryFunc[K](A = this, f = that * _)
-
- def ^(that: Double): DrmLike[K] = that match {
- // Special handling of x ^ 2 and x ^ 0.5: we want consistent handling of x ^ 2 and x * x, since
- // the pow(x, 2) function may return results different from x * x, but much of the code uses the
- // two interchangeably. Without this, things like NaN entries can appear on the main diagonal
- // of a distance matrix.
- case 2.0 ⇒ OpAewUnaryFunc[K](A = this, f = x ⇒ x * x)
- case 0.5 ⇒ OpAewUnaryFunc[K](A = this, f = math.sqrt _)
- case _ ⇒ OpAewUnaryFunc[K](A = this, f = math.pow(_, that))
- }
-
- def /(that: Double): DrmLike[K] = OpAewUnaryFunc[K](A = this, f = _ / that, evalZeros = that == 0.0)
-
- def /:(that: Double): DrmLike[K] = OpAewUnaryFunc[K](A = this, f = that / _, evalZeros = true)
-
- def :%*%[B](that: DrmLike[B]): DrmLike[K] = OpABAnyKey[B,K](A = this.drm, B=that)
-
- def %*%[B](that: DrmLike[B]): DrmLike[K] = this :%*% that
-
- def :%*%(that: Matrix): DrmLike[K] = OpTimesRightMatrix[K](A = this.drm, right = that)
-
- def %*%(that: Matrix): DrmLike[K] = this :%*% that
-
- def :%*%(that: Vector): DrmLike[K] = OpAx(A = this.drm, x = that)
-
- def %*%(that: Vector): DrmLike[K] = :%*%(that)
-
- def t: DrmLike[Int] = OpAtAnyKey(A = drm)
-
- def cbind(that: DrmLike[K]): DrmLike[K] = OpCbind(A = this.drm, B = that)
-
- def cbind(that: Double): DrmLike[K] = OpCbindScalar(A = this.drm, x = that, leftBind = false)
-
- def rbind(that: DrmLike[K]): DrmLike[K] = OpRbind(A = this.drm, B = that)
-
- /**
- * `rowSums` method for non-Int-keyed matrices.
- *
- * A slight problem here is the limitation of the in-memory representation of Colt's Matrix, which
- * can only have String row labels. Therefore, we internally call ".toString()" on each key object
- * and put the result into the [[Matrix]] row label bindings, at which point the keys are coerced
- * to Strings.
- *
- * This is obviously suboptimal behavior, so future enhancements of `collect` remain a TODO.
- *
- * @return a map from row keys to row sums, collected on the front end.
- */
- def rowSumsMap(): Map[String, Double] = {
-
- implicit val ktag = drm.keyClassTag
-
- val m = drm.mapBlock(ncol = 1) { case (keys, block) =>
- keys -> dense(block.rowSums).t
- }.collect
- m.getRowLabelBindings.map { case (key, idx) => key -> m(idx, 0)}
- }
-}
-
-class RLikeDrmIntOps(drm: DrmLike[Int]) extends RLikeDrmOps[Int](drm) {
-
- import org.apache.mahout.math._
- import scalabindings._
- import RLikeDrmOps._
-
- override def t: DrmLike[Int] = OpAt(A = drm)
-
- def %*%:[K: ClassTag](that: DrmLike[K]): DrmLike[K] = OpAB[K](A = that, B = this.drm)
-
- def %*%:(that: Matrix): DrmLike[Int] = OpTimesLeftMatrix(left = that, A = this.drm)
-
- /** Row sums. This is of course applicable to Int-keyed distributed matrices only. */
- def rowSums(): Vector = {
- drm.mapBlock(ncol = 1) { case (keys, block) =>
- // Collect block-wise rowsums and output them as one-column matrix.
- keys -> dense(block.rowSums).t
- }
- .collect(::, 0)
- }
-
- /** Counts the non-zero elements in each row, returning a vector of the counts. */
- def numNonZeroElementsPerRow(): Vector = {
- drm.mapBlock(ncol = 1) { case (keys, block) =>
- // Collect block-wise row non-zero counts and output them as a one-column matrix.
- keys -> dense(block.numNonZeroElementsPerRow).t
- }
- .collect(::, 0)
- }
-
- /** Row means */
- def rowMeans(): Vector = {
- drm.mapBlock(ncol = 1) { case (keys, block) =>
- // Collect block-wise row means and output them as one-column matrix.
- keys -> dense(block.rowMeans).t
- }
- .collect(::, 0)
- }
-
- /** Return diagonal vector */
- def diagv: Vector = {
- require(drm.ncol == drm.nrow, "Must be square to extract diagonal")
- drm.mapBlock(ncol = 1) { case (keys, block) =>
- keys -> dense(for (r <- block.view) yield r(keys(r.index))).t
- }
- .collect(::, 0)
- }
-
-}
-
-object RLikeDrmOps {
-
- implicit def double2ScalarOps(x: Double) = new DrmDoubleScalarOps(x)
-
- implicit def drmInt2RLikeOps(drm: DrmLike[Int]): RLikeDrmIntOps = new RLikeDrmIntOps(drm)
-
- implicit def drm2RLikeOps[K](drm: DrmLike[K]): RLikeDrmOps[K] = new RLikeDrmOps[K](drm)
-
- implicit def rlikeOps2Drm[K](ops: RLikeDrmOps[K]): DrmLike[K] = ops.drm
-
- implicit def ops2Drm[K](ops: DrmLikeOps[K]): DrmLike[K] = ops.drm
-
- implicit def drm2cpops[K](drm: DrmLike[K]): CheckpointedOps[K] = new CheckpointedOps(drm)
-}
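
Finally, a sketch of the R-like algebra these implicits enable, assuming an implicit DistributedContext:

  import org.apache.mahout.math.drm._
  import org.apache.mahout.math.drm.RLikeDrmOps._
  import org.apache.mahout.math.scalabindings._

  val drmA = drmParallelize(dense((1, 2), (3, 4), (5, 6)), numPartitions = 2)

  // Lazy distributed algebra; the optimizer rewrites A.t %*% A into a single AtA op.
  val drmAtA    = drmA.t %*% drmA
  val drmHalfSq = (drmA ^ 2.0) / 2.0 // element-wise power, then scalar division

  // Int-keyed extras from RLikeDrmIntOps:
  val rsums  = drmA.rowSums()   // in-core Vector of row sums
  val rmeans = drmA.rowMeans()  // in-core Vector of row means
  val diag   = drmAtA.diagv     // diagonal of the (square) gramian

  // Materialize a small result in-core:
  val inCoreAtA = drmAtA.collect
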