Posted to commits@mahout.apache.org by dl...@apache.org on 2015/06/11 02:09:09 UTC
[2/4] mahout git commit: MAHOUT-1660 MAHOUT-1713 MAHOUT-1714
MAHOUT-1715 MAHOUT-1716 MAHOUT-1717 MAHOUT-1718 MAHOUT-1719 MAHOUT-1720
MAHOUT-1721 MAHOUT-1722 MAHOUT-1723 MAHOUT-1724 MAHOUT-1725 MAHOUT-1726
MAHOUT-1727 MAHOUT-1728 MAHOUT-1729 MAHOUT-1730 M
http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
index 595cd66..41e966b 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
@@ -24,51 +24,59 @@ import org.apache.mahout.sparkbindings.indexeddataset.IndexedDatasetSpark
import scalabindings._
import RLikeOps._
import org.apache.mahout.math.drm.logical._
-import org.apache.mahout.sparkbindings.drm.{CheckpointedDrmSpark, DrmRddInput}
+import org.apache.mahout.sparkbindings.drm.{cpDrmGeneric2DrmRddInput, CheckpointedDrmSpark, DrmRddInput}
import org.apache.mahout.math._
+import scala.Predef
import scala.reflect.ClassTag
+import scala.reflect.classTag
import org.apache.spark.storage.StorageLevel
import org.apache.mahout.sparkbindings.blas._
import org.apache.hadoop.io._
-import scala.Some
-import scala.collection.JavaConversions._
+import collection._
+import JavaConversions._
import org.apache.mahout.math.drm._
import org.apache.mahout.math.drm.RLikeDrmOps._
import org.apache.spark.rdd.RDD
import org.apache.mahout.common.{Hadoop1HDFSUtil, HDFSUtil}
+
/** Spark-specific non-drm-method operations */
object SparkEngine extends DistributedEngine {
// By default, use Hadoop 1 utils
var hdfsUtils: HDFSUtil = Hadoop1HDFSUtil
- def colSums[K:ClassTag](drm: CheckpointedDrm[K]): Vector = {
+ def colSums[K: ClassTag](drm: CheckpointedDrm[K]): Vector = {
val n = drm.ncol
drm.rdd
+
// Throw away keys
.map(_._2)
+
// Fold() still doesn't work with Kryo, so work around it.
- .mapPartitions(iter => {
- val acc = ((new DenseVector(n): Vector) /: iter)((acc, v) => acc += v)
+ .mapPartitions(iter ⇒ {
+ val acc = ((new DenseVector(n): Vector) /: iter)((acc, v) ⇒ acc += v)
Iterator(acc)
})
+
// Since we preallocated new accumulator vector per partition, this must not cause any side
// effects now.
.reduce(_ += _)
}
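
For reference, the same fold-inside-partitions pattern on plain Spark, independent of Mahout's DRM types (a minimal sketch assuming an RDD[Array[Double]] of equal-length rows; colSumsPlain is an illustrative name, not Mahout API):

    import org.apache.spark.rdd.RDD

    def colSumsPlain(rows: RDD[Array[Double]], n: Int): Array[Double] =
      rows
        // Fold within each partition into a freshly allocated accumulator,
        // so no shared zero value ever needs to go through Kryo.
        .mapPartitions { it =>
          val acc = new Array[Double](n)
          it.foreach { row =>
            var i = 0
            while (i < n) { acc(i) += row(i); i += 1 }
          }
          Iterator(acc)
        }
        // One private accumulator per partition, so in-place summing is side-effect safe.
        .reduce { (a, b) =>
          var i = 0
          while (i < n) { a(i) += b(i); i += 1 }
          a
        }
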
- def numNonZeroElementsPerColumn[K:ClassTag](drm: CheckpointedDrm[K]): Vector = {
+ def numNonZeroElementsPerColumn[K: ClassTag](drm: CheckpointedDrm[K]): Vector = {
val n = drm.ncol
drm.rdd
+
// Throw away keys
.map(_._2)
+
// Fold() still doesn't work with Kryo, so work around it.
- .mapPartitions(iter => {
- val acc = ((new DenseVector(n): Vector) /: iter) { (acc, v) =>
- v.nonZeroes().foreach { elem => acc(elem.index) += 1 }
+ .mapPartitions(iter ⇒ {
+ val acc = ((new DenseVector(n): Vector) /: iter) { (acc, v) ⇒
+ v.nonZeroes().foreach { elem ⇒ acc(elem.index) += 1}
acc
}
Iterator(acc)
@@ -79,17 +87,25 @@ object SparkEngine extends DistributedEngine {
}
/** Engine-specific colMeans implementation based on a checkpoint. */
- override def colMeans[K:ClassTag](drm: CheckpointedDrm[K]): Vector =
+ override def colMeans[K: ClassTag](drm: CheckpointedDrm[K]): Vector =
if (drm.nrow == 0) drm.colSums() else drm.colSums() /= drm.nrow
override def norm[K: ClassTag](drm: CheckpointedDrm[K]): Double =
drm.rdd
- // Compute sum of squares of each vector
- .map {
- case (key, v) => v dot v
+ // Compute sum of squares of each vector
+ .map {
+ case (key, v) ⇒ v dot v
}
- .reduce(_ + _)
+ .reduce(_ + _)
+
+ /** Optional engine-specific all reduce tensor operation. */
+ override def allreduceBlock[K: ClassTag](drm: CheckpointedDrm[K], bmf: BlockMapFunc2[K], rf:
+ BlockReduceFunc): Matrix = {
+
+ import drm._
+ drm.toBlockifiedDrmRdd(ncol = drm.ncol).map(bmf(_)).reduce(rf)
+ }
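
A usage sketch for the new allreduceBlock hook, computing an in-core A'A (illustrative only, not part of this commit; assumes the usual scalabindings imports and a checkpointed Int-keyed DRM):

    import org.apache.mahout.math.Matrix
    import org.apache.mahout.math.scalabindings.RLikeOps._
    import org.apache.mahout.math.drm.CheckpointedDrm
    import org.apache.mahout.sparkbindings.SparkEngine

    def inCoreAtA(drmA: CheckpointedDrm[Int]): Matrix =
      SparkEngine.allreduceBlock(
        drmA,
        // Each vertical block (keys, block) contributes a partial n x n product...
        bmf = { case (_, block) => block.t %*% block },
        // ...and the partials are summed into a single in-core matrix.
        rf = (m1, m2) => m1 += m2
      )
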
/**
* Perform default expression rewrite. Return physical plan that we can pass to exec(). <P>
@@ -104,10 +120,10 @@ object SparkEngine extends DistributedEngine {
def toPhysical[K: ClassTag](plan: DrmLike[K], ch: CacheHint.CacheHint): CheckpointedDrm[K] = {
// Spark-specific Physical Plan translation.
- val rdd = tr2phys(plan)
+ val rddInput = tr2phys(plan)
val newcp = new CheckpointedDrmSpark(
- rdd = rdd,
+ rddInput = rddInput,
_nrow = plan.nrow,
_ncol = plan.ncol,
_cacheStorageLevel = cacheHint2Spark(ch),
@@ -131,7 +147,13 @@ object SparkEngine extends DistributedEngine {
*
* @return DRM[Any] where Any is automatically translated to value type
*/
- def drmDfsRead (path: String, parMin:Int = 0)(implicit sc: DistributedContext): CheckpointedDrm[_] = {
+ def drmDfsRead(path: String, parMin: Int = 0)(implicit sc: DistributedContext): CheckpointedDrm[_] = {
+
+ // Require that context is actually Spark context.
+ require(sc.isInstanceOf[SparkDistributedContext], "Supplied context must be for the Spark backend.")
+
+ // Extract spark context -- we need it for some operations.
+ implicit val ssc = sc.asInstanceOf[SparkDistributedContext].sc
val drmMetadata = hdfsUtils.readDrmHeader(path)
val k2vFunc = drmMetadata.keyW2ValFunc
@@ -140,8 +162,8 @@ object SparkEngine extends DistributedEngine {
// Hadoop we must do it right after read operation).
val rdd = sc.sequenceFile(path, classOf[Writable], classOf[VectorWritable], minPartitions = parMin)
- // Immediately convert keys and value writables into value types.
- .map { case (wKey, wVec) => k2vFunc(wKey) -> wVec.get()}
+ // Immediately convert keys and value writables into value types.
+ .map { case (wKey, wVec) ⇒ k2vFunc(wKey) -> wVec.get()}
// Wrap into a DRM type with correct matrix row key class tag evident.
drmWrap(rdd = rdd, cacheHint = CacheHint.NONE)(drmMetadata.keyClassTag.asInstanceOf[ClassTag[Any]])
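
For the common Int-keyed case, the read path above boils down to the following sketch (readIntKeyed is an illustrative name; the real method dispatches on the key class recorded in the DRM header):

    import org.apache.hadoop.io.IntWritable
    import org.apache.mahout.math.VectorWritable
    import org.apache.spark.SparkContext

    def readIntKeyed(sc: SparkContext, path: String) =
      sc.sequenceFile(path, classOf[IntWritable], classOf[VectorWritable], minPartitions = 4)
        // Unwrap the Writables immediately: Hadoop input formats reuse Writable
        // instances, so deferring the conversion would alias rows to one object.
        .map { case (wKey, wVec) => wKey.get() -> wVec.get() }
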
@@ -149,67 +171,141 @@ object SparkEngine extends DistributedEngine {
/** Parallelize in-core matrix as spark distributed matrix, using row ordinal indices as data set keys. */
def drmParallelizeWithRowIndices(m: Matrix, numPartitions: Int = 1)
- (implicit sc: DistributedContext)
+ (implicit sc: DistributedContext)
: CheckpointedDrm[Int] = {
- new CheckpointedDrmSpark(rdd = parallelizeInCore(m, numPartitions))
+ new CheckpointedDrmSpark(rddInput = parallelizeInCore(m, numPartitions), _nrow = m.nrow, _ncol = m.ncol)
}
private[sparkbindings] def parallelizeInCore(m: Matrix, numPartitions: Int = 1)
- (implicit sc: DistributedContext): DrmRdd[Int] = {
+ (implicit sc: DistributedContext): DrmRdd[Int] = {
- val p = (0 until m.nrow).map(i => i -> m(i, ::))
+ val p = (0 until m.nrow).map(i => i → m(i, ::))
sc.parallelize(p, numPartitions)
}
/** Parallelize in-core matrix as spark distributed matrix, using row labels as a data set keys. */
def drmParallelizeWithRowLabels(m: Matrix, numPartitions: Int = 1)
- (implicit sc: DistributedContext)
+ (implicit sc: DistributedContext)
: CheckpointedDrm[String] = {
val rb = m.getRowLabelBindings
- val p = for (i: String <- rb.keySet().toIndexedSeq) yield i -> m(rb(i), ::)
+ val p = for (i: String ← rb.keySet().toIndexedSeq) yield i → m(rb(i), ::)
- new CheckpointedDrmSpark(rdd = sc.parallelize(p, numPartitions))
+ new CheckpointedDrmSpark(rddInput = sc.parallelize(p, numPartitions), _nrow = m.nrow, _ncol = m.ncol)
}
/** This creates an empty DRM with specified number of partitions and cardinality. */
def drmParallelizeEmpty(nrow: Int, ncol: Int, numPartitions: Int = 10)
- (implicit sc: DistributedContext): CheckpointedDrm[Int] = {
- val rdd = sc.parallelize(0 to numPartitions, numPartitions).flatMap(part => {
+ (implicit sc: DistributedContext): CheckpointedDrm[Int] = {
+ val rdd = sc.parallelize(0 to numPartitions, numPartitions).flatMap(part ⇒ {
val partNRow = (nrow - 1) / numPartitions + 1
val partStart = partNRow * part
val partEnd = Math.min(partStart + partNRow, nrow)
- for (i <- partStart until partEnd) yield (i, new RandomAccessSparseVector(ncol): Vector)
+ for (i ← partStart until partEnd) yield (i, new RandomAccessSparseVector(ncol): Vector)
})
new CheckpointedDrmSpark[Int](rdd, nrow, ncol)
}
def drmParallelizeEmptyLong(nrow: Long, ncol: Int, numPartitions: Int = 10)
- (implicit sc: DistributedContext): CheckpointedDrm[Long] = {
- val rdd = sc.parallelize(0 to numPartitions, numPartitions).flatMap(part => {
+ (implicit sc: DistributedContext): CheckpointedDrm[Long] = {
+ val rdd = sc.parallelize(0 to numPartitions, numPartitions).flatMap(part ⇒ {
val partNRow = (nrow - 1) / numPartitions + 1
val partStart = partNRow * part
val partEnd = Math.min(partStart + partNRow, nrow)
- for (i <- partStart until partEnd) yield (i, new RandomAccessSparseVector(ncol): Vector)
+ for (i ← partStart until partEnd) yield (i, new RandomAccessSparseVector(ncol): Vector)
})
new CheckpointedDrmSpark[Long](rdd, nrow, ncol)
}
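
The slicing arithmetic in the two methods above is easiest to see on concrete numbers. Note that `0 to numPartitions` yields numPartitions + 1 slices; any trailing slice whose start falls at or past nrow comes out empty:

    // nrow = 10, numPartitions = 3  =>  partNRow = (10 - 1) / 3 + 1 = 4
    val slices = (0 to 3).map { part =>
      val partNRow = (10 - 1) / 3 + 1
      val partStart = partNRow * part
      val partEnd = math.min(partStart + partNRow, 10)
      part -> (partStart until partEnd)
    }
    // Vector(0 -> (0 until 4), 1 -> (4 until 8), 2 -> (8 until 10), 3 -> (12 until 10))
    // The last slice is empty: 12 until 10 contains no rows.
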
+ /**
+ * Convert a non-int-keyed matrix to an int-keyed one, optionally computing a mapping from old keys
+ * to row indices in the new one. The mapping, if requested, is returned as a 1-column matrix.
+ */
+ override def drm2IntKeyed[K: ClassTag](drmX: DrmLike[K], computeMap: Boolean = false): (DrmLike[Int], Option[DrmLike[K]]) = {
+ if (classTag[K] == ClassTag.Int) {
+ drmX.asInstanceOf[DrmLike[Int]] → None
+ } else {
+
+ val drmXcp = drmX.checkpoint(CacheHint.MEMORY_ONLY)
+ val ncol = drmXcp.asInstanceOf[CheckpointedDrmSpark[K]]._ncol
+ val nrow = drmXcp.asInstanceOf[CheckpointedDrmSpark[K]]._nrow
+
+ // Compute sequential int key numbering.
+ val (intRdd, keyMap) = blas.rekeySeqInts(rdd = drmXcp.rdd, computeMap = computeMap)
+
+ // Convert computed key mapping to a matrix.
+ val mxKeyMap = keyMap.map { rdd =>
+ drmWrap(rdd = rdd.map { case (key, ordinal) ⇒ key → (dvec(ordinal):Vector)}, ncol = 1, nrow = nrow)
+ }
+
+
+ drmWrap(rdd = intRdd, ncol = ncol) → mxKeyMap
+ }
+
+ }
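
blas.rekeySeqInts itself is not part of this diff; one plausible shape for such a rekeying, sketched with RDD.zipWithIndex (illustrative only, ignoring caching concerns):

    import org.apache.mahout.math.Vector
    import org.apache.spark.rdd.RDD

    def rekeySeqIntsSketch[K](rdd: RDD[(K, Vector)], computeMap: Boolean)
        : (RDD[(Int, Vector)], Option[RDD[(K, Int)]]) = {
      // Assign a global ordinal to every row and use it as the new Int key.
      val indexed = rdd.zipWithIndex()
      val intKeyed = indexed.map { case ((_, v), ord) => ord.toInt -> v }
      // Optionally remember oldKey -> ordinal; above it is wrapped as a 1-column DRM.
      val keyMap =
        if (computeMap) Some(indexed.map { case ((k, _), ord) => k -> ord.toInt })
        else None
      intKeyed -> keyMap
    }
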
+
+
+ /**
+ * (Optional) Sampling operation. Consistent with Spark semantics of the same.
+ * @param drmX
+ * @param fraction
+ * @param replacement
+ * @tparam K
+ * @return
+ */
+ override def drmSampleRows[K: ClassTag](drmX: DrmLike[K], fraction: Double, replacement: Boolean): DrmLike[K] = {
+
+ // We do want to take ncol if it is already computed; if not, we don't want to trigger
+ // a computation here.
+ val ncol = drmX match {
+ case cp: CheckpointedDrmSpark[K] ⇒ cp._ncol
+ case _ ⇒ -1
+ }
+ val sample = drmX.rdd.sample(withReplacement = replacement, fraction = fraction)
+ if (classTag[K] != ClassTag.Int) return drmWrap(sample, ncol = ncol)
+
+ // K == Int: Int-keyed sample. rebase int counts.
+ drmWrap(rdd = blas.rekeySeqInts(rdd = sample, computeMap = false)._1, ncol = ncol).asInstanceOf[DrmLike[K]]
+ }
+
+
+ override def drmSampleKRows[K: ClassTag](drmX: DrmLike[K], numSamples: Int, replacement: Boolean): Matrix = {
+
+ val ncol = drmX match {
+ case cp: CheckpointedDrmSpark[K] ⇒ cp._ncol
+ case _ ⇒ -1
+ }
+
+ // I think as of the time of this writing, takeSample() in Spark is biased. It is not a true
+ // hypergeometric sampler. But it is faster than true hypergeometric/categorical samplers
+ // would be.
+ val sample = drmX.rdd.takeSample(withReplacement = replacement, num = numSamples)
+ val isSparse = sample.exists { case (_, vec) ⇒ !vec.isDense }
+
+ val vectors = sample.map(_._2)
+ val labels = sample.view.zipWithIndex.map { case ((key, _), idx) ⇒ key.toString → (idx:Integer) }.toMap
+
+ val mx:Matrix = if (isSparse) sparse(vectors:_*) else dense(vectors)
+ mx.setRowLabelBindings(labels)
+
+ mx
+ }
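
A small in-core illustration of the label bookkeeping above (toy keys and dvec values; with JavaConversions in scope, the resulting Scala map is accepted by setRowLabelBindings):

    import org.apache.mahout.math.scalabindings._

    val sample = Array("a" -> dvec(1, 0), "b" -> dvec(0, 2), "c" -> dvec(3, 4))
    val isSparse = sample.exists { case (_, vec) => !vec.isDense }  // false: dvec is dense
    val labels = sample.view.zipWithIndex
      .map { case ((key, _), idx) => key.toString -> (idx: Integer) }
      .toMap
    // labels == Map("a" -> 0, "b" -> 1, "c" -> 2): original keys survive as row labels.
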
+
private[mahout] def cacheHint2Spark(cacheHint: CacheHint.CacheHint): StorageLevel = cacheHint match {
- case CacheHint.NONE => StorageLevel.NONE
- case CacheHint.DISK_ONLY => StorageLevel.DISK_ONLY
- case CacheHint.DISK_ONLY_2 => StorageLevel.DISK_ONLY_2
- case CacheHint.MEMORY_ONLY => StorageLevel.MEMORY_ONLY
- case CacheHint.MEMORY_ONLY_2 => StorageLevel.MEMORY_ONLY_2
- case CacheHint.MEMORY_ONLY_SER => StorageLevel.MEMORY_ONLY_SER
- case CacheHint.MEMORY_ONLY_SER_2 => StorageLevel.MEMORY_ONLY_SER_2
- case CacheHint.MEMORY_AND_DISK => StorageLevel.MEMORY_AND_DISK
- case CacheHint.MEMORY_AND_DISK_2 => StorageLevel.MEMORY_AND_DISK_2
- case CacheHint.MEMORY_AND_DISK_SER => StorageLevel.MEMORY_AND_DISK_SER
- case CacheHint.MEMORY_AND_DISK_SER_2 => StorageLevel.MEMORY_AND_DISK_SER_2
+ case CacheHint.NONE ⇒ StorageLevel.NONE
+ case CacheHint.DISK_ONLY ⇒ StorageLevel.DISK_ONLY
+ case CacheHint.DISK_ONLY_2 ⇒ StorageLevel.DISK_ONLY_2
+ case CacheHint.MEMORY_ONLY ⇒ StorageLevel.MEMORY_ONLY
+ case CacheHint.MEMORY_ONLY_2 ⇒ StorageLevel.MEMORY_ONLY_2
+ case CacheHint.MEMORY_ONLY_SER ⇒ StorageLevel.MEMORY_ONLY_SER
+ case CacheHint.MEMORY_ONLY_SER_2 ⇒ StorageLevel.MEMORY_ONLY_SER_2
+ case CacheHint.MEMORY_AND_DISK ⇒ StorageLevel.MEMORY_AND_DISK
+ case CacheHint.MEMORY_AND_DISK_2 ⇒ StorageLevel.MEMORY_AND_DISK_2
+ case CacheHint.MEMORY_AND_DISK_SER ⇒ StorageLevel.MEMORY_AND_DISK_SER
+ case CacheHint.MEMORY_AND_DISK_SER_2 ⇒ StorageLevel.MEMORY_AND_DISK_SER_2
}
/** Translate previously optimized physical plan */
@@ -221,31 +317,32 @@ object SparkEngine extends DistributedEngine {
// If there are any such cases, they must go away in pass1. If they did not, then it wasn't
// the A'A case but an actual transposition intent, which should be removed from consideration
// (we cannot do an actual flip for non-int-keyed arguments)
- case OpAtAnyKey(_) =>
+ case OpAtAnyKey(_) ⇒
throw new IllegalArgumentException("\"A\" must be Int-keyed in this A.t expression.")
- case op@OpAt(a) => At.at(op, tr2phys(a)(op.classTagA))
- case op@OpABt(a, b) => ABt.abt(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
- case op@OpAtB(a, b) => AtB.atb_nograph(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB),
- zippable = a.partitioningTag == b.partitioningTag)
- case op@OpAtA(a) => AtA.at_a(op, tr2phys(a)(op.classTagA))
- case op@OpAx(a, x) => Ax.ax_with_broadcast(op, tr2phys(a)(op.classTagA))
- case op@OpAtx(a, x) => Ax.atx_with_broadcast(op, tr2phys(a)(op.classTagA))
- case op@OpAewB(a, b, opId) => AewB.a_ew_b(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
- case op@OpCbind(a, b) => CbindAB.cbindAB_nograph(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
- case op@OpRbind(a, b) => RbindAB.rbindAB(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
- case op@OpAewScalar(a, s, _) => AewB.a_ew_scalar(op, tr2phys(a)(op.classTagA), s)
- case op@OpRowRange(a, _) => Slicing.rowRange(op, tr2phys(a)(op.classTagA))
- case op@OpTimesRightMatrix(a, _) => AinCoreB.rightMultiply(op, tr2phys(a)(op.classTagA))
+ case op@OpAt(a) ⇒ At.at(op, tr2phys(a)(op.classTagA))
+ case op@OpABt(a, b) ⇒ ABt.abt(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
+ case op@OpAtB(a, b) ⇒ AtB.atb(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
+ case op@OpAtA(a) ⇒ AtA.at_a(op, tr2phys(a)(op.classTagA))
+ case op@OpAx(a, x) ⇒ Ax.ax_with_broadcast(op, tr2phys(a)(op.classTagA))
+ case op@OpAtx(a, x) ⇒ Ax.atx_with_broadcast(op, tr2phys(a)(op.classTagA))
+ case op@OpAewUnaryFunc(a, _, _) ⇒ AewB.a_ew_func(op, tr2phys(a)(op.classTagA))
+ case op@OpAewUnaryFuncFusion(a, _) ⇒ AewB.a_ew_func(op, tr2phys(a)(op.classTagA))
+ case op@OpAewB(a, b, opId) ⇒ AewB.a_ew_b(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
+ case op@OpCbind(a, b) ⇒ CbindAB.cbindAB_nograph(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
+ case op@OpCbindScalar(a, _, _) ⇒ CbindAB.cbindAScalar(op, tr2phys(a)(op.classTagA))
+ case op@OpRbind(a, b) ⇒ RbindAB.rbindAB(op, tr2phys(a)(op.classTagA), tr2phys(b)(op.classTagB))
+ case op@OpAewScalar(a, s, _) ⇒ AewB.a_ew_scalar(op, tr2phys(a)(op.classTagA), s)
+ case op@OpRowRange(a, _) ⇒ Slicing.rowRange(op, tr2phys(a)(op.classTagA))
+ case op@OpTimesRightMatrix(a, _) ⇒ AinCoreB.rightMultiply(op, tr2phys(a)(op.classTagA))
// Custom operators, we just execute them
- case blockOp: OpMapBlock[K, _] => MapBlock.exec(
+ case blockOp: OpMapBlock[K, _] ⇒ MapBlock.exec(
src = tr2phys(blockOp.A)(blockOp.classTagA),
- ncol = blockOp.ncol,
- bmf = blockOp.bmf
+ operator = blockOp
)
- case op@OpPar(a,_,_) => Par.exec(op,tr2phys(a)(op.classTagA))
- case cp: CheckpointedDrm[K] => new DrmRddInput[K](rowWiseSrc = Some((cp.ncol, cp.rdd)))
- case _ => throw new IllegalArgumentException("Internal:Optimizer has no exec policy for operator %s."
- .format(oper))
+ case op@OpPar(a, _, _) ⇒ Par.exec(op, tr2phys(a)(op.classTagA))
+ case cp: CheckpointedDrm[K] ⇒ cp.rdd: DrmRddInput[K]
+ case _ ⇒ throw new IllegalArgumentException("Internal:Optimizer has no exec policy for operator %s."
+ .format(oper))
}
}
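
The translator above is a depth-first recursion over the logical plan: operands are translated before their operator is executed, and a checkpointed DRM terminates the recursion. A toy analogue of the same shape, self-contained and with nothing Mahout-specific:

    // A tiny expression tree standing in for the logical plan.
    sealed trait Plan
    case class Leaf(value: Double) extends Plan        // already-materialized input
    case class Scale(a: Plan, s: Double) extends Plan  // unary operator
    case class Add(a: Plan, b: Plan) extends Plan      // binary operator

    def exec(plan: Plan): Double = plan match {
      case Scale(a, s) => exec(a) * s        // translate the operand, then apply the op
      case Add(a, b)   => exec(a) + exec(b)  // binary ops recurse into both operands
      case Leaf(v)     => v                  // "checkpointed" data ends the recursion
    }

    // exec(Add(Scale(Leaf(2), 3), Leaf(1))) == 7.0
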
http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
index 1e3f286..11e2bad 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/ABt.scala
@@ -19,16 +19,23 @@ package org.apache.mahout.sparkbindings.blas
import org.apache.mahout.math.scalabindings._
import RLikeOps._
+import org.apache.spark.rdd.RDD
import scala.reflect.ClassTag
import org.apache.mahout.sparkbindings._
-import drm._
-import org.apache.mahout.math.{Matrix, SparseRowMatrix}
+import org.apache.mahout.math.drm.BlockifiedDrmTuple
+import org.apache.mahout.sparkbindings.drm._
+import org.apache.mahout.math.{SparseMatrix, Matrix, SparseRowMatrix}
import org.apache.spark.SparkContext._
import org.apache.mahout.math.drm.logical.OpABt
+import org.apache.mahout.logging._
+
+import scala.tools.nsc.io.Pickler.TildeDecorator
/** Contains RDD plans for ABt operator */
object ABt {
+ private final implicit val log = getLog(ABt.getClass)
+
/**
* General entry point for AB' operator.
*
@@ -40,8 +47,11 @@ object ABt {
def abt[K: ClassTag](
operator: OpABt[K],
srcA: DrmRddInput[K],
- srcB: DrmRddInput[Int]): DrmRddInput[K] =
+ srcB: DrmRddInput[Int]): DrmRddInput[K] = {
+
+ debug("operator AB'(Spark)")
abt_nograph(operator, srcA, srcB)
+ }
/**
* Computes AB' without GraphX.
@@ -63,7 +73,146 @@ object ABt {
srcB: DrmRddInput[Int]): DrmRddInput[K] = {
// Blockify everything.
- val blocksA = srcA.toBlockifiedDrmRdd()
+ val blocksA = srcA.toBlockifiedDrmRdd(operator.A.ncol)
+
+ val blocksB = srcB.toBlockifiedDrmRdd(operator.B.ncol)
+
+ val prodNCol = operator.ncol
+ val prodNRow = operator.nrow
+ // We are actually computing AB' here.
+ val numProductPartitions = estimateProductPartitions(anrow = prodNRow, ancol = operator.A.ncol,
+ bncol = prodNCol, aparts = blocksA.partitions.size, bparts = blocksB.partitions.size)
+
+ debug(
+ s"AB': #parts = $numProductPartitions; A #parts=${blocksA.partitions.size}, B #parts=${blocksB.partitions.size}."+
+ s"A=${operator.A.nrow}x${operator.A.ncol}, B=${operator.B.nrow}x${operator.B.ncol},AB'=${prodNRow}x$prodNCol."
+ )
+
+ // Blockwise multiplication function.
+ def mmulFunc(tupleA: BlockifiedDrmTuple[K], tupleB: BlockifiedDrmTuple[Int]): (Array[K], Array[Int], Matrix) = {
+ val (keysA, blockA) = tupleA
+ val (keysB, blockB) = tupleB
+
+ var ms = traceDo(System.currentTimeMillis())
+
+ // We need to send keysB to the aggregator in order to know which columns are being updated.
+ val result = (keysA, keysB, (blockA %*% blockB.t))
+
+ ms = traceDo(System.currentTimeMillis() - ms.get)
+ trace(
+ s"block multiplication of(${blockA.nrow}x${blockA.ncol} x ${blockB.ncol}x${blockB.nrow} is completed in $ms " +
+ "ms.")
+ trace(s"block multiplication types: blockA: ${blockA.getClass.getName}(${blockA.t.getClass.getName}); " +
+ s"blockB: ${blockB.getClass.getName}.")
+
+ result
+ }
+
+ val blockwiseMmulRdd =
+
+ // Combine blocks pairwise.
+ pairwiseApply(blocksA, blocksB, mmulFunc _)
+
+ // Now reduce proper product blocks.
+ .combineByKey(
+
+ // Empty combiner += value
+ createCombiner = (t: (Array[K], Array[Int], Matrix)) => {
+ val (rowKeys, colKeys, block) = t
+ val comb = new SparseMatrix(prodNCol, block.nrow).t
+
+ for ((col, i) <- colKeys.zipWithIndex) comb(::, col) := block(::, i)
+ rowKeys -> comb
+ },
+
+ // Combiner += value
+ mergeValue = (comb: (Array[K], Matrix), value: (Array[K], Array[Int], Matrix)) => {
+ val (rowKeys, c) = comb
+ val (_, colKeys, block) = value
+ for ((col, i) <- colKeys.zipWithIndex) c(::, col) := block(::, i)
+ comb
+ },
+
+ // Combiner + Combiner
+ mergeCombiners = (comb1: (Array[K], Matrix), comb2: (Array[K], Matrix)) => {
+ comb1._2 += comb2._2
+ comb1
+ },
+
+ numPartitions = blocksA.partitions.size max blocksB.partitions.size
+ )
+
+
+ // Create a BlockifiedRDD-compatible structure.
+ val blockifiedRdd = blockwiseMmulRdd
+
+ // throw away A-partition #
+ .map{case (_,tuple) => tuple}
+
+ val numPartsResult = blockifiedRdd.partitions.size
+
+ // See if we need to rebalance away from A granularity.
+ if (numPartsResult * 2 < numProductPartitions || numPartsResult / 2 > numProductPartitions) {
+
+ debug(s"Will re-coalesce from ${numPartsResult} to ${numProductPartitions}")
+
+ val rowRdd = deblockify(blockifiedRdd).coalesce(numPartitions = numProductPartitions)
+
+ rowRdd
+
+ } else {
+
+ // The partition count is already close enough; keep the blockified RDD as is.
+ blockifiedRdd
+ }
+
+ }
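
The reduction above leans on combineByKey's three callbacks. A toy standalone analogue, summing doubles per key (assumes a live SparkContext sc):

    def perKeySums(sc: org.apache.spark.SparkContext) =
      sc.parallelize(Seq(1 -> 2.0, 1 -> 3.0, 2 -> 4.0))
        .combineByKey(
          createCombiner = (v: Double) => v,                     // first value seen for a key
          mergeValue = (c: Double, v: Double) => c + v,          // fold further values in
          mergeCombiners = (c1: Double, c2: Double) => c1 + c2   // merge across partitions
        )
    // perKeySums(sc).collect() gives 1 -> 5.0 and 2 -> 4.0 (in some order).
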
+
+ /**
+ * This function tries to use join instead of cartesian to group blocks together without bloating
+ * the number of partitions. The hope is that we can apply pairwise reduction of each block pair
+ * right away, so that if the data on one side of the join is streamed, the result still fits in
+ * memory, since the result size is much smaller than the operands.
+ *
+ * @param blocksA blockified RDD for A
+ * @param blocksB blockified RDD for B
+ * @param blockFunc a function over (blockA, blockB). Implies `blockA %*% blockB.t` but perhaps may be
+ * switched to another scheme based on which of the sides, A or B, is bigger.
+ */
+ private def pairwiseApply[K1, K2, T](blocksA: BlockifiedDrmRdd[K1], blocksB: BlockifiedDrmRdd[K2], blockFunc:
+ (BlockifiedDrmTuple[K1], BlockifiedDrmTuple[K2]) => T): RDD[(Int, T)] = {
+
+ // We will be joining blocks in B to blocks in A using A-partition as a key.
+
+ // Prepare A side.
+ val blocksAKeyed = blocksA.mapPartitionsWithIndex { (part, blockIter) =>
+
+ val r = if (blockIter.hasNext) Some(part -> blockIter.next) else Option.empty[(Int, BlockifiedDrmTuple[K1])]
+
+ require(blockIter.hasNext == false, s"more than 1 (${blockIter.size + 1}) blocks per partition in A of AB'")
+
+ r.toIterator
+ }
+
+ // Prepare B-side.
+ val aParts = blocksA.partitions.size
+ val blocksBKeyed = blocksB.flatMap(bTuple => for (blockKey <- (0 until aParts).view) yield blockKey -> bTuple )
+
+ // Perform the inner join. Let's try to do a simple thing now.
+ blocksAKeyed.join(blocksBKeyed, numPartitions = aParts)
+
+ // Apply product function which should produce smaller products. Hopefully, this streams blockB's in
+ .map{case (partKey,(blockA, blockB)) => partKey -> blockFunc(blockA, blockB)}
+
+ }
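
The B-side preparation above fans every B block out to every A partition key before the join; in miniature:

    // aParts = 3 A-blocks, two B-blocks "b0" and "b1":
    val aParts = 3
    val replicated = for (b <- Seq("b0", "b1"); blockKey <- 0 until aParts) yield blockKey -> b
    // List((0,b0), (1,b0), (2,b0), (0,b1), (1,b1), (2,b1)): every A partition sees every B block.
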
+
+ private[blas] def abt_nograph_cart[K: ClassTag](
+ operator: OpABt[K],
+ srcA: DrmRddInput[K],
+ srcB: DrmRddInput[Int]): DrmRddInput[K] = {
+
+ // Blockify everything.
+ val blocksA = srcA.toBlockifiedDrmRdd(operator.A.ncol)
// Mark row-blocks with group id
.mapPartitionsWithIndex((part, iter) => {
@@ -83,28 +232,35 @@ object ABt {
}
})
- val blocksB = srcB.toBlockifiedDrmRdd()
+ val blocksB = srcB.toBlockifiedDrmRdd(operator.B.ncol)
// Final product's geometry. We want to extract that into local variables since we want to use
// them as closure attributes.
val prodNCol = operator.ncol
val prodNRow = operator.nrow
-
- // Approximate number of final partitions.
- val numProductPartitions =
- if (blocksA.partitions.size > blocksB.partitions.size) {
- ((prodNCol.toDouble / operator.A.ncol) * blocksA.partitions.size).ceil.toInt
- } else {
- ((prodNRow.toDouble / operator.B.ncol) * blocksB.partitions.size).ceil.toInt
- }
+ val aNCol = operator.A.ncol
+
+ // Approximate number of final partitions. We take bigger partitions as our guide to number of
+ // elements per partition. TODO: do it better.
- //srcA.partitions.size.max(that = srcB.partitions.size)
+ // Elements per partition, bigger of two operands.
+ val epp = aNCol.toDouble * prodNRow / blocksA.partitions.size max aNCol.toDouble * prodNCol /
+ blocksB.partitions.size
+ // Number of partitions we want to converge to in the product. For now we simply extrapolate that
+ // assuming product density and operand densities being about the same; and using the same element
+ // per partition number in the product as the bigger of two operands.
+ val numProductPartitions = (prodNCol.toDouble * prodNRow / epp).ceil.toInt
+
+ debug(
+ s"AB': #parts = $numProductPartitions; A #parts=${blocksA.partitions.size}, B #parts=${blocksB.partitions.size}.")
// The plan.
- var blockifiedRdd :BlockifiedDrmRdd[K] = blocksA
+ var blockifiedRdd: BlockifiedDrmRdd[K] = blocksA
- // Build Cartesian. It may require a bit more memory there at tasks.
+ // Build Cartesian. It generates a LOT of tasks. TODO: figure out how to fix performance of the
+ // AB' operator. The thing is that the product after the map is a really small one (partition
+ // fraction x partition fraction), so the products can be combined into much bigger chunks.
.cartesian(blocksB)
// Multiply blocks
@@ -126,10 +282,14 @@ object ABt {
.combineByKey[(Array[K],Matrix)](
createCombiner = (t: (Array[K], Array[Int], Matrix)) => t match {
+
+ // Create the combiner structure out of the first product. Our combiner is a sparse row matrix
+ // initialized to the final product partition block dimensions.
case (rowKeys, colKeys, blockProd) =>
- // Accumulator is a row-wise block of sparse vectors.
- val acc:Matrix = new SparseRowMatrix(rowKeys.size, prodNCol)
+ // Accumulator is a row-wise block of sparse vectors. Since we assign to columns,
+ // it is perhaps most efficient to create a column-oriented block here.
+ val acc:Matrix = new SparseRowMatrix(prodNCol, rowKeys.size).t
// Update accumulator using colKeys as column index indirection
colKeys.view.zipWithIndex.foreach({
@@ -168,6 +328,8 @@ object ABt {
// having at most one block per partition.
blockifiedRdd = rbind(blockifiedRdd)
- new DrmRddInput(blockifiedSrc = Some(blockifiedRdd))
+ blockifiedRdd
}
+
+
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
index 3cdb797..8a90398 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AewB.scala
@@ -20,19 +20,22 @@ package org.apache.mahout.sparkbindings.blas
import org.apache.mahout.sparkbindings.drm.DrmRddInput
import scala.reflect.ClassTag
import org.apache.spark.SparkContext._
-import org.apache.mahout.math.scalabindings._
+import org.apache.mahout.math._
+import scalabindings._
import RLikeOps._
import org.apache.mahout.math.{SequentialAccessSparseVector, Matrix, Vector}
-import org.apache.mahout.math.drm.logical.{OpAewScalar, OpAewB}
-import org.apache.log4j.Logger
+import org.apache.mahout.math.drm.logical.{AbstractUnaryOp, TEwFunc, OpAewScalar, OpAewB}
import org.apache.mahout.sparkbindings.blas.AewB.{ReduceFuncScalar, ReduceFunc}
import org.apache.mahout.sparkbindings.{BlockifiedDrmRdd, DrmRdd, drm}
import org.apache.mahout.math.drm._
+import org.apache.mahout.logging._
+import collection._
+import JavaConversions._
/** Elementwise drm-drm operators */
object AewB {
- private val log = Logger.getLogger(AewB.getClass)
+ private final implicit val log = getLog(AewB.getClass)
/**
* Set to false to disallow in-place elementwise operations in case side-effects and non-idempotent
@@ -44,10 +47,10 @@ object AewB {
type ReduceFuncScalar = (Matrix, Double) => Matrix
- private[blas] def getEWOps() = {
- val inplaceProp = System.getProperty(PROPERTY_AEWB_INPLACE, "true").toBoolean
- if (inplaceProp) InplaceEWOps else CloningEWOps
- }
+ private[blas] def ewInplace(): Boolean = System.getProperty(PROPERTY_AEWB_INPLACE, "false").toBoolean
+
+ private[blas] def getEWOps() = if (ewInplace()) InplaceEWOps else CloningEWOps
+
/** Elementwise matrix-matrix operator, now handles both non- and identically partitioned */
def a_ew_b[K: ClassTag](op: OpAewB[K], srcA: DrmRddInput[K], srcB: DrmRddInput[K]): DrmRddInput[K] = {
@@ -67,12 +70,14 @@ object AewB {
val a = srcA.toDrmRdd()
val b = srcB.toDrmRdd()
+ debug(s"A${op.op}B: #partsA=${a.partitions.size},#partsB=${b.partitions.size}.")
+
// Check if A and B are identically partitioned AND keyed. if they are, then just perform zip
// instead of join, and apply the op map-side. Otherwise, perform join and apply the op
// reduce-side.
val rdd = if (op.isIdenticallyPartitioned(op.A)) {
- log.debug("applying zipped elementwise")
+ debug(s"A${op.op}B:applying zipped elementwise")
a
.zip(b)
@@ -83,7 +88,7 @@ object AewB {
}
} else {
- log.debug("applying elementwise as join")
+ debug("A${op.op}B:applying elementwise as join")
a
// Full outer-join operands row-wise
@@ -103,13 +108,51 @@ object AewB {
})
}
- new DrmRddInput(rowWiseSrc = Some(ncol -> rdd))
+ rdd
+ }
+
+ def a_ew_func[K:ClassTag](op:AbstractUnaryOp[K,K] with TEwFunc, srcA: DrmRddInput[K]):DrmRddInput[K] = {
+
+ val evalZeros = op.evalZeros
+ val inplace = ewInplace()
+ val f = op.f
+
+ // Before obtaining blockified rdd, see if we have to fix int row key consistency so that missing
+ // rows can get lazily pre-populated with empty vectors before proceeding with elementwise scalar.
+ val aBlockRdd = if (implicitly[ClassTag[K]] == ClassTag.Int && op.A.canHaveMissingRows && evalZeros) {
+ val fixedRdd = fixIntConsistency(op.A.asInstanceOf[DrmLike[Int]], src = srcA.toDrmRdd().asInstanceOf[DrmRdd[Int]])
+ drm.blockify(fixedRdd, blockncol = op.A.ncol).asInstanceOf[BlockifiedDrmRdd[K]]
+ } else {
+ srcA.toBlockifiedDrmRdd(op.A.ncol)
+ }
+
+ val rdd = aBlockRdd.map {case (keys, block) =>
+
+ // Do inplace or allocate a new copy?
+ val newBlock = if (inplace) block else block cloned
+
+ // Operation cares about zeros?
+ if (evalZeros) {
+
+ // Yes, we evaluate all:
+ newBlock := ((_,_,x)=>f(x))
+ } else {
+
+ // No, evaluate non-zeros only row-wise
+ for (row <- newBlock; el <- row.nonZeroes) el := f(el.get)
+ }
+
+ keys -> newBlock
+ }
+
+ rdd
}
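
The evalZeros branch matters because a zero-preserving f lets sparse blocks skip their zeros entirely. A scalar-array analogue of the two code paths (illustrative only):

    def ewFunc(values: Array[Double], f: Double => Double, evalZeros: Boolean): Array[Double] =
      if (evalZeros) values.map(f)                       // f may move zeros, e.g. f = x => x + 1
      else values.map(x => if (x != 0.0) f(x) else 0.0)  // zero-preserving f: non-zeros only

    // ewFunc(Array(0.0, 2.0, 0.0, 3.0), _ * 2, evalZeros = false) == Array(0.0, 4.0, 0.0, 6.0)
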
/** Physical algorithm to handle matrix-scalar operators like A - s or s -: A */
def a_ew_scalar[K: ClassTag](op: OpAewScalar[K], srcA: DrmRddInput[K], scalar: Double):
DrmRddInput[K] = {
+
val ewOps = getEWOps()
val opId = op.op
@@ -129,15 +172,17 @@ object AewB {
val fixedRdd = fixIntConsistency(op.A.asInstanceOf[DrmLike[Int]], src = srcA.toDrmRdd().asInstanceOf[DrmRdd[Int]])
drm.blockify(fixedRdd, blockncol = op.A.ncol).asInstanceOf[BlockifiedDrmRdd[K]]
} else {
- srcA.toBlockifiedDrmRdd()
+ srcA.toBlockifiedDrmRdd(op.A.ncol)
}
+ debug(s"A${op.op}$scalar: #parts=${aBlockRdd.partitions.size}.")
+
val rdd = aBlockRdd
- .map({
+ .map {
case (keys, block) => keys -> reduceFunc(block, scalar)
- })
+ }
- new DrmRddInput[K](blockifiedSrc = Some(rdd))
+ rdd
}
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala
index c923e62..5f9f84a 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AinCoreB.scala
@@ -6,13 +6,17 @@ import scalabindings._
import RLikeOps._
import org.apache.mahout.sparkbindings._
import org.apache.mahout.sparkbindings.drm._
+import org.apache.mahout.logging._
import scala.reflect.ClassTag
import org.apache.mahout.math.DiagonalMatrix
import org.apache.mahout.math.drm.logical.OpTimesRightMatrix
+
/** Matrix product with one of operands an in-core matrix */
object AinCoreB {
+ private final implicit val log = getLog(AinCoreB.getClass)
+
def rightMultiply[K: ClassTag](op: OpTimesRightMatrix[K], srcA: DrmRddInput[K]): DrmRddInput[K] = {
if ( op.right.isInstanceOf[DiagonalMatrix])
rightMultiply_diag(op, srcA)
@@ -21,23 +25,27 @@ object AinCoreB {
}
private def rightMultiply_diag[K: ClassTag](op: OpTimesRightMatrix[K], srcA: DrmRddInput[K]): DrmRddInput[K] = {
- val rddA = srcA.toBlockifiedDrmRdd()
+ val rddA = srcA.toBlockifiedDrmRdd(op.A.ncol)
implicit val ctx:DistributedContext = rddA.context
val dg = drmBroadcast(op.right.viewDiagonal())
+ debug(s"operator A %*% inCoreB-diagonal. #parts=${rddA.partitions.size}.")
+
val rdd = rddA
// Just multiply the blocks
.map {
case (keys, blockA) => keys -> (blockA %*%: diagv(dg))
}
- new DrmRddInput(blockifiedSrc = Some(rdd))
+ rdd
}
private def rightMultiply_common[K: ClassTag](op: OpTimesRightMatrix[K], srcA: DrmRddInput[K]): DrmRddInput[K] = {
- val rddA = srcA.toBlockifiedDrmRdd()
+ val rddA = srcA.toBlockifiedDrmRdd(op.A.ncol)
implicit val sc:DistributedContext = rddA.sparkContext
+ debug(s"operator A %*% inCoreB. #parts=${rddA.partitions.size}.")
+
val bcastB = drmBroadcast(m = op.right)
val rdd = rddA
@@ -46,7 +54,7 @@ object AinCoreB {
case (keys, blockA) => keys -> (blockA %*% bcastB)
}
- new DrmRddInput(blockifiedSrc = Some(rdd))
+ rdd
}
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/At.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/At.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/At.scala
index 56de9f4..5789bd2 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/At.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/At.scala
@@ -17,16 +17,20 @@
package org.apache.mahout.sparkbindings.blas
-import org.apache.mahout.sparkbindings.drm.DrmRddInput
+import org.apache.mahout.sparkbindings.drm._
import org.apache.mahout.math.scalabindings._
+import org.apache.mahout.logging._
import RLikeOps._
import org.apache.spark.SparkContext._
import org.apache.mahout.math.{DenseVector, Vector, SequentialAccessSparseVector}
import org.apache.mahout.math.drm.logical.OpAt
+
/** A' algorithms */
object At {
+ private final implicit val log = getLog(At.getClass)
+
def at(
operator: OpAt,
srcA: DrmRddInput[Int]): DrmRddInput[Int] = at_nograph(operator = operator, srcA = srcA)
@@ -39,10 +43,15 @@ object At {
* groups into final rows of the transposed matrix.
*/
private[blas] def at_nograph(operator: OpAt, srcA: DrmRddInput[Int]): DrmRddInput[Int] = {
- val drmRdd = srcA.toBlockifiedDrmRdd()
+
+ debug("operator A'.")
+
+ val drmRdd = srcA.toBlockifiedDrmRdd(operator.A.ncol)
val numPartitions = drmRdd.partitions.size
val ncol = operator.ncol
+ debug(s"A' #parts = $numPartitions.")
+
// Validity of this conversion must be checked at logical operator level.
val nrow = operator.nrow.toInt
val atRdd = drmRdd
@@ -70,7 +79,7 @@ object At {
key -> v
}).densify()
- new DrmRddInput(rowWiseSrc = Some(ncol -> atRdd))
+ atRdd
}
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtA.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtA.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtA.scala
index be4f08c..a212878 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtA.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtA.scala
@@ -17,6 +17,7 @@
package org.apache.mahout.sparkbindings.blas
+import org.apache.mahout.logging._
import org.apache.mahout.math._
import org.apache.mahout.sparkbindings._
import org.apache.mahout.sparkbindings.drm._
@@ -34,25 +35,30 @@ import SparkEngine._
*/
object AtA {
- final val log = Logger.getLogger(AtA.getClass)
+ private final implicit val log = getLog(AtA.getClass)
final val PROPERTY_ATA_MAXINMEMNCOL = "mahout.math.AtA.maxInMemNCol"
+ final val PROPERTY_ATA_MMUL_BLOCKHEIGHT = "mahout.math.AtA.blockHeight"
/** Materialize A'A operator */
def at_a(operator: OpAtA[_], srcRdd: DrmRddInput[_]): DrmRddInput[Int] = {
- val maxInMemNCol = System.getProperty(PROPERTY_ATA_MAXINMEMNCOL, "2000").toInt
+ val maxInMemNCol = System.getProperty(PROPERTY_ATA_MAXINMEMNCOL, "200").toInt
maxInMemNCol.ensuring(_ > 0, "Invalid A'A in-memory setting for optimizer")
if (operator.ncol <= maxInMemNCol) {
+
// If we can comfortably fit upper-triangular operator into a map memory, we will run slim
// algorithm with upper-triangular accumulators in maps.
- val inCoreA = at_a_slim(srcRdd = srcRdd, operator = operator)
+ val inCoreA = at_a_slim(srcRdd = srcRdd.toDrmRdd(), operator = operator)
val drmRdd = parallelizeInCore(inCoreA, numPartitions = 1)(sc = srcRdd.sparkContext)
- new DrmRddInput(rowWiseSrc = Some(inCoreA.ncol, drmRdd))
+ drmRdd
+
} else {
+
// Otherwise, we need to run a distributed, big version
- new DrmRddInput(rowWiseSrc = Some(operator.ncol, at_a_nongraph(srcRdd = srcRdd, op = operator)))
+ // new DrmRddInput(rowWiseSrc = Some(operator.ncol, at_a_nongraph(srcRdd = srcRdd, op = operator)))
+ at_a_nongraph_mmul(srcRdd = srcRdd.toBlockifiedDrmRdd(operator.A.ncol), op = operator)
}
}
@@ -64,7 +70,7 @@ object AtA {
*/
def at_a_slim(operator: OpAtA[_], srcRdd: DrmRdd[_]): Matrix = {
- log.debug("Applying slim A'A.")
+ debug("operator slim A'A(Spark)")
val ncol = operator.ncol
// Compute backing vector of tiny-upper-triangular accumulator across all the data.
@@ -73,122 +79,195 @@ object AtA {
val ut = new UpperTriangular(ncol)
// Strategy is to add the outer product of each row to the upper-triangular accumulator.
- pIter.foreach({
- case (k, v) =>
+ pIter.foreach({ case (k, v) =>
- // Use slightly various traversal strategies over dense vs. sparse source.
- if (v.isDense) {
+ // Use slightly different traversal strategies for dense vs. sparse sources.
+ if (v.isDense) {
- // Update upper-triangular pattern only (due to symmetry).
- // Note: Scala for-comprehensions are said to be fairly inefficient this way, but this is
- // such spectacular case they were deesigned for.. Yes I do observe some 20% difference
- // compared to while loops with no other payload, but the other payload is usually much
- // heavier than this overhead, so... I am keeping this as is for the time being.
+ // Update upper-triangular pattern only (due to symmetry).
+ // Note: Scala for-comprehensions are said to be fairly inefficient this way, but this is
+ // such a spectacular case as they were designed for. Yes, I do observe some 20% difference
+ // compared to while loops with no other payload, but the other payload is usually much
+ // heavier than this overhead, so... I am keeping this as is for the time being.
- for (row <- 0 until v.length; col <- row until v.length)
- ut(row, col) = ut(row, col) + v(row) * v(col)
+ for (row <- 0 until v.length; col <- row until v.length)
+ ut(row, col) = ut(row, col) + v(row) * v(col)
- } else {
+ } else {
- // Sparse source.
- v.nonZeroes().view
+ // Sparse source.
+ v.nonZeroes().view
- // Outer iterator iterates over rows of outer product.
- .foreach(elrow => {
+ // Outer iterator iterates over rows of outer product.
+ .foreach(elrow => {
- // Inner loop for columns of outer product.
- v.nonZeroes().view
+ // Inner loop for columns of outer product.
+ v.nonZeroes().view
- // Filter out non-upper nonzero elements from the double loop.
- .filter(_.index >= elrow.index)
+ // Filter out non-upper nonzero elements from the double loop.
+ .filter(_.index >= elrow.index)
- // Incrementally update outer product value in the uppper triangular accumulator.
- .foreach(elcol => {
+ // Incrementally update the outer product value in the upper-triangular accumulator.
+ .foreach(elcol => {
- val row = elrow.index
- val col = elcol.index
- ut(row, col) = ut(row, col) + elrow.get() * elcol.get()
+ val row = elrow.index
+ val col = elcol.index
+ ut(row, col) = ut(row, col) + elrow.get() * elcol.get()
- })
})
+ })
- }
+ }
})
Iterator(dvec(ddata = ut.getData): Vector)
- })
-
- .collect()
- .reduce(_ += _)
+ }).collect().reduce(_ += _)
new DenseSymmetricMatrix(resSym)
}
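
The slim path accumulates only the upper triangle of each row's outer product, relying on the symmetry of A'A. A dense two-row analogue in plain arrays (not Mahout types):

    // A = [[1,2],[3,4]], so A'A = [[10,14],[14,20]].
    val rows = Seq(Array(1.0, 2.0), Array(3.0, 4.0))
    val n = 2
    val ut = Array.ofDim[Double](n, n)
    for (v <- rows; r <- 0 until n; c <- r until n)
      ut(r)(c) += v(r) * v(c)  // touch only the upper triangle
    // ut == [[10,14],[0,20]]; mirroring across the diagonal restores the full A'A.
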
+ // Version that tries to use groupBy. In practice this is the slowest.
+ def at_a_group(op: OpAtA[_], srcRdd: DrmRdd[_]): DrmRddInput[Int] = {
+ debug("operator non-slim A'A(Spark-group).")
+
+ // Determine how many partitions the new matrix would need approximately. We base that on
+ // geometry only, but it may eventually not be that adequate. Indeed, A'A tends to be much more
+ // dense in reality than the source.
+ val m = op.A.nrow
+ val n = op.A.ncol
+ val srcNumParts = srcRdd.partitions.size
+ val finalNumParts = (srcNumParts * n / m).ceil.toInt max 1
+ val numParts = finalNumParts max srcNumParts
+ val ranges = computeEvenSplits(n, numParts)
+
+ var rddAtA = srcRdd
+
+ // Remove key, key is irrelevant
+ .map(_._2)
+
+ // Form partial outer blocks for each partition
+ .flatMap { v =>
+ for (blockKey <- 0 until numParts) yield {
+ blockKey -> v
+ }
+ }
+ // Sent to individual partition reducer
+ .groupByKey(numPartitions = numParts)
+
+ // Reduce individual group
+ .map { case (blockKey, iter) =>
+ val range = ranges(blockKey)
+ val mxC: Matrix = new SparseRowMatrix(range.size, n, false)
+ iter.foreach(vec => addOuterProduct(mxC, vec(range), vec))
+
+ // Fix keys
+ val blockStart = range.start
+ val rowKeys = Array.tabulate(mxC.nrow)(blockStart + _)
+ rowKeys -> mxC
+ }
+
+ if (log.isDebugEnabled)
+ log.debug(s"AtA (grouping) #parts: ${rddAtA.partitions.size}.")
+
+ if (finalNumParts < numParts) rddAtA = rddAtA.coalesce(finalNumParts, shuffle = false)
+
+ rddAtA
+ }
+
+
/** The version of A'A that does not use GraphX */
- def at_a_nongraph(op: OpAtA[_], srcRdd: DrmRdd[_]): DrmRdd[Int] = {
+ def at_a_nongraph(op: OpAtA[_], srcRdd: DrmRdd[_]): DrmRddInput[Int] = {
- log.debug("Applying non-slim non-graph A'A.")
+ debug("Applying non-slim non-graph A'A.")
// Determine how many partitions the new matrix would need approximately. We base that on
// geometry only, but it may eventually not be that adequate. Indeed, A'A tends to be much more
// dense in reality than the source.
-
val m = op.A.nrow
val n = op.A.ncol
-/* possible fix for index out of range for vector range
- val numParts = (srcRdd.partitions.size.toDouble * n / m).ceil.round.toInt max 1
+ val numParts = (srcRdd.partitions.size.toDouble * n / m).ceil.toInt max 1
val blockHeight = (n - 1) / numParts + 1
-*/
- val numParts = (srcRdd.partitions.size.toDouble * n / m).ceil.round.toInt max 1 min n
+ val offsets = (0 until numParts).map(_ * blockHeight)
+ val ranges = offsets.map(offset => offset until (offset + blockHeight min n))
- // Computing evenly split ranges to denote each partition size.
+ val rddAtA = srcRdd
- // Base size.
- val baseSize = n / numParts
+ // Remove key, key is irrelevant
+ .map(_._2)
- // How many partitions needs to be baseSize +1.
- val slack = n - baseSize * numParts
+ // Form partial outer blocks for each partition
+ .flatMap { v =>
+ for (blockKey <- 0 until numParts) yield {
+ blockKey ->(blockKey, v)
+ }
+ }
+ // Combine outer products
+ .combineByKey(// Combiner factory
+ createCombiner = (t: (Int, Vector)) => {
+ val partNo = t._1
+ val vec = t._2
+ val range = ranges(partNo)
+ val mxC = if (vec.isDense) new DenseMatrix(range.size, n) else new SparseRowMatrix(range.size, n)
+ addOuterProduct(mxC, vec(range), vec)
+ },
+
+ // Merge values into existing partition accumulator.
+ mergeValue = (mxC: Matrix, t: (Int, Vector)) => {
+ val partNo = t._1
+ val vec = t._2
+ addOuterProduct(mxC, vec(ranges(partNo)), vec)
+ },
+
+ // Merge combiners
+ mergeCombiners = (mxC1: Matrix, mxC2: Matrix) => mxC1 += mxC2, numPartitions = numParts)
+
+ // Restore proper block keys
+ .map { case (blockKey, block) =>
+ val blockStart = blockKey * blockHeight
+ val rowKeys = Array.tabulate(block.nrow)(blockStart + _)
+ rowKeys -> block
+ }
- val ranges =
- // Start with partition offsets... total numParts + 1.
- (0 to numParts).view.map { i => (baseSize + 1) * i - (i - slack max 0)}
- // And convert offsets to ranges.
- .sliding(2).map(s => s(0) until s(1)).toIndexedSeq
+ if (log.isDebugEnabled)
+ log.debug(s"AtA #parts: ${rddAtA.partitions.size}.")
- val rddAtA = srcRdd
+ rddAtA
+ }
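
computeEvenSplits is referenced here but defined elsewhere; the removed baseSize/slack code above is equivalent to the following worked example for n = 10 and numParts = 3:

    val n = 10; val numParts = 3
    val baseSize = n / numParts              // 3
    val slack = n - baseSize * numParts      // 1 range gets one extra row
    val ranges = (0 to numParts).view
      .map(i => (baseSize + 1) * i - ((i - slack) max 0))  // offsets: 0, 4, 7, 10
      .sliding(2).map(s => s(0) until s(1)).toIndexedSeq
    // Vector(0 until 4, 4 until 7, 7 until 10)
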
- // Remove key, key is irrelevant
- .map(_._2)
-
- // Form partial outer blocks for each partition
- .flatMap {
- v =>
- for (blockKey <- Stream.range(0, numParts)) yield {
-/* patch to fix index out of range for vector access
- val blockStart = blockKey * blockHeight
- val blockEnd = n min (blockStart + blockHeight)
- blockKey -> (v(blockStart until blockEnd) cross v)
-*/
- val range = ranges(blockKey)
- blockKey -> (v(range) cross v)
- }
+ /**
+ * The version of A'A that does not use GraphX. Tries to use blockwise matrix multiply. If an
+ * accelerated matrix back is available, this might be somewhat faster.
+ */
+ def at_a_nongraph_mmul(op: OpAtA[_], srcRdd: BlockifiedDrmRdd[_]): DrmRddInput[Int] = {
+
+ // Determine how many partitions the new matrix would need approximately. We base that on
+ // geometry only, but it may eventually not be that adequate. Indeed, A'A tends to be much more
+ // dense in reality than the source.
+ val m = op.A.nrow
+ val n = op.A.ncol
+ val aparts = srcRdd.partitions.size
+ val numParts = estimateProductPartitions(anrow = n, ancol = m, bncol = n, aparts = aparts, bparts = aparts)
+ val ranges = computeEvenSplits(n, numParts)
+
+ debug(s"operator mmul-A'A(Spark); #parts = $numParts, #partsA=$aparts.")
+
+ val rddAtA = srcRdd.flatMap { case (keys, block) =>
+ Iterator.tabulate(numParts) { i =>
+ i -> block(::, ranges(i)).t %*% block
+ }
}
- // Combine outer blocks
- .reduceByKey(_ += _)
-
- // Restore proper block keys
- .map {
- case (blockKey, block) =>
-/* patch to fix index out of range for vector access
- val blockStart = blockKey * blockHeight
- val rowKeys = Array.tabulate(block.nrow)(blockStart + _)
-*/
- val range = ranges(blockKey)
- val rowKeys = Array.tabulate(block.nrow)(range.start + _)
- rowKeys -> block
+ // Reduce partial blocks.
+ .reduceByKey(_ += _, numPartitions = numParts)
+
+ // Produce keys
+ .map { case (blockKey, block) =>
+
+ val blockStart = ranges(blockKey).start
+ val rowKeys = Array.tabulate(block.nrow)(blockStart + _)
+ rowKeys -> block
}
- new DrmRddInput[Int](blockifiedSrc = Some(rddAtA))
+ rddAtA
}
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtB.scala
index 86aadc8..45705a9 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtB.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/AtB.scala
@@ -17,8 +17,13 @@
package org.apache.mahout.sparkbindings.blas
-import scala.reflect.ClassTag
-import org.apache.mahout.math.drm._
+import reflect.ClassTag
+import collection._
+import JavaConversions._
+
+import org.apache.mahout.logging._
+import org.apache.mahout.math._
+import drm._
import org.apache.mahout.sparkbindings.drm._
import org.apache.spark.rdd.RDD
import org.apache.mahout.math.scalabindings._
@@ -27,92 +32,330 @@ import org.apache.spark.SparkContext._
import org.apache.log4j.Logger
import org.apache.mahout.math.drm.logical.OpAtB
+import scala.collection.mutable.ArrayBuffer
+
object AtB {
- private val log = Logger.getLogger(AtB.getClass)
+ private final implicit val log = getLog(AtB.getClass)
+ def atb[A: ClassTag](operator: OpAtB[A], srcA: DrmRddInput[A], srcB: DrmRddInput[A]): DrmRddInput[Int] = {
+ atb_nograph_mmul(operator, srcA, srcB, operator.A.partitioningTag == operator.B.partitioningTag)
+ }
/**
* The logic for computing A'B is pretty much map-side generation of partial outer product blocks
* over co-grouped rows of A and B. If A and B are identically partitioned, we can just directly
* zip all the rows. Otherwise, we need to inner-join them first.
+ *
*/
- def atb_nograph[A: ClassTag](
- operator: OpAtB[A],
- srcA: DrmRddInput[A],
- srcB: DrmRddInput[A],
- zippable:Boolean = false
- ): DrmRddInput[Int] = {
+ @deprecated("slow, will remove", since = "0.10.2")
+ def atb_nograph[A: ClassTag](operator: OpAtB[A], srcA: DrmRddInput[A], srcB: DrmRddInput[A],
+ zippable: Boolean = false): DrmRddInput[Int] = {
val rddA = srcA.toDrmRdd()
- val zipped = if ( zippable ) {
+ val rddB = srcB.toDrmRdd()
+
+
+ val prodNCol = operator.ncol
+ val prodNRow = operator.nrow
+ val aNRow = operator.A.nrow
+
+ // Approximate number of final partitions. We take bigger partitions as our guide to number of
+ // elements per partition. TODO: do it better.
+ // Elements per partition, bigger of two operands.
+ val epp = aNRow.toDouble * prodNRow / rddA.partitions.size max aNRow.toDouble * prodNCol /
+ rddB.partitions.size
+
+ // Number of partitions we want to converge to in the product. For now we simply extrapolate that
+ // assuming product density and operand densities being about the same; and using the same element
+ // per partition number in the product as the bigger of two operands.
+ val numProductPartitions = (prodNCol.toDouble * prodNRow / epp).ceil.toInt
+
+ if (log.isDebugEnabled) log.debug(s"AtB: #parts ${numProductPartitions} for $prodNRow x $prodNCol geometry.")
+
+ val zipped = if (zippable) {
log.debug("A and B for A'B are identically distributed, performing row-wise zip.")
- rddA.zip(other = srcB.toDrmRdd())
+ rddA.zip(other = rddB)
} else {
log.debug("A and B for A'B are not identically partitioned, performing inner join.")
- rddA.join(other=srcB.toDrmRdd()).map({
- case (key,(v1,v2) ) => (key -> v1) -> (key -> v2)
+ rddA.join(other = rddB, numPartitions = numProductPartitions).map({ case (key, (v1,
+ v2)) => (key -> v1) -> (key -> v2)
})
}
- val blockHeight = safeToNonNegInt(
- (operator.B.ncol.toDouble/rddA.partitions.size).ceil.round max 1L
- )
-
- computeAtBZipped(
- zipped,
- nrow = operator.nrow,
- ancol = operator.A.ncol,
- bncol = operator.B.ncol,
- blockHeight = blockHeight
- )
+ computeAtBZipped2(zipped, nrow = operator.nrow, ancol = operator.A.ncol, bncol = operator.B.ncol,
+ numPartitions = numProductPartitions)
+ }
+
+ private[sparkbindings] def atb_nograph_mmul[A:ClassTag](operator:OpAtB[A], srcA: DrmRddInput[A], srcB:DrmRddInput[A], zippable:Boolean = false):DrmRddInput[Int] = {
+
+ debug("operator mmul-A'B(Spark)")
+
+ val prodNCol = operator.ncol
+ val prodNRow = safeToNonNegInt(operator.nrow)
+ val aNRow = safeToNonNegInt(operator.A.nrow)
+
+ val rddA = srcA.toDrmRdd()
+ val rddB = srcB.toDrmRdd()
+
+ // Approximate number of final partitions. We take bigger partitions as our guide to number of
+ // elements per partition. TODO: do it better.
+ // Elements per partition, bigger of two operands.
+ val epp = aNRow.toDouble * prodNRow / rddA.partitions.size max aNRow.toDouble * prodNCol /
+ rddB.partitions.size
+
+ // Number of partitions we want to converge to in the product. For now we simply extrapolate that
+ // assuming product density and operand densities being about the same; and using the same element
+ // per partition number in the product as the bigger of two operands.
+ val numProductPartitions = (prodNCol.toDouble * prodNRow / epp).ceil.toInt min prodNRow
+
+ if (log.isDebugEnabled) log.debug(s"AtB mmul: #parts ${numProductPartitions} for $prodNRow x $prodNCol geometry.")
+
+ val zipped = if (zippable) {
+
+ debug("mmul-A'B - zip: are identically distributed, performing row-wise zip.")
+
+ val blockdRddA = srcA.toBlockifiedDrmRdd(operator.A.ncol)
+ val blockdRddB = srcB.toBlockifiedDrmRdd(operator.B.ncol)
+
+ blockdRddA
+
+ // Zip
+ .zip(other = blockdRddB)
+
+ // Throw away the keys
+ .map { case ((_, blockA), (_, blockB)) => blockA -> blockB}
+
+ } else {
+
+ debug("mmul-A'B: cogroup for non-identically distributed stuff.")
+
+ // To take same route, we'll join stuff row-wise, blockify it here and then proceed with the
+ // same computation path. Although it is possible we could shave off one shuffle here. TBD.
+
+ rddA
+
+ // Do full join. We can't get away with partial join because it is going to lose some rows
+ // in case we have missing rows on either side.
+ .cogroup(other = rddB, numPartitions = rddA.partitions.size max rddB.partitions.size )
+
+
+ // Merge groups.
+ .mapPartitions{iter =>
+
+ val aRows = new ArrayBuffer[Vector](1000)
+ val bRows = new ArrayBuffer[Vector](1000)
+
+ // Populate hanging row buffs
+ iter.foreach{case (_, (arowbag,browbag)) =>
+
+ // Sum up all vectors, if any, for a row. If we have > 1, that means the original matrix had
+ // non-uniquely keyed rows which is generally a matrix format inconsistency (should not
+ // happen).
+ aRows += (if (arowbag.isEmpty)
+ new SequentialAccessSparseVector(prodNRow)
+ else arowbag.reduce(_ += _))
+
+ bRows += (if (browbag.isEmpty)
+ new SequentialAccessSparseVector(prodNCol)
+ else browbag.reduce(_ += _))
+ }
+
+ // Transform collection of vectors into blocks.
+ val blockNRow = aRows.size
+ assert(blockNRow == bRows.size)
+
+ val aBlock:Matrix = new SparseRowMatrix(blockNRow, prodNRow, aRows.toArray)
+ val bBlock:Matrix = new SparseRowMatrix(blockNRow, prodNCol, bRows.toArray)
+
+ // Form pairwise result
+ Iterator(aBlock -> bBlock)
+ }
+ }
+
+ computeAtBZipped3(pairwiseRdd = zipped, nrow = prodNRow, ancol = prodNRow, bncol = aNRow,
+ numPartitions = numProductPartitions)
+
+ }
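
The cogroup path above must pad missing rows on either side with empty vectors, which is why a full cogroup rather than an inner join is used. A toy demonstration of cogroup's empty bags (assumes a live SparkContext sc):

    val a = sc.parallelize(Seq(1 -> "a1", 3 -> "a3"))
    val b = sc.parallelize(Seq(1 -> "b1", 2 -> "b2"))
    a.cogroup(b).collect()
    // (1, (Seq(a1), Seq(b1)))  key present on both sides
    // (2, (Seq(),   Seq(b2)))  row missing in A: empty bag, padded above with an empty vector
    // (3, (Seq(a3), Seq()))    row missing in B
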
+ /**
+ * Compute, combine and accumulate outer products for every key. The incoming tuple structure
+ * is (partNo, (vecA, vecB)), so for every `partNo` we compute an outer product of the form {{{
+ * vecA cross vecB
+ * }}}
+ * @param pairwiseRdd
+ * @return
+ */
+ @deprecated("slow, will remove", since = "0.10.2")
+ private[sparkbindings] def combineOuterProducts(pairwiseRdd: RDD[(Int, (Vector, Vector))], numPartitions: Int) = {
+
+ pairwiseRdd
+
+ // Reduce individual partitions
+ .combineByKey(createCombiner = (t: (Vector, Vector)) => {
+
+ val vecA = t._1
+ val vecB = t._2
+
+ // Create partition accumulator. Generally, summation of outer products probably calls for
+ // dense accumulators. However, let's assume extremely sparse cases are still possible, and
+ // by default assume any sparse case is an extremely sparse case. May need to tweak further.
+ val mxC: Matrix = if (!vecA.isDense && !vecB.isDense)
+ new SparseRowMatrix(vecA.length, vecB.length)
+ else
+ new DenseMatrix(vecA.length, vecB.length)
+
+ // Add outer product of arow and bRowFrag to mxC
+ addOuterProduct(mxC, vecA, vecB)
+
+ }, mergeValue = (mxC: Matrix, t: (Vector, Vector)) => {
+ // Merge of a combiner with another outer product fragment.
+ val vecA = t._1
+ val vecB = t._2
+
+ addOuterProduct(mxC, vecA, vecB)
+
+ }, mergeCombiners = (mxC1: Matrix, mxC2: Matrix) => {
+
+ // Merge of 2 combiners.
+ mxC1 += mxC2
+
+ }, numPartitions = numPartitions)
+ }
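
addOuterProduct is defined elsewhere in the package and is presumably sparse-aware; a dense sketch conveys the update it performs:

    // C(i, j) += a(i) * b(j), i.e. C += a cross b.
    def addOuterProductDense(c: Array[Array[Double]], a: Array[Double], b: Array[Double]) = {
      for (i <- a.indices; j <- b.indices)
        c(i)(j) += a(i) * b(j)
      c
    }
    // Summing vecA(range) cross vecB over all row pairs yields rows `range` of A'B.
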
+
+ private[sparkbindings] def computeAtBZipped3[A: ClassTag](pairwiseRdd: RDD[(Matrix, Matrix)], nrow: Int,
+ ancol: Int, bncol: Int, numPartitions: Int) = {
+
+ val ranges = computeEvenSplits(nrow, numPartitions)
+
+ val rdd = pairwiseRdd.flatMap{ case (blockA, blockB) ⇒
+
+ // Handle Pat's microscopic corner cases: slicing doesn't work well on a 0-row matrix. This
+ // probably should be fixed in the in-core matrix implementations.
+ if (blockA.nrow == 0 )
+ Iterator.empty
+ else
+ // Output each partial outer product with its correspondent partition #.
+ Iterator.tabulate(numPartitions) {part ⇒
+
+ val mBlock = blockA(::, ranges(part)).t %*% blockB
+ part → mBlock
+ }
+ }
+
+ // Reduce.
+ .reduceByKey(_ += _, numPartitions = numPartitions)
+
+ // Produce keys
+ .map { case (blockKey, block) ⇒ ranges(blockKey).toArray → block }
+
+ debug(s"A'B mmul #parts: ${rdd.partitions.size}.")
+
+ rdd
}
+ private[sparkbindings] def computeAtBZipped2[A: ClassTag](zipped: RDD[(DrmTuple[A], DrmTuple[A])], nrow: Long,
+ ancol: Int, bncol: Int, numPartitions: Int) = {
+
+ // The plan of this approach is to send a_i and parts of b_i to partition reducers, which
+ // actually do the outer product sum update locally (instead of sending outer blocks). This
+ // should minimize the expense of IO, and the in-place partition block accumulator update should
+ // be much more efficient than forming outer block matrices and doing matrix-on-matrix +.
+ // Figure out the approximate block height per partition of the result.
+ val blockHeight = safeToNonNegInt((nrow - 1) / numPartitions) + 1
-// private[sparkbindings] def atb_nograph()
+ val partitionedRdd = zipped
+
+ // Split result rows into partitions using blockHeight: slice each a-row, keep the full b-row.
+ .mapPartitions { iter =>
+
+ val offsets = (0 until numPartitions).map(_ * blockHeight)
+ val ranges = offsets.map(offs => offs until (offs + blockHeight min ancol))
+
+ // Transform into a series of (part -> (arow slice, brow)) tuples (keyed by part #).
+ iter.flatMap { case ((_, arow), (_, brow)) =>
+
+ ranges.view.zipWithIndex.map { case (arange, partNum) =>
+ partNum -> (arow(arange).cloned -> brow)
+ }
+ }
+ }
+
+ val blockRdd = combineOuterProducts(partitionedRdd, numPartitions)
+
+ // Add ordinal row keys.
+ .map { case (blockNum, block) =>
+
+ // Starting key of this block
+ val offset = blockNum * blockHeight
+
+ val keys = Array.tabulate(block.nrow)(offset + _)
+ keys -> block
+
+ }
+
+ blockRdd
+ }
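The blockHeight expression above is integer ceiling division, so numPartitions blocks of that height always cover all rows; a quick sanity sketch, assuming nothing beyond plain Scala:

    // blockHeight = ceil(nrow / numPartitions), written with integer arithmetic.
    def blockHeight(nrow: Long, numPartitions: Int): Int =
      ((nrow - 1) / numPartitions).toInt + 1

    // 10 rows over 3 partitions: height 4, blocks cover rows 0..3, 4..7, 8..9
    // (the last block simply comes out shorter).
    assert(blockHeight(10, 3) == 4)
    assert(blockHeight(9, 3) == 3)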
/** Given an already zipped, joined rdd of co-indexed rows of A and B, compute their product A'B */
- private[sparkbindings] def computeAtBZipped[A: ClassTag](zipped:RDD[(DrmTuple[A], DrmTuple[A])],
- nrow:Long, ancol:Int, bncol:Int, blockHeight: Int) = {
+ @deprecated("slow, will remove", since = "0.10.2")
+ private[sparkbindings] def computeAtBZipped[A: ClassTag](zipped: RDD[(DrmTuple[A], DrmTuple[A])], nrow: Long,
+ ancol: Int, bncol: Int, numPartitions: Int) = {
// Since Q and A are partitioned the same way, we can just zip their rows and proceed from there by
// forming outer products. Our optimizer lacks this primitive, so we will implement it using RDDs
// directly. We try to compile B' = A'Q now by collecting outer products of rows of A and Q. At
// this point we need to split the n-range of B' into a suitable number of partitions.
- val btNumParts = safeToNonNegInt((nrow - 1) / blockHeight + 1)
+ if (log.isDebugEnabled) {
+ log.debug(s"AtBZipped:zipped #parts ${zipped.partitions.size}")
+ log.debug(s"AtBZipped:Targeted #parts ${numPartitions}")
+ }
+
+ // Figure out the approximate block height per partition of the result.
+ val blockHeight = safeToNonNegInt((nrow - 1) / numPartitions) + 1
val rddBt = zipped
- // Produce outer product blocks
- .flatMap {
- case ((aKey, aRow), (qKey, qRow)) =>
- for (blockKey <- Stream.range(0, btNumParts)) yield {
- val blockStart = blockKey * blockHeight
- val blockEnd = ancol min (blockStart + blockHeight)
+ // Produce outer product blocks
+ .flatMap { case ((aKey, aRow), (qKey, qRow)) =>
+ for (blockKey <- Stream.range(0, numPartitions)) yield {
+ val blockStart = blockKey * blockHeight
+ val blockEnd = ancol min (blockStart + blockHeight)
- // Create block by cross product of proper slice of aRow and qRow
- blockKey -> (aRow(blockStart until blockEnd) cross qRow)
- }
- }
- // Combine blocks by just summing them up
- .reduceByKey {
- case (block1, block2) => block1 += block2
+ // Create block by cross product of proper slice of aRow and qRow
+ blockKey -> (aRow(blockStart until blockEnd) cross qRow)
+
+ // TODO: computing tons of cross-product matrices seems pretty inefficient here. A single
+ // streaming algorithm of updates will more likely perform much better. So rewrite this using
+ // mapPartitions with numPartitions block accumulators.
+
+ }
}
+ // .combineByKey(
+ // createCombiner = (mx:Matrix) => mx,
+ // mergeValue = (c:Matrix,mx:Matrix) => c += mx,
+ // mergeCombiners = (c1:Matrix,c2:Matrix) => c1 += c2,
+ // numPartitions = numPartitions
+ // )
+ // Use of combineByKey doesn't look like it produces any better results than reduceByKey, so we
+ // keep reduceByKey for simplicity; reduceByKey combines map-side anyway.
+ // Combine blocks by just summing them up
+ .reduceByKey((block1, block2) => block1 += block2, numPartitions)
- // Throw away block key, generate row keys instead.
- .map {
- case (blockKey, block) =>
- val blockStart = blockKey * blockHeight
- val rowKeys = Array.tabulate(block.nrow)(blockStart + _)
- rowKeys -> block
+ // Throw away block key, generate row keys instead.
+ .map { case (blockKey, block) =>
+ val blockStart = blockKey * blockHeight
+ val rowKeys = Array.tabulate(block.nrow)(blockStart + _)
+ rowKeys -> block
}
- new DrmRddInput[Int](blockifiedSrc = Some(rddBt))
+ if (log.isDebugEnabled) log.debug(s"AtBZipped #parts ${rddBt.partitions.size}")
+
+ rddBt
}
}
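All of the A'B paths in this file rest on the identity A'B = sum over co-indexed rows i of (a_i cross b_i); a tiny in-core check of that identity, assuming plain arrays in place of Mahout matrices:

    object AtBIdentitySketch extends App {
      // A and B are both 2x2, stored row-wise.
      val a = Array(Array(1.0, 2.0), Array(3.0, 4.0))
      val b = Array(Array(5.0, 6.0), Array(7.0, 8.0))

      // Accumulate a_i cross b_i over all rows i.
      val atb = Array.fill(2, 2)(0.0)
      for (i <- 0 until 2; p <- 0 until 2; q <- 0 until 2)
        atb(p)(q) += a(i)(p) * b(i)(q)

      // atb now equals A'B = [[26, 30], [38, 44]].
      assert(atb(0).sameElements(Array(26.0, 30.0)))
      assert(atb(1).sameElements(Array(38.0, 44.0)))
    }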
http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala
index 94c3f06..629accd 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Ax.scala
@@ -15,22 +15,22 @@ object Ax {
def ax_with_broadcast[K: ClassTag](op: OpAx[K], srcA: DrmRddInput[K]): DrmRddInput[K] = {
- val rddA = srcA.toBlockifiedDrmRdd()
- implicit val sc:DistributedContext = rddA.sparkContext
+ val rddA = srcA.toBlockifiedDrmRdd(op.A.ncol)
+ implicit val sc: DistributedContext = rddA.sparkContext
val bcastX = drmBroadcast(op.x)
- val rdd = rddA
- // Just multiply the blocks
- .map({
- case (keys, blockA) => keys -> (blockA %*% bcastX).toColMatrix
- })
+ val rdd: BlockifiedDrmRdd[K] = rddA
+
+ // Just multiply the blocks
+ .map { case (keys, blockA) ⇒ keys → (blockA %*% bcastX).toColMatrix }
- new DrmRddInput(blockifiedSrc = Some(rdd))
+ new DrmRddInput(Right(rdd))
}
def atx_with_broadcast(op: OpAtx, srcA: DrmRddInput[Int]): DrmRddInput[Int] = {
- val rddA = srcA.toBlockifiedDrmRdd()
+
+ val rddA = srcA.toBlockifiedDrmRdd(op.A.ncol)
implicit val dc:DistributedContext = rddA.sparkContext
val bcastX = drmBroadcast(op.x)
@@ -52,10 +52,10 @@ object Ax {
// It is wasteful, but in this scheme we will have to re-parallelize the in-core result in order
// to plug it back in as a blockified drm rdd
- val rdd = dc.parallelize(Seq(inCoreM), numSlices = 1)
- .map(block => Array.tabulate(block.nrow)(i => i) -> block)
+ val rdd:BlockifiedDrmRdd[Int] = dc.parallelize(Seq(inCoreM), numSlices = 1)
+ .map{block ⇒ Array.tabulate(block.nrow)(i ⇒ i) -> block}
- new DrmRddInput(blockifiedSrc = Some(rdd))
+ rdd
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala
index ea10ccb..4a379ec 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/CbindAB.scala
@@ -18,12 +18,13 @@
package org.apache.mahout.sparkbindings.blas
import org.apache.log4j.Logger
-import scala.reflect.ClassTag
+import org.apache.mahout.sparkbindings.DrmRdd
+import reflect._
import org.apache.mahout.sparkbindings.drm.DrmRddInput
import org.apache.mahout.math._
import scalabindings._
import RLikeOps._
-import org.apache.mahout.math.drm.logical.OpCbind
+import org.apache.mahout.math.drm.logical.{OpCbindScalar, OpCbind}
import org.apache.spark.SparkContext._
/** Physical cbind */
@@ -31,6 +32,34 @@ object CbindAB {
private val log = Logger.getLogger(CbindAB.getClass)
+ def cbindAScalar[K: ClassTag](op: OpCbindScalar[K], srcA: DrmRddInput[K]): DrmRddInput[K] = {
+ val srcRdd = srcA.toDrmRdd()
+
+ val ncol = op.A.ncol
+ val x = op.x
+
+ val fixedRdd = if (classTag[K] == ClassTag.Int && x != 0.0)
+ fixIntConsistency(op.asInstanceOf[OpCbindScalar[Int]],
+ src = srcRdd.asInstanceOf[DrmRdd[Int]]).asInstanceOf[DrmRdd[K]]
+ else srcRdd
+
+ val left = op.leftBind
+
+ val resultRdd = fixedRdd.map { case (key, vec) =>
+ val newVec = vec.like(ncol + 1)
+ if (left) {
+ newVec(1 to ncol) := vec
+ newVec(0) = x
+ } else {
+ newVec(0 until ncol) := vec
+ newVec(ncol) = x
+ }
+ key -> newVec
+ }
+
+ resultRdd
+ }
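The left/right branches above amount to prepending or appending the scalar to every row; a minimal sketch of those semantics, assuming plain Array[Double] rows rather than the Mahout vector API:

    // leftBind prepends the scalar column, otherwise it is appended.
    def cbindScalar(row: Array[Double], x: Double, leftBind: Boolean): Array[Double] =
      if (leftBind) x +: row else row :+ x

    assert(cbindScalar(Array(1.0, 2.0), 5.0, leftBind = true).sameElements(Array(5.0, 1.0, 2.0)))
    assert(cbindScalar(Array(1.0, 2.0), 5.0, leftBind = false).sameElements(Array(1.0, 2.0, 5.0)))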
+
def cbindAB_nograph[K: ClassTag](op: OpCbind[K], srcA: DrmRddInput[K], srcB: DrmRddInput[K]): DrmRddInput[K] = {
val a = srcA.toDrmRdd()
@@ -88,7 +117,7 @@ object CbindAB {
}
}
- new DrmRddInput(rowWiseSrc = Some(op.ncol -> rdd))
+ rdd
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/DrmRddOps.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/DrmRddOps.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/DrmRddOps.scala
index a3caac7..4cd9a74 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/DrmRddOps.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/DrmRddOps.scala
@@ -25,12 +25,14 @@ import org.apache.mahout.sparkbindings.DrmRdd
class DrmRddOps[K: ClassTag](private[blas] val rdd: DrmRdd[K]) {
+ /** Turn RDD rows into dense vectors where the density threshold is exceeded. */
def densify(threshold: Double = 0.80): DrmRdd[K] = rdd.map({
case (key, v) =>
val vd = if (!v.isDense && v.getNumNonZeroElements > threshold * v.length) new DenseVector(v) else v
key -> vd
})
+ /** Turn dense RDD rows into sparse vectors where the density falls below the threshold. */
def sparsify(threshold: Double = 0.80): DrmRdd[K] = rdd.map({
case (key, v) =>
val vs = if (v.isDense() && v.getNumNonZeroElements <= threshold * v.length)
http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala
index 4c68c9a..2933ddc 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/MapBlock.scala
@@ -17,6 +17,7 @@
package org.apache.mahout.sparkbindings.blas
+import org.apache.mahout.math.drm.logical.OpMapBlock
import org.apache.mahout.sparkbindings.drm.DrmRddInput
import org.apache.mahout.math.drm.BlockMapFunc
import org.apache.mahout.math.scalabindings.RLikeOps._
@@ -24,12 +25,13 @@ import scala.reflect.ClassTag
object MapBlock {
- def exec[S, R:ClassTag](src: DrmRddInput[S], ncol:Int, bmf:BlockMapFunc[S,R]): DrmRddInput[R] = {
+ def exec[S, R:ClassTag](src: DrmRddInput[S], operator:OpMapBlock[S,R]): DrmRddInput[R] = {
- // We can't use attributes to avoid putting the whole this into closure.
-
- val rdd = src.toBlockifiedDrmRdd()
- .map(blockTuple => {
+ // We can't reference the operator's attributes directly inside the closure: that would pull
+ // the whole operator object into the closure.
+ val bmf = operator.bmf
+ val ncol = operator.ncol
+ val rdd = src.toBlockifiedDrmRdd(operator.A.ncol).map(blockTuple => {
val out = bmf(blockTuple)
assert(out._2.nrow == blockTuple._2.nrow, "block mapping must return same number of rows.")
@@ -37,7 +39,8 @@ object MapBlock {
out
})
- new DrmRddInput(blockifiedSrc = Some(rdd))
+
+ rdd
}
}
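The assertion in exec pins down the block-map contract: a block map function may change the column count of a block but never its row count. A hypothetical stand-in that satisfies the contract, using plain arrays instead of Mahout matrices:

    object MapBlockSketch extends App {
      // A block is (row keys, row-major matrix).
      type Block = (Array[Int], Array[Array[Double]])

      // Hypothetical bmf: squares every element; ncol may change, nrow must not.
      def squareElements(in: Block): Block = {
        val (keys, m) = in
        keys -> m.map(_.map(v => v * v))
      }

      val in: Block = Array(0, 1) -> Array(Array(1.0, 2.0), Array(3.0, 4.0))
      val out = squareElements(in)
      assert(out._2.length == in._2.length) // the invariant MapBlock.exec asserts on nrow
    }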
http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala
index e73376d..0434a72 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Par.scala
@@ -1,50 +1,58 @@
package org.apache.mahout.sparkbindings.blas
+import org.apache.mahout.sparkbindings.drm
+
import scala.reflect.ClassTag
import org.apache.mahout.sparkbindings.drm.DrmRddInput
import org.apache.mahout.math.drm.logical.OpPar
import org.apache.spark.rdd.RDD
+import scala.math._
+
+import org.apache.mahout.logging._
/** Physical adjustment of parallelism */
object Par {
+ private final implicit val log = getLog(Par.getClass)
+
def exec[K: ClassTag](op: OpPar[K], src: DrmRddInput[K]): DrmRddInput[K] = {
- def adjust[T](rdd: RDD[T]): RDD[T] =
- if (op.minSplits > 0) {
- if (rdd.partitions.size < op.minSplits)
- rdd.coalesce(op.minSplits, shuffle = true)
- else rdd.coalesce(rdd.partitions.size)
- } else if (op.exactSplits > 0) {
- if (op.exactSplits < rdd.partitions.size)
- rdd.coalesce(numPartitions = op.exactSplits, shuffle = false)
- else if (op.exactSplits > rdd.partitions.size)
- rdd.coalesce(numPartitions = op.exactSplits, shuffle = true)
- else
- rdd.coalesce(rdd.partitions.size)
- } else if (op.exactSplits == -1 && op.minSplits == -1) {
-
- // auto adjustment, try to scale up to either x1Size or x2Size.
- val clusterSize = rdd.context.getConf.get("spark.default.parallelism", "1").toInt
-
- val x1Size = (clusterSize * .95).ceil.toInt
- val x2Size = (clusterSize * 1.9).ceil.toInt
-
- if (rdd.partitions.size <= x1Size)
- rdd.coalesce(numPartitions = x1Size, shuffle = true)
- else if (rdd.partitions.size <= x2Size)
- rdd.coalesce(numPartitions = x2Size, shuffle = true)
- else
- rdd.coalesce(numPartitions = rdd.partitions.size)
- } else rdd.coalesce(rdd.partitions.size)
-
- if (src.isBlockified) {
- val rdd = src.toBlockifiedDrmRdd()
- new DrmRddInput[K](blockifiedSrc = Some(adjust(rdd)))
+ val srcBlockified = src.isBlockified
+
+ val srcRdd = if (srcBlockified) src.toBlockifiedDrmRdd(op.ncol) else src.toDrmRdd()
+ val srcNParts = srcRdd.partitions.size
+
+ // To what size?
+ val targetParts = if (op.minSplits > 0) srcNParts max op.minSplits
+ else if (op.exactSplits > 0) op.exactSplits
+ else /* auto adjustment */ {
+ val stdParallelism = srcRdd.context.getConf.get("spark.default.parallelism", "1").toInt
+ val x1 = 0.95 * stdParallelism
+ if (srcNParts <= ceil(x1)) ceil(x1).toInt else ceil(2 * x1).toInt
+ }
+
+ debug(s"par $srcNParts => $targetParts.")
+
+ if (targetParts > srcNParts) {
+
+ // Expanding. Always requires the row-wise (deblockified) form; repartition() implies a shuffle.
+ val rdd = src.toDrmRdd().repartition(numPartitions = targetParts)
+
+ rdd
+
+ } else if (targetParts < srcNParts) {
+ // Shrinking.
+
+ if (srcBlockified) {
+ drm.rbind(src.toBlockifiedDrmRdd(op.ncol).coalesce(numPartitions = targetParts))
+ } else {
+ src.toDrmRdd().coalesce(numPartitions = targetParts)
+ }
} else {
- val rdd = src.toDrmRdd()
- new DrmRddInput[K](rowWiseSrc = Some(op.ncol -> adjust(rdd)))
+ // no adjustment required.
+ src
}
+
}
}
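The sizing rule in exec can be read in isolation; a sketch of the same decision logic (plain Scala, hypothetical targetParts helper, not the exact Mahout code path):

    import scala.math.ceil

    // minSplits > 0 never shrinks below the current count; exactSplits > 0 wins verbatim;
    // otherwise auto-scale toward ~0.95x or ~1.9x of spark.default.parallelism.
    def targetParts(srcNParts: Int, minSplits: Int, exactSplits: Int, stdParallelism: Int): Int =
      if (minSplits > 0) srcNParts max minSplits
      else if (exactSplits > 0) exactSplits
      else {
        val x1 = 0.95 * stdParallelism
        if (srcNParts <= ceil(x1)) ceil(x1).toInt else ceil(2 * x1).toInt
      }

    assert(targetParts(srcNParts = 2, minSplits = -1, exactSplits = -1, stdParallelism = 8) == 8)
    assert(targetParts(srcNParts = 10, minSplits = -1, exactSplits = -1, stdParallelism = 8) == 16)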
http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/RbindAB.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/RbindAB.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/RbindAB.scala
index 5037d68..62abba6 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/RbindAB.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/RbindAB.scala
@@ -31,11 +31,11 @@ object RbindAB {
// If any of the inputs is blockified, use blockified inputs
if (srcA.isBlockified || srcB.isBlockified) {
- val a = srcA.toBlockifiedDrmRdd()
- val b = srcB.toBlockifiedDrmRdd()
+ val a = srcA.toBlockifiedDrmRdd(op.A.ncol)
+ val b = srcB.toBlockifiedDrmRdd(op.B.ncol)
// Union is fine here: it indeed just does a partition-level union, no shuffles
- new DrmRddInput(blockifiedSrc = Some(a ++ b))
+ a ++ b
} else {
@@ -43,7 +43,7 @@ object RbindAB {
val a = srcA.toDrmRdd()
val b = srcB.toDrmRdd()
- new DrmRddInput(rowWiseSrc = Some(op.ncol -> (a ++ b)))
+ a ++ b
}
}
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Slicing.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Slicing.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Slicing.scala
index d0a50b5..0284ba2 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Slicing.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/Slicing.scala
@@ -22,6 +22,6 @@ object Slicing {
// TODO: we probably need to re-shuffle the result, or at least cut out the 0-size partitions
- new DrmRddInput(rowWiseSrc = Some(ncol -> rdd))
+ rdd
}
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/8a6b805a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala
index 9a50afa..6b8513f 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/blas/package.scala
@@ -17,13 +17,17 @@
package org.apache.mahout.sparkbindings
+import org.apache.mahout.sparkbindings
+import org.apache.spark.rdd.RDD
+
import scala.reflect.ClassTag
-import org.apache.mahout.sparkbindings.drm.{CheckpointedDrmSpark, DrmRddInput}
import org.apache.spark.SparkContext._
import org.apache.mahout.math._
import org.apache.mahout.math.drm._
import scalabindings._
import RLikeOps._
+import collection._
+import JavaConversions._
/**
* This package contains distributed algorithms that the distributed matrix expression optimizer picks
@@ -31,8 +35,81 @@ import RLikeOps._
*/
package object blas {
- implicit def drmRdd2ops[K:ClassTag](rdd:DrmRdd[K]):DrmRddOps[K] = new DrmRddOps[K](rdd)
+ implicit def drmRdd2ops[K: ClassTag](rdd: DrmRdd[K]): DrmRddOps[K] = new DrmRddOps[K](rdd)
+
+
+ /**
+ * Rekey matrix dataset keys to consecutive int keys.
+ * @param rdd incoming matrix row-wise dataset
+ * @param computeMap if true, also compute the mapping between old and new keys
+ * @tparam K existing key parameter
+ * @return (rekeyed dataset, optional old key -> new key mapping)
+ */
+ private[mahout] def rekeySeqInts[K: ClassTag](rdd: DrmRdd[K], computeMap: Boolean = true): (DrmRdd[Int],
+ Option[RDD[(K, Int)]]) = {
+
+ // Spark context please.
+ val sctx = rdd.context
+ import sctx._
+
+ // First, compute partition sizes.
+ val partSizes = rdd.mapPartitionsWithIndex((part, iter) => Iterator(part -> iter.size))
+
+ // Collect in-core
+ .collect()
+
+ // Per-partition row counts, indexed by partition number
+ val startInd = new Array[Int](rdd.partitions.size)
+
+ // Save counts
+ for (pc <- partSizes) startInd(pc._1) = pc._2
+
+ // Exclusive prefix sum of the counts gives the starting index of each partition; broadcast it.
+ val siBcast = broadcast(startInd.scanLeft(0)(_ + _).init)
+
+ // Compute key -> int index map:
+ val keyMap = if (computeMap) {
+ Some(rdd
+
+ // Process individual partition with index, output `key -> index` tuple
+ .mapPartitionsWithIndex((part, iter) => {
+
+ // Start index for this partition
+ val si = siBcast.value(part)
+ iter.zipWithIndex.map { case ((key, _), index) => key -> (index + si)}
+ })) // Some
+
+ } else {
+
+ // We weren't asked to compute the key mapping
+ None
+ }
+
+ // Finally, do the transform
+ val intRdd = rdd
+
+ // Re-number each partition
+ .mapPartitionsWithIndex((part, iter) => {
+ // Start index for this partition
+ val si = siBcast.value(part)
+
+ // Iterate over data by producing sequential row index and retaining vector value.
+ iter.zipWithIndex.map { case ((_, vec), ind) => si + ind -> vec}
+ })
+
+ // Finally, return drm -> keymap result
+
+ intRdd -> keyMap
+
+ }
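The start-index computation above is an exclusive prefix sum over the collected partition sizes; for example, partition sizes (3, 2, 4) yield start indices 0, 3 and 5:

    val partSizes = Array(3, 2, 4)

    // scanLeft(0)(_ + _) = Array(0, 3, 5, 9); .init drops the grand total,
    // leaving the starting row index of each partition.
    val startInd = partSizes.scanLeft(0)(_ + _).init

    assert(startInd.sameElements(Array(0, 3, 5)))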
+
+
+ /**
+ * Fills in missing rows in an Int-indexed matrix by putting in empty row vectors for the missing
+ * keys.
+ */
private[mahout] def fixIntConsistency(op: DrmLike[Int], src: DrmRdd[Int]): DrmRdd[Int] = {
if (op.canHaveMissingRows) {
@@ -45,20 +122,20 @@ package object blas {
// Compute the fix.
sc
- // Bootstrap full key set
- .parallelize(0 until dueRows, numSlices = rdd.partitions.size max 1)
+ // Bootstrap full key set
+ .parallelize(0 until dueRows, numSlices = rdd.partitions.size max 1)
- // Enable PairedFunctions
- .map(_ -> Unit)
+ // Enable PairedFunctions
+ .map(_ -> Unit)
- // Cogroup with all rows
- .cogroup(other = rdd)
+ // Cogroup with all rows
+ .cogroup(other = rdd)
- // Filter out out-of-bounds
- .filter { case (key, _) => key >= 0 && key < dueRows}
+ // Filter out out-of-bounds
+ .filter { case (key, _) => key >= 0 && key < dueRows}
- // Coalesce and output RHS
- .map { case (key, (seqUnit, seqVec)) =>
+ // Coalesce and output RHS
+ .map { case (key, (seqUnit, seqVec)) =>
val acc = seqVec.headOption.getOrElse(new SequentialAccessSparseVector(dueCols))
val vec = if (seqVec.size > 0) (acc /: seqVec.tail)(_ + _) else acc
key -> vec
@@ -68,4 +145,77 @@ package object blas {
}
+ /** Do `mxC += a cross b` in-place, a bit more efficiently than the naive expression would. */
+ def addOuterProduct(mxC: Matrix, a: Vector, b: Vector): Matrix = {
+
+ // Pay some attention to density here when computing and adding the outer product of the a-row
+ // and the b-row fragment.
+ if (b.isDense)
+ for (ela <- a.nonZeroes) mxC(ela.index, ::) := { (i, x) => x + ela * b(i)}
+ else
+ for (ela <- a.nonZeroes; elb <- b.nonZeroes()) mxC(ela.index, elb.index) += ela * elb
+
+ mxC
+ }
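The density-aware update costs roughly O(nnz(a) * len(b)) when b is dense and O(nnz(a) * nnz(b)) when both vectors are sparse; a plain-array rendition of the sparse-sparse branch (hypothetical helper, not Mahout types):

    // Sparse vectors as (index, value) pairs; c is a dense accumulator.
    def addOuterSparse(c: Array[Array[Double]],
                       a: Seq[(Int, Double)],
                       b: Seq[(Int, Double)]): Unit =
      for ((i, av) <- a; (j, bv) <- b) c(i)(j) += av * bv

    val c = Array.fill(2, 2)(0.0)
    addOuterSparse(c, a = Seq(0 -> 2.0), b = Seq(1 -> 3.0))
    assert(c(0)(1) == 6.0 && c(1)(0) == 0.0)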
+
+ /**
+ * Compute ranges of more or less even splits of a total of `nrow` rows.
+ *
+ * @param nrow total number of rows to split
+ * @param numSplits requested number of splits
+ * @return `numSplits` ranges that together cover `0 until nrow`
+ */
+ @inline
+ private[blas] def computeEvenSplits(nrow: Long, numSplits: Int): IndexedSeq[Range] = {
+ require(numSplits <= nrow, "Requested number of splits is greater than the number of data points.")
+ require(nrow >= 1)
+ require(numSplits >= 1)
+
+ // Base split -- what is our base split size?
+ val baseSplit = safeToNonNegInt(nrow / numSplits)
+
+ // Slack -- how many splits will have to be incremented by 1?
+ val slack = safeToNonNegInt(nrow % numSplits)
+
+ // Compute ranges. We need to set ranges so that numSplits - slack splits have size of baseSplit;
+ // and `slack` splits have size baseSplit + 1. Here is how we do it: First, we compute the range
+ // offsets:
+ val offsets = (0 to numSplits).map(i => i * (baseSplit + 1) - (0 max i - slack))
+ // And then we connect the ranges using gaps between offsets:
+ offsets.sliding(2).map(offs => offs(0) until offs(1)).toIndexedSeq
+ }
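As a worked example of the offset trick: splitting nrow = 10 into numSplits = 3 gives baseSplit = 3 and slack = 1, i.e. one split of size 4 followed by two of size 3:

    val (nrow, numSplits) = (10, 3)
    val baseSplit = nrow / numSplits // 3
    val slack = nrow % numSplits     // 1

    // The first `slack` splits get size baseSplit + 1, the rest get baseSplit.
    val offsets = (0 to numSplits).map(i => i * (baseSplit + 1) - (0 max i - slack))
    assert(offsets == IndexedSeq(0, 4, 7, 10))

    val ranges = offsets.sliding(2).map(o => o(0) until o(1)).toIndexedSeq
    assert(ranges.map(_.size) == IndexedSeq(4, 3, 3))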
+
+ /**
+ * Estimate number of partitions for the product of A %*% B.
+ *
+ * We take the average per-partition element count of the product to be the higher of those of A
+ * and B (i.e., we prefer larger operand partitions).
+ *
+ * @param anrow A.nrow
+ * @param ancol A.ncol
+ * @param bncol B.ncol
+ * @param aparts partitions in A
+ * @param bparts partitions in B
+ * @return recommended partitions
+ */
+ private[blas] def estimateProductPartitions(anrow: Long, ancol: Long, bncol: Long, aparts: Int, bparts: Int): Int = {
+
+ // Compute per-partition element count in A
+ val eppA = anrow.toDouble * ancol / aparts
+
+ // Compute per-partition element count in B
+ val eppB = ancol.toDouble * bncol / bparts
+
+ // Take the maximum per-partition element count into account. Is this heuristic good enough?
+ val epp = eppA max eppB
+
+ // Estimated number of product partitions
+ val prodParts = anrow * bncol / epp
+
+ val nparts = math.round(prodParts).toInt max 1
+
+ // Constrain nparts to maximum of anrow to prevent guaranteed empty partitions.
+ if (nparts > anrow) anrow.toInt else nparts
+ }
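A quick numeric check of the estimate, restated as a plain Scala sketch (estimateParts is a hypothetical mirror of the method above):

    def estimateParts(anrow: Long, ancol: Long, bncol: Long, aparts: Int, bparts: Int): Int = {
      val eppA = anrow.toDouble * ancol / aparts   // elements per partition of A
      val eppB = ancol.toDouble * bncol / bparts   // elements per partition of B
      val epp = eppA max eppB                      // prefer the larger operand partitions
      val nparts = math.round(anrow * bncol / epp).toInt max 1
      if (nparts > anrow) anrow.toInt else nparts  // never exceed product row count
    }

    // A: 1,000,000 x 100 over 100 partitions (1e6 elements each);
    // B: 100 x 50 over 50 partitions (100 elements each);
    // product: 1,000,000 x 50 = 5e7 elements / 1e6 per partition = 50 partitions.
    assert(estimateParts(1000000L, 100L, 50L, aparts = 100, bparts = 50) == 50)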
+
}