You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mahout.apache.org by ap...@apache.org on 2014/10/09 22:18:25 UTC
git commit: MAHOUT-1615: SparkEngine drmFromHDFS returning the same
Key for all Key,
Vec Pairs for Text-Keyed SequenceFiles. This closes apache/mahout#52
Repository: mahout
Updated Branches:
refs/heads/master 3c88f5043 -> c34e8a84c
MAHOUT-1615: SparkEngine drmFromHDFS returning the same Key for all Key,Vec Pairs for Text-Keyed SequenceFiles. This closes apache/mahout#52
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/c34e8a84
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/c34e8a84
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/c34e8a84
Branch: refs/heads/master
Commit: c34e8a84c39b7bd2175cdbcb8583d487548a34b4
Parents: 3c88f50
Author: Andrew Palumbo <ap...@outlook.com>
Authored: Thu Oct 9 16:16:14 2014 -0400
Committer: Andrew Palumbo <ap...@outlook.com>
Committed: Thu Oct 9 16:16:14 2014 -0400
----------------------------------------------------------------------
CHANGELOG | 2 +
.../apache/mahout/h2obindings/H2OEngine.scala | 2 +-
.../h2obindings/drm/CheckpointedDrmH2O.scala | 4 +-
.../mahout/math/drm/CheckpointedDrm.scala | 10 ++-
.../mahout/math/drm/DistributedEngine.scala | 2 +-
.../org/apache/mahout/math/drm/DrmLike.scala | 2 +
.../org/apache/mahout/math/drm/package.scala | 2 +-
.../mahout/math/drm/DrmLikeSuiteBase.scala | 8 ++-
pom.xml | 2 +-
.../sparkbindings/shell/MahoutSparkILoop.scala | 3 +
.../org/apache/mahout/common/DrmMetadata.scala | 56 +++++++++++++++++
.../org/apache/mahout/common/HDFSUtil.scala | 26 ++++++++
.../apache/mahout/common/Hadoop1HDFSUtil.scala | 65 ++++++++++++++++++++
.../mahout/sparkbindings/SparkEngine.scala | 57 ++++++-----------
.../drm/CheckpointedDrmSpark.scala | 5 +-
15 files changed, 198 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mahout/blob/c34e8a84/CHANGELOG
----------------------------------------------------------------------
diff --git a/CHANGELOG b/CHANGELOG
index e650428..bc1edcf 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -2,6 +2,8 @@ Mahout Change Log
Release 1.0 - unreleased
+ MAHOUT-1615: SparkEngine drmFromHDFS returning the same Key for all Key,Vec Pairs for Text-Keyed SequenceFiles (dlyubimov & apalumbo)
+
MAHOUT-1610: Update tests to pass in Java 8 (srowen)
MAHOUT-1608: Add option in WikipediaToSequenceFile to remove category labels from documents (apalumbo)
http://git-wip-us.apache.org/repos/asf/mahout/blob/c34e8a84/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
----------------------------------------------------------------------
diff --git a/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala b/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
index 54d950b..06125fe 100644
--- a/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
+++ b/h2o/src/main/scala/org/apache/mahout/h2obindings/H2OEngine.scala
@@ -54,7 +54,7 @@ object H2OEngine extends DistributedEngine {
*
* @return DRM[Any] where Any is automatically translated to value type
*/
- def drmFromHDFS(path: String, parMin: Int = 0)(implicit dc: DistributedContext): CheckpointedDrm[_] =
+ def drmDfsRead(path: String, parMin: Int = 0)(implicit dc: DistributedContext): CheckpointedDrm[_] =
new CheckpointedDrmH2O(H2OHdfs.drmFromFile(path, parMin), dc)
/** This creates an empty DRM with specified number of partitions and cardinality. */
http://git-wip-us.apache.org/repos/asf/mahout/blob/c34e8a84/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala
----------------------------------------------------------------------
diff --git a/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala b/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala
index 15af5e2..c06455a 100644
--- a/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala
+++ b/h2o/src/main/scala/org/apache/mahout/h2obindings/drm/CheckpointedDrmH2O.scala
@@ -29,6 +29,8 @@ class CheckpointedDrmH2O[K: ClassTag](
*/
def collect: Matrix = H2OHelper.matrixFromDrm(h2odrm)
+ /** Explicit extraction of key class Tag */
+ def keyClassTag: ClassTag[K] = implicitly[ClassTag[K]]
/* XXX: call frame.remove */
def uncache(): this.type = this
@@ -36,7 +38,7 @@ class CheckpointedDrmH2O[K: ClassTag](
/**
* Persist DRM to on-disk over HDFS in Mahout DRM format.
*/
- def writeDRM(path: String): Unit = H2OHdfs.drmToFile(path, h2odrm)
+ def dfsWrite(path: String): Unit = H2OHdfs.drmToFile(path, h2odrm)
/**
* Action operator - Eagerly evaluate the lazily built operator graph to create
http://git-wip-us.apache.org/repos/asf/mahout/blob/c34e8a84/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
index 28fb7fd..082e5b9 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/CheckpointedDrm.scala
@@ -18,6 +18,7 @@
package org.apache.mahout.math.drm
import org.apache.mahout.math.Matrix
+import scala.reflect.ClassTag
/**
* Checkpointed DRM API. This is a matrix that has optimized RDD lineage behind it and can be
@@ -28,9 +29,16 @@ trait CheckpointedDrm[K] extends DrmLike[K] {
def collect: Matrix
- def writeDRM(path: String)
+ def dfsWrite(path: String)
/** If this checkpoint is already declared cached, uncache. */
def uncache(): this.type
+ /**
+ * Explicit extraction of key class Tag since traits don't support context bound access; but actual
+ * implementation knows it
+ */
+ def keyClassTag: ClassTag[K]
+
+
}
http://git-wip-us.apache.org/repos/asf/mahout/blob/c34e8a84/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
index d89cc53..eaf5aeb 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DistributedEngine.scala
@@ -68,7 +68,7 @@ trait DistributedEngine {
* @param path The DFS path to load from
* @param parMin Minimum parallelism after load (equivalent to #par(min=...)).
*/
- def drmFromHDFS(path: String, parMin: Int = 0)(implicit sc: DistributedContext): CheckpointedDrm[_]
+ def drmDfsRead(path: String, parMin: Int = 0)(implicit sc: DistributedContext): CheckpointedDrm[_]
/** Parallelize in-core matrix as spark distributed matrix, using row ordinal indices as data set keys. */
def drmParallelizeWithRowIndices(m: Matrix, numPartitions: Int = 1)
http://git-wip-us.apache.org/repos/asf/mahout/blob/c34e8a84/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala
index 97fe989..b9c50b0 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/DrmLike.scala
@@ -17,6 +17,8 @@
package org.apache.mahout.math.drm
+import scala.reflect.ClassTag
+
/**
*
http://git-wip-us.apache.org/repos/asf/mahout/blob/c34e8a84/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
index 02e8b7a..b787ec0 100644
--- a/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
+++ b/math-scala/src/main/scala/org/apache/mahout/math/drm/package.scala
@@ -49,7 +49,7 @@ package object drm {
def drmBroadcast(v:Vector)(implicit ctx:DistributedContext):BCast[Vector] = ctx.drmBroadcast(v)
/** Load DRM from hdfs (as in Mahout DRM format) */
- def drmFromHDFS (path: String)(implicit ctx: DistributedContext): CheckpointedDrm[_] = ctx.drmFromHDFS(path)
+ def drmDfsRead (path: String)(implicit ctx: DistributedContext): CheckpointedDrm[_] = ctx.drmDfsRead(path)
/** Shortcut to parallelizing matrices with indices, ignore row labels. */
def drmParallelize(m: Matrix, numPartitions: Int = 1)
http://git-wip-us.apache.org/repos/asf/mahout/blob/c34e8a84/math-scala/src/test/scala/org/apache/mahout/math/drm/DrmLikeSuiteBase.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/test/scala/org/apache/mahout/math/drm/DrmLikeSuiteBase.scala b/math-scala/src/test/scala/org/apache/mahout/math/drm/DrmLikeSuiteBase.scala
index 7a13124..6c9313c 100644
--- a/math-scala/src/test/scala/org/apache/mahout/math/drm/DrmLikeSuiteBase.scala
+++ b/math-scala/src/test/scala/org/apache/mahout/math/drm/DrmLikeSuiteBase.scala
@@ -23,6 +23,7 @@ import org.apache.mahout.math._
import scalabindings._
import RLikeOps._
import RLikeDrmOps._
+import scala.reflect.ClassTag
/** Common DRM tests to be run by all distributed engines. */
trait DrmLikeSuiteBase extends DistributedMahoutSuite with Matchers {
@@ -35,12 +36,15 @@ trait DrmLikeSuiteBase extends DistributedMahoutSuite with Matchers {
val inCoreA = dense((1, 2, 3), (3, 4, 5))
val drmA = drmParallelize(inCoreA)
- drmA.writeDRM(path = uploadPath)
+ drmA.dfsWrite(path = uploadPath)
println(inCoreA)
// Load back from hdfs
- val drmB = drmFromHDFS(path = uploadPath)
+ val drmB = drmDfsRead(path = uploadPath)
+
+ // Make sure keys are correctly identified as ints
+ drmB.checkpoint(CacheHint.NONE).keyClassTag shouldBe ClassTag.Int
// Collect back into in-core
val inCoreB = drmB.collect
http://git-wip-us.apache.org/repos/asf/mahout/blob/c34e8a84/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 89ed1a7..3696eb5 100644
--- a/pom.xml
+++ b/pom.xml
@@ -701,7 +701,7 @@
<module>math-scala</module>
<module>spark</module>
<module>spark-shell</module>
- <module>h2o</module>
+ <!--module>h2o</module -->
</modules>
<profiles>
<profile>
http://git-wip-us.apache.org/repos/asf/mahout/blob/c34e8a84/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala
----------------------------------------------------------------------
diff --git a/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala b/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala
index 0df42a3..107fb1e 100644
--- a/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala
+++ b/spark-shell/src/main/scala/org/apache/mahout/sparkbindings/shell/MahoutSparkILoop.scala
@@ -45,6 +45,9 @@ class MahoutSparkILoop extends SparkILoop {
conf.set("spark.executor.uri", execUri)
}
+ // TODO:XXX remove this before pushing to apache/master
+ conf.set("spark.kryoserializer.buffer.mb", "100")
+
sparkContext = mahoutSparkContext(
masterUrl = master,
appName = "Mahout Spark Shell",
http://git-wip-us.apache.org/repos/asf/mahout/blob/c34e8a84/spark/src/main/scala/org/apache/mahout/common/DrmMetadata.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/common/DrmMetadata.scala b/spark/src/main/scala/org/apache/mahout/common/DrmMetadata.scala
new file mode 100644
index 0000000..5bbccb1
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/common/DrmMetadata.scala
@@ -0,0 +1,56 @@
+package org.apache.mahout.common
+
+import scala.reflect.ClassTag
+import org.apache.hadoop.io._
+import java.util.Arrays
+
+class DrmMetadata(
+
+ /** Writable key type as a sub-type of Writable */
+ val keyTypeWritable: Class[_],
+
+ /** Value writable type, as a sub-type of Writable */
+ val valueTypeWritable: Class[_]
+
+ ) {
+
+ import DrmMetadata._
+
+ val (
+
+ /** Actual drm key class tag once converted out of writable */
+ keyClassTag: ClassTag[_],
+
+ /** Conversion from Writable to value type of the DRM key */
+ keyW2ValFunc: ((Writable) => Any)
+
+ ) = keyTypeWritable match {
+ case cz if (cz == classOf[IntWritable]) => ClassTag.Int -> w2int _
+ case cz if (cz == classOf[LongWritable]) => ClassTag.Long -> w2long _
+ case cz if (cz == classOf[DoubleWritable]) => ClassTag.Double -> w2double _
+ case cz if (cz == classOf[FloatWritable]) => ClassTag.Float -> w2float _
+ case cz if (cz == classOf[Text]) => ClassTag(classOf[String]) -> w2string _
+ case cz if (cz == classOf[BooleanWritable]) => ClassTag(classOf[Boolean]) -> w2bool _
+ case cz if (cz == classOf[BytesWritable]) => ClassTag(classOf[Array[Byte]]) -> w2bytes _
+ case _ => throw new IllegalArgumentException(s"Unsupported DRM key type:${keyTypeWritable.getName}")
+ }
+
+}
+
+object DrmMetadata {
+
+ private[common] def w2int(w: Writable) = w.asInstanceOf[IntWritable].get()
+
+ private[common] def w2long(w: Writable) = w.asInstanceOf[LongWritable].get()
+
+ private[common] def w2double(w: Writable) = w.asInstanceOf[DoubleWritable].get()
+
+ private[common] def w2float(w: Writable) = w.asInstanceOf[FloatWritable].get()
+
+ private[common] def w2string(w: Writable) = w.asInstanceOf[Text].toString()
+
+ private[common] def w2bool(w: Writable) = w.asInstanceOf[BooleanWritable].get()
+
+ private[common] def w2bytes(w: Writable) = Arrays.copyOf(w.asInstanceOf[BytesWritable].getBytes(),
+ w.asInstanceOf[BytesWritable].getLength())
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/c34e8a84/spark/src/main/scala/org/apache/mahout/common/HDFSUtil.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/common/HDFSUtil.scala b/spark/src/main/scala/org/apache/mahout/common/HDFSUtil.scala
new file mode 100644
index 0000000..f5f87d7
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/common/HDFSUtil.scala
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common
+
+/** High level Hadoop version-specific hdfs manipulations we need in context of our operations. */
+trait HDFSUtil {
+
+ /** Read DRM header information off (H)DFS. */
+ def readDrmHeader(path:String):DrmMetadata
+}
+
http://git-wip-us.apache.org/repos/asf/mahout/blob/c34e8a84/spark/src/main/scala/org/apache/mahout/common/Hadoop1HDFSUtil.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/common/Hadoop1HDFSUtil.scala b/spark/src/main/scala/org/apache/mahout/common/Hadoop1HDFSUtil.scala
new file mode 100644
index 0000000..87977ff
--- /dev/null
+++ b/spark/src/main/scala/org/apache/mahout/common/Hadoop1HDFSUtil.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common
+
+import org.apache.hadoop.io.{Writable, SequenceFile}
+import org.apache.hadoop.fs.{FileSystem, Path}
+import org.apache.hadoop.conf.Configuration
+import collection._
+import JavaConversions._
+
+/**
+ * Deprecated Hadoop 1 api which we currently explicitly import via Mahout dependencies. May not work
+ * with Hadoop 2.0
+ */
+object Hadoop1HDFSUtil extends HDFSUtil {
+
+
+ def readDrmHeader(path: String): DrmMetadata = {
+ val dfsPath = new Path(path)
+ val fs = dfsPath.getFileSystem(new Configuration())
+
+ val partFilePath:Path = fs.listStatus(dfsPath)
+
+ // Filter out anything starting with .
+ .filter { s => (!s.getPath.getName.startsWith("\\.") && !s.getPath.getName.startsWith("_") && !s.isDir)}
+
+ // Take path
+ .map(_.getPath)
+
+ // Take only one, if any
+ .headOption
+
+ // Require there's at least one partition file found.
+ .getOrElse {
+ throw new IllegalArgumentException(s"No partition files found in ${dfsPath.toString}.")
+ }
+
+ val reader = new SequenceFile.Reader(fs, partFilePath, fs.getConf)
+ try {
+ new DrmMetadata(
+ keyTypeWritable = reader.getKeyClass.asSubclass(classOf[Writable]),
+ valueTypeWritable = reader.getValueClass.asSubclass(classOf[Writable])
+ )
+ } finally {
+ reader.close()
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/mahout/blob/c34e8a84/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
index 54f33ef..08b2c34 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/SparkEngine.scala
@@ -17,7 +17,10 @@
package org.apache.mahout.sparkbindings
+import java.io.IOException
+
import org.apache.mahout.math._
+import org.apache.spark.deploy.SparkHadoopUtil
import scalabindings._
import RLikeOps._
import org.apache.mahout.math.drm.logical._
@@ -26,17 +29,20 @@ import org.apache.mahout.math._
import scala.reflect.ClassTag
import org.apache.spark.storage.StorageLevel
import org.apache.mahout.sparkbindings.blas._
-import org.apache.hadoop.io.{LongWritable, Text, IntWritable, Writable}
+import org.apache.hadoop.io._
import scala.Some
import scala.collection.JavaConversions._
-import org.apache.spark.SparkContext
import org.apache.mahout.math.drm._
import org.apache.mahout.math.drm.RLikeDrmOps._
import org.apache.spark.rdd.RDD
+import org.apache.mahout.common.{Hadoop1HDFSUtil, HDFSUtil}
/** Spark-specific non-drm-method operations */
object SparkEngine extends DistributedEngine {
+ // By default, use Hadoop 1 utils
+ var hdfsUtils: HDFSUtil = Hadoop1HDFSUtil
+
def colSums[K:ClassTag](drm: CheckpointedDrm[K]): Vector = {
val n = drm.ncol
@@ -125,47 +131,20 @@ object SparkEngine extends DistributedEngine {
*
* @return DRM[Any] where Any is automatically translated to value type
*/
- def drmFromHDFS (path: String, parMin:Int = 0)(implicit sc: DistributedContext): CheckpointedDrm[_] = {
-
- val rdd = sc.sequenceFile(path, classOf[Writable], classOf[VectorWritable], minPartitions = parMin)
- // Get rid of VectorWritable
- .map(t => (t._1, t._2.get()))
-
- def getKeyClassTag[K: ClassTag, V](rdd: RDD[(K, V)]) = implicitly[ClassTag[K]]
-
- // Spark should've loaded the type info from the header, right?
- val keyTag = getKeyClassTag(rdd)
-
- val (key2valFunc, val2keyFunc, unwrappedKeyTag) = keyTag match {
+ def drmDfsRead (path: String, parMin:Int = 0)(implicit sc: DistributedContext): CheckpointedDrm[_] = {
- case xx: ClassTag[Writable] if (xx == implicitly[ClassTag[IntWritable]]) => (
- (v: AnyRef) => v.asInstanceOf[IntWritable].get,
- (x: Any) => new IntWritable(x.asInstanceOf[Int]),
- implicitly[ClassTag[Int]])
+ val drmMetadata = hdfsUtils.readDrmHeader(path)
+ val k2vFunc = drmMetadata.keyW2ValFunc
- case xx: ClassTag[Writable] if (xx == implicitly[ClassTag[Text]]) => (
- (v: AnyRef) => v.asInstanceOf[Text].toString,
- (x: Any) => new Text(x.toString),
- implicitly[ClassTag[String]])
-
- case xx: ClassTag[Writable] if (xx == implicitly[ClassTag[LongWritable]]) => (
- (v: AnyRef) => v.asInstanceOf[LongWritable].get,
- (x: Any) => new LongWritable(x.asInstanceOf[Int]),
- implicitly[ClassTag[Long]])
-
- case xx: ClassTag[Writable] => (
- (v: AnyRef) => v,
- (x: Any) => x.asInstanceOf[Writable],
- ClassTag(classOf[Writable]))
- }
-
- {
- implicit def getWritable(x: Any): Writable = val2keyFunc()
+ // Load RDD and convert all Writables to value types right away (due to reuse of writables in
+ // Hadoop we must do it right after read operation).
+ val rdd = sc.sequenceFile(path, classOf[Writable], classOf[VectorWritable], minPartitions = parMin)
- val drmRdd = rdd.map { t => (key2valFunc(t._1), t._2)}
+ // Immediately convert keys and value writables into value types.
+ .map { case (wKey, wVec) => k2vFunc(wKey) -> wVec.get()}
- drmWrap(rdd = drmRdd, cacheHint = CacheHint.MEMORY_ONLY)(unwrappedKeyTag.asInstanceOf[ClassTag[Any]])
- }
+ // Wrap into a DRM type with correct matrix row key class tag evident.
+ drmWrap(rdd = rdd, cacheHint = CacheHint.NONE)(drmMetadata.keyClassTag.asInstanceOf[ClassTag[Any]])
}
/** Parallelize in-core matrix as spark distributed matrix, using row ordinal indices as data set keys. */
http://git-wip-us.apache.org/repos/asf/mahout/blob/c34e8a84/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
index cc5ebf2..b753f6f 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/drm/CheckpointedDrmSpark.scala
@@ -65,6 +65,9 @@ class CheckpointedDrmSpark[K: ClassTag](
private var cached: Boolean = false
override val context: DistributedContext = rdd.context
+ /** Explicit extraction of key class Tag */
+ def keyClassTag: ClassTag[K] = implicitly[ClassTag[K]]
+
/**
* Action operator -- does not necessary means Spark action; but does mean running BLAS optimizer
* and writing down Spark graph lineage since last checkpointed DRM.
@@ -152,7 +155,7 @@ class CheckpointedDrmSpark[K: ClassTag](
* Dump matrix as computed Mahout's DRM into specified (HD)FS path
* @param path
*/
- def writeDRM(path: String) = {
+ def dfsWrite(path: String) = {
val ktag = implicitly[ClassTag[K]]
implicit val k2wFunc: (K) => Writable =