Posted to commits@mahout.apache.org by sm...@apache.org on 2015/10/24 00:26:14 UTC
mahout git commit: MAHOUT-1776 Refactor common Engine agnostic classes to Math-Scala module closes apache/mahout#163
Repository: mahout
Updated Branches:
refs/heads/flink-binding 7039f4c5f -> f5a4a9762
MAHOUT-1776 Refactor common Engine agnostic classes to Math-Scala module closes apache/mahout#163
Project: http://git-wip-us.apache.org/repos/asf/mahout/repo
Commit: http://git-wip-us.apache.org/repos/asf/mahout/commit/f5a4a976
Tree: http://git-wip-us.apache.org/repos/asf/mahout/tree/f5a4a976
Diff: http://git-wip-us.apache.org/repos/asf/mahout/diff/f5a4a976
Branch: refs/heads/flink-binding
Commit: f5a4a976288a3ec10942f1b28ea793bacac33955
Parents: 7039f4c
Author: smarthi <sm...@apache.org>
Authored: Fri Oct 23 18:23:43 2015 -0400
Committer: smarthi <sm...@apache.org>
Committed: Fri Oct 23 18:23:43 2015 -0400
----------------------------------------------------------------------
math-scala/pom.xml | 6 +
.../common/io/GenericMatrixKryoSerializer.scala | 188 ++++++++++++++
.../mahout/common/io/VectorKryoSerializer.scala | 248 ++++++++++++++++++
.../apache/mahout/common/Hadoop1HDFSUtil.scala | 8 +-
.../io/GenericMatrixKryoSerializer.scala | 189 --------------
.../io/MahoutKryoRegistrator.scala | 1 +
.../sparkbindings/io/VectorKryoSerializer.scala | 252 -------------------
.../sparkbindings/SparkBindingsSuite.scala | 6 +-
.../mahout/sparkbindings/io/IOSuite.scala | 6 +-
9 files changed, 452 insertions(+), 452 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mahout/blob/f5a4a976/math-scala/pom.xml
----------------------------------------------------------------------
diff --git a/math-scala/pom.xml b/math-scala/pom.xml
index e8c0357..0124612 100644
--- a/math-scala/pom.xml
+++ b/math-scala/pom.xml
@@ -122,6 +122,12 @@
<artifactId>mahout-math</artifactId>
</dependency>
+ <dependency>
+ <groupId>com.esotericsoftware.kryo</groupId>
+ <artifactId>kryo</artifactId>
+ <version>2.21</version>
+ </dependency>
+
<!-- 3rd-party -->
<dependency>
<groupId>log4j</groupId>
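----------------------------------------------------------------------
With the explicit Kryo 2.21 dependency, math-scala can host the
serializers without pulling in Spark. A minimal setup sketch (not part
of this commit; it assumes mahout-math-scala and Kryo 2.21 on the
classpath):

    import com.esotericsoftware.kryo.Kryo
    import com.esotericsoftware.kryo.io.{Input, Output}
    import org.apache.mahout.common.io.{GenericMatrixKryoSerializer, VectorKryoSerializer}
    import org.apache.mahout.math.{Matrix, Vector}

    // Wire the relocated serializers onto a bare Kryo instance; no
    // Spark classes are needed anymore.
    val kryo = new Kryo()
    kryo.addDefaultSerializer(classOf[Vector], new VectorKryoSerializer())
    kryo.addDefaultSerializer(classOf[Matrix], new GenericMatrixKryoSerializer())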
http://git-wip-us.apache.org/repos/asf/mahout/blob/f5a4a976/math-scala/src/main/scala/org/apache/mahout/common/io/GenericMatrixKryoSerializer.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/common/io/GenericMatrixKryoSerializer.scala b/math-scala/src/main/scala/org/apache/mahout/common/io/GenericMatrixKryoSerializer.scala
new file mode 100644
index 0000000..534d37c
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/common/io/GenericMatrixKryoSerializer.scala
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.io
+
+import com.esotericsoftware.kryo.io.{Input, Output}
+import com.esotericsoftware.kryo.{Kryo, Serializer}
+import org.apache.log4j.Logger
+import org.apache.mahout.logging._
+import org.apache.mahout.math._
+import org.apache.mahout.math.flavor.TraversingStructureEnum
+import org.apache.mahout.math.scalabindings.RLikeOps._
+import org.apache.mahout.math.scalabindings._
+
+import scala.collection.JavaConversions._
+
+object GenericMatrixKryoSerializer {
+
+ private implicit final val log = Logger.getLogger(classOf[GenericMatrixKryoSerializer])
+
+}
+
+/** Serializes a generic in-core matrix, sparse or dense (row-wise or column-wise backed). */
+class GenericMatrixKryoSerializer extends Serializer[Matrix] {
+
+ import GenericMatrixKryoSerializer._
+
+ override def write(kryo: Kryo, output: Output, mx: Matrix): Unit = {
+
+ debug(s"Writing mx of type ${mx.getClass.getName}")
+
+ val structure = mx.getFlavor.getStructure
+
+ // Write structure bit
+ output.writeInt(structure.ordinal(), true)
+
+ // Write geometry
+ output.writeInt(mx.nrow, true)
+ output.writeInt(mx.ncol, true)
+
+ // Write in most efficient traversal order (using backing vectors perhaps)
+ structure match {
+ case TraversingStructureEnum.COLWISE => writeRowWise(kryo, output, mx.t)
+ case TraversingStructureEnum.SPARSECOLWISE => writeSparseRowWise(kryo, output, mx.t)
+ case TraversingStructureEnum.SPARSEROWWISE => writeSparseRowWise(kryo, output, mx)
+ case TraversingStructureEnum.VECTORBACKED => writeVectorBacked(kryo, output, mx)
+ case _ => writeRowWise(kryo, output, mx)
+ }
+
+ }
+
+ private def writeVectorBacked(kryo: Kryo, output: Output, mx: Matrix) {
+
+ require(mx != null)
+
+ // For now we handle a few vector-backed classes individually. TODO: create an
+ // API to obtain vector-backed matrix data.
+ kryo.writeClass(output, mx.getClass)
+ mx match {
+ case mxD: DiagonalMatrix => kryo.writeObject(output, mxD.diagv)
+ case mxS: DenseSymmetricMatrix => kryo.writeObject(output, dvec(mxS.getData))
+ case mxT: UpperTriangular => kryo.writeObject(output, dvec(mxT.getData))
+ case _ => throw new IllegalArgumentException(s"Unsupported matrix type:${mx.getClass.getName}")
+ }
+ }
+
+ private def readVectorBacked(kryo: Kryo, input: Input, nrow: Int, ncol: Int) = {
+
+ // We require vector-backed matrices to provide a vector-parameterized constructor.
+ val clazz = kryo.readClass(input).getType
+
+ debug(s"Deserializing vector-backed mx of type ${clazz.getName}.")
+
+ clazz.getConstructor(classOf[Vector]).newInstance(kryo.readObject(input, classOf[Vector])).asInstanceOf[Matrix]
+ }
+
+ private def writeRowWise(kryo: Kryo, output: Output, mx: Matrix): Unit = {
+ for (row <- mx) kryo.writeObject(output, row)
+ }
+
+ private def readRows(kryo: Kryo, input: Input, nrow: Int) = {
+ Array.tabulate(nrow) { _ => kryo.readObject(input, classOf[Vector])}
+ }
+
+ private def readSparseRows(kryo: Kryo, input: Input) = {
+
+ // Number of slices
+ val nslices = input.readInt(true)
+
+ Array.tabulate(nslices) { _ =>
+ input.readInt(true) -> kryo.readObject(input, classOf[Vector])
+ }
+ }
+
+ private def writeSparseRowWise(kryo: Kryo, output: Output, mx: Matrix): Unit = {
+
+ val nslices = mx.numSlices()
+
+ output.writeInt(nslices, true)
+
+ var actualNSlices = 0
+ for (row <- mx.iterateNonEmpty()) {
+ output.writeInt(row.index(), true)
+ kryo.writeObject(output, row.vector())
+ actualNSlices += 1
+ }
+
+ require(nslices == actualNSlices, "Number of slices reported by Matrix.numSlices() was different from actual " +
+ "slice iterator size.")
+ }
+
+ override def read(kryo: Kryo, input: Input, mxClass: Class[Matrix]): Matrix = {
+
+ // Read structure hint
+ val structure = TraversingStructureEnum.values()(input.readInt(true))
+
+ // Read geometry
+ val nrow = input.readInt(true)
+ val ncol = input.readInt(true)
+
+ debug(s"read matrix geometry: $nrow x $ncol.")
+
+ structure match {
+
+ // Sparse or dense column wise
+ case TraversingStructureEnum.COLWISE =>
+ val cols = readRows(kryo, input, ncol)
+
+ if (!cols.isEmpty && cols.head.isDense)
+ dense(cols).t
+ else {
+ debug("Deserializing as SparseRowMatrix.t (COLWISE).")
+ new SparseRowMatrix(ncol, nrow, cols, true, false).t
+ }
+
+ // transposed SparseMatrix case
+ case TraversingStructureEnum.SPARSECOLWISE =>
+ val cols = readSparseRows(kryo, input)
+ val javamap = new java.util.HashMap[Integer, Vector]((cols.size << 1) + 1)
+ cols.foreach { case (idx, vec) => javamap.put(idx, vec)}
+
+ debug("Deserializing as SparseMatrix.t (SPARSECOLWISE).")
+ new SparseMatrix(ncol, nrow, javamap, true).t
+
+ // Sparse Row-wise -- this will be created as a SparseMatrix.
+ case TraversingStructureEnum.SPARSEROWWISE =>
+ val rows = readSparseRows(kryo, input)
+ val javamap = new java.util.HashMap[Integer, Vector]((rows.size << 1) + 1)
+ rows.foreach { case (idx, vec) => javamap.put(idx, vec)}
+
+ debug("Deserializing as SparseMatrix (SPARSEROWWISE).")
+ new SparseMatrix(nrow, ncol, javamap, true)
+ case TraversingStructureEnum.VECTORBACKED =>
+
+ debug("Deserializing vector-backed...")
+ readVectorBacked(kryo, input, nrow, ncol)
+
+ // By default, read row-wise.
+ case _ =>
+ val cols = readRows(kryo, input, nrow)
+ // this still copies a lot of stuff...
+ if (!cols.isEmpty && cols.head.isDense) {
+
+ debug("Deserializing as DenseMatrix.")
+ dense(cols)
+ } else {
+
+ debug("Deserializing as SparseRowMatrix(default).")
+ new SparseRowMatrix(nrow, ncol, cols, true, false)
+ }
+ }
+
+ }
+}
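----------------------------------------------------------------------
For reference: write() emits the structure ordinal and the geometry as
varints, then the payload in the cheapest traversal order, and read()
reverses this using the structure hint. A round-trip sketch, continuing
the bare-Kryo setup above (the dense factory and === check come from
the scalabindings package):

    import org.apache.mahout.math.DenseMatrix
    import org.apache.mahout.math.scalabindings._
    import org.apache.mahout.math.scalabindings.RLikeOps._

    val mx = dense((1, 2, 3), (4, 5, 6))   // 2 x 3, row-wise dense
    val out = new Output(1 << 12)
    kryo.writeObject(out, mx)              // ordinal, nrow, ncol, then two row vectors

    val mx2 = kryo.readObject(new Input(out.toBytes), classOf[DenseMatrix])
    require(mx === mx2)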
http://git-wip-us.apache.org/repos/asf/mahout/blob/f5a4a976/math-scala/src/main/scala/org/apache/mahout/common/io/VectorKryoSerializer.scala
----------------------------------------------------------------------
diff --git a/math-scala/src/main/scala/org/apache/mahout/common/io/VectorKryoSerializer.scala b/math-scala/src/main/scala/org/apache/mahout/common/io/VectorKryoSerializer.scala
new file mode 100644
index 0000000..3cc537c
--- /dev/null
+++ b/math-scala/src/main/scala/org/apache/mahout/common/io/VectorKryoSerializer.scala
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.mahout.common.io
+
+import com.esotericsoftware.kryo.io.{Input, Output}
+import com.esotericsoftware.kryo.{Kryo, Serializer}
+import org.apache.mahout.logging._
+import org.apache.mahout.math._
+import org.apache.mahout.math.scalabindings.RLikeOps._
+
+import scala.collection.JavaConversions._
+
+
+object VectorKryoSerializer {
+
+ final val FLAG_DENSE: Int = 0x01
+ final val FLAG_SEQUENTIAL: Int = 0x02
+ final val FLAG_NAMED: Int = 0x04
+ final val FLAG_LAX_PRECISION: Int = 0x08
+
+ private final implicit val log = getLog(classOf[VectorKryoSerializer])
+
+}
+
+class VectorKryoSerializer(val laxPrecision: Boolean = false) extends Serializer[Vector] {
+
+ import VectorKryoSerializer._
+
+ override def write(kryo: Kryo, output: Output, vector: Vector): Unit = {
+
+ require(vector != null)
+
+ trace(s"Serializing vector of ${vector.getClass.getName} class.")
+
+ // Write length
+ val len = vector.length
+ output.writeInt(len, true)
+
+ // Interrogate vec properties
+ val dense = vector.isDense
+ val sequential = vector.isSequentialAccess
+ val named = vector.isInstanceOf[NamedVector]
+
+ var flag = 0
+
+ if (dense) {
+ flag |= FLAG_DENSE
+ } else if (sequential) {
+ flag |= FLAG_SEQUENTIAL
+ }
+
+ if (vector.isInstanceOf[NamedVector]) {
+ flag |= FLAG_NAMED
+ }
+
+ if (laxPrecision) flag |= FLAG_LAX_PRECISION
+
+ // Write flags
+ output.writeByte(flag)
+
+ // Write name if needed
+ if (named) output.writeString(vector.asInstanceOf[NamedVector].getName)
+
+ dense match {
+
+ // Dense vector.
+ case true =>
+
+ laxPrecision match {
+ case true =>
+ for (i <- 0 until vector.length) output.writeFloat(vector(i).toFloat)
+ case _ =>
+ for (i <- 0 until vector.length) output.writeDouble(vector(i))
+ }
+ case _ =>
+
+ // It turns out getNumNonZeroElements must check every element to see if it is indeed
+ // non-zero, while iterateNonZeros() doesn't, so the two are inconsistent right now.
+ // We therefore just auto-terminate the stream ourselves.
+ val iter = vector.nonZeroes.toIterator.filter(_.get() != 0.0)
+
+ sequential match {
+
+ // Delta encoding
+ case true =>
+
+ var idx = 0
+ laxPrecision match {
+ case true =>
+ while (iter.hasNext) {
+ val el = iter.next()
+ output.writeFloat(el.toFloat)
+ output.writeInt(el.index() - idx, true)
+ idx = el.index
+ }
+ // Terminate delta encoding.
+ output.writeFloat(0.0.toFloat)
+ case _ =>
+ while (iter.hasNext) {
+ val el = iter.next()
+ output.writeDouble(el.get())
+ output.writeInt(el.index() - idx, true)
+ idx = el.index
+ }
+ // Terminate delta encoding.
+ output.writeDouble(0.0)
+ }
+
+ // Random access.
+ case _ =>
+
+ laxPrecision match {
+
+ case true =>
+ iter.foreach { el =>
+ output.writeFloat(el.get().toFloat)
+ output.writeInt(el.index(), true)
+ }
+ // Terminate random access with 0.0 value.
+ output.writeFloat(0.0.toFloat)
+ case _ =>
+ iter.foreach { el =>
+ output.writeDouble(el.get())
+ output.writeInt(el.index(), true)
+ }
+ // Terminate random access with 0.0 value.
+ output.writeDouble(0.0)
+ }
+
+ }
+
+ }
+ }
+
+ override def read(kryo: Kryo, input: Input, vecClass: Class[Vector]): Vector = {
+
+ val len = input.readInt(true)
+ val flags = input.readByte().toInt
+ val name = if ((flags & FLAG_NAMED) != 0) Some(input.readString()) else None
+
+ val vec: Vector = flags match {
+
+ // Dense
+ case _: Int if (flags & FLAG_DENSE) != 0 =>
+
+ trace(s"Deserializing dense vector.")
+
+ if ((flags & FLAG_LAX_PRECISION) != 0) {
+ new DenseVector(len) := { _ => input.readFloat()}
+ } else {
+ new DenseVector(len) := { _ => input.readDouble()}
+ }
+
+ // Sparse case.
+ case _ =>
+
+ flags match {
+
+ // Sequential.
+ case _: Int if (flags & FLAG_SEQUENTIAL) != 0 =>
+
+ trace("Deserializing as sequential sparse vector.")
+
+ val v = new SequentialAccessSparseVector(len)
+ var idx = 0
+ var stop = false
+
+ if ((flags & FLAG_LAX_PRECISION) != 0) {
+
+ while (!stop) {
+ val value = input.readFloat()
+ if (value == 0.0) {
+ stop = true
+ } else {
+ idx += input.readInt(true)
+ v(idx) = value
+ }
+ }
+ } else {
+ while (!stop) {
+ val value = input.readDouble()
+ if (value == 0.0) {
+ stop = true
+ } else {
+ idx += input.readInt(true)
+ v(idx) = value
+ }
+ }
+ }
+ v
+
+ // Random access
+ case _ =>
+
+ trace("Deserializing as random access vector.")
+
+ // Read pairs until we see 0.0 value. Prone to corruption attacks obviously.
+ val v = new RandomAccessSparseVector(len)
+ var stop = false
+ if ((flags & FLAG_LAX_PRECISION) != 0) {
+ while (! stop ) {
+ val value = input.readFloat()
+ if ( value == 0.0 ) {
+ stop = true
+ } else {
+ val idx = input.readInt(true)
+ v(idx) = value
+ }
+ }
+ } else {
+ while (! stop ) {
+ val value = input.readDouble()
+ if (value == 0.0) {
+ stop = true
+ } else {
+ val idx = input.readInt(true)
+ v(idx) = value
+ }
+ }
+ }
+ v
+ }
+ }
+
+ name.map{name =>
+
+ trace(s"Recovering named vector's name $name.")
+
+ new NamedVector(vec, name)
+ }
+ .getOrElse(vec)
+ }
+}
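----------------------------------------------------------------------
For reference: the writer packs the vector's traits into a single flag
byte (dense, sequential, named, lax precision) and writes sparse
payloads as (value, index) pairs terminated by a 0.0 sentinel, with
indices delta-encoded in the sequential case. A sketch of the resulting
stream, again assuming the bare-Kryo setup above plus the RLikeOps
update syntax:

    import org.apache.mahout.math.SequentialAccessSparseVector

    val v = new SequentialAccessSparseVector(10)
    v(2) = 1.5
    v(7) = -2.0

    val out = new Output(256)
    kryo.writeObject(out, v)
    // Stream: len=10 (varint), flags=FLAG_SEQUENTIAL, then value/delta
    // pairs (1.5, +2) and (-2.0, +5), terminated by the 0.0 sentinel.

    val v2 = kryo.readObject(new Input(out.toBytes), classOf[SequentialAccessSparseVector])

The sentinel is also why write() filters the non-zero iterator with
_.get() != 0.0 first: a true zero emitted mid-stream would terminate
the read early.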
http://git-wip-us.apache.org/repos/asf/mahout/blob/f5a4a976/spark/src/main/scala/org/apache/mahout/common/Hadoop1HDFSUtil.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/common/Hadoop1HDFSUtil.scala b/spark/src/main/scala/org/apache/mahout/common/Hadoop1HDFSUtil.scala
index 399508d..29599b8 100644
--- a/spark/src/main/scala/org/apache/mahout/common/Hadoop1HDFSUtil.scala
+++ b/spark/src/main/scala/org/apache/mahout/common/Hadoop1HDFSUtil.scala
@@ -17,12 +17,10 @@
package org.apache.mahout.common
-import org.apache.hadoop.io.{Writable, SequenceFile}
-import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.{SequenceFile, Writable}
import org.apache.spark.SparkContext
-import collection._
-import JavaConversions._
/**
* Deprecated Hadoop 1 api which we currently explicitly import via Mahout dependencies. May not work
@@ -44,7 +42,7 @@ object Hadoop1HDFSUtil extends HDFSUtil {
val partFilePath:Path = fs.listStatus(dfsPath)
// Filter out anything starting with .
- .filter { s => (!s.getPath.getName.startsWith("\\.") && !s.getPath.getName.startsWith("_") && !s.isDir)}
+ .filter { s => !s.getPath.getName.startsWith("\\.") && !s.getPath.getName.startsWith("_") && !s.isDir }
// Take path
.map(_.getPath)
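----------------------------------------------------------------------
One thing the reformatting preserves: String.startsWith takes a literal
prefix, so "\\." matches a leading backslash-then-dot, not hidden
dot-files. If skipping dot-files was the intent (an assumption; this
commit deliberately leaves the behavior unchanged), the predicate would
look like:

    import org.apache.hadoop.fs.FileStatus

    // Hypothetical variant, not part of this commit: skip hidden "."
    // files, "_" bookkeeping files (e.g. _SUCCESS), and directories.
    def isPartFile(s: FileStatus): Boolean =
      !s.getPath.getName.startsWith(".") &&
        !s.getPath.getName.startsWith("_") &&
        !s.isDir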
http://git-wip-us.apache.org/repos/asf/mahout/blob/f5a4a976/spark/src/main/scala/org/apache/mahout/sparkbindings/io/GenericMatrixKryoSerializer.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/io/GenericMatrixKryoSerializer.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/io/GenericMatrixKryoSerializer.scala
deleted file mode 100644
index da58b35..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/io/GenericMatrixKryoSerializer.scala
+++ /dev/null
@@ -1,189 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.io
-
-
-import com.esotericsoftware.kryo.io.{Output, Input}
-import com.esotericsoftware.kryo.{Kryo, Serializer}
-import org.apache.log4j.Logger
-import org.apache.mahout.logging._
-import org.apache.mahout.math._
-import org.apache.mahout.math.flavor.TraversingStructureEnum
-import scalabindings._
-import RLikeOps._
-import collection._
-import JavaConversions._
-
-object GenericMatrixKryoSerializer {
-
- private implicit final val log = Logger.getLogger(classOf[GenericMatrixKryoSerializer])
-
-}
-
-/** Serializes Sparse or Dense in-core generic matrix (row-wise or column-wise backed) */
-class GenericMatrixKryoSerializer extends Serializer[Matrix] {
-
- import GenericMatrixKryoSerializer._
-
- override def write(kryo: Kryo, output: Output, mx: Matrix): Unit = {
-
- debug(s"Writing mx of type ${mx.getClass.getName}")
-
- val structure = mx.getFlavor.getStructure
-
- // Write structure bit
- output.writeInt(structure.ordinal(), true)
-
- // Write geometry
- output.writeInt(mx.nrow, true)
- output.writeInt(mx.ncol, true)
-
- // Write in most efficient traversal order (using backing vectors perhaps)
- structure match {
- case TraversingStructureEnum.COLWISE => writeRowWise(kryo, output, mx.t)
- case TraversingStructureEnum.SPARSECOLWISE => writeSparseRowWise(kryo, output, mx.t)
- case TraversingStructureEnum.SPARSEROWWISE => writeSparseRowWise(kryo, output, mx)
- case TraversingStructureEnum.VECTORBACKED => writeVectorBacked(kryo, output, mx)
- case _ => writeRowWise(kryo, output, mx)
- }
-
- }
-
- private def writeVectorBacked(kryo: Kryo, output: Output, mx: Matrix) {
-
- require(mx != null)
-
- // At this point we are just doing some vector-backed classes individually. TODO: create
- // api to obtain vector-backed matrix data.
- kryo.writeClass(output, mx.getClass)
- mx match {
- case mxD: DiagonalMatrix => kryo.writeObject(output, mxD.diagv)
- case mxS: DenseSymmetricMatrix => kryo.writeObject(output, dvec(mxS.getData))
- case mxT: UpperTriangular => kryo.writeObject(output, dvec(mxT.getData))
- case _ => throw new IllegalArgumentException(s"Unsupported matrix type:${mx.getClass.getName}")
- }
- }
-
- private def readVectorBacked(kryo: Kryo, input: Input, nrow: Int, ncol: Int) = {
-
- // We require vector-backed matrices to have vector-parameterized constructor to construct.
- val clazz = kryo.readClass(input).getType
-
- debug(s"Deserializing vector-backed mx of type ${clazz.getName}.")
-
- clazz.getConstructor(classOf[Vector]).newInstance(kryo.readObject(input, classOf[Vector])).asInstanceOf[Matrix]
- }
-
- private def writeRowWise(kryo: Kryo, output: Output, mx: Matrix): Unit = {
- for (row <- mx) kryo.writeObject(output, row)
- }
-
- private def readRows(kryo: Kryo, input: Input, nrow: Int) = {
- Array.tabulate(nrow) { _ => kryo.readObject(input, classOf[Vector])}
- }
-
- private def readSparseRows(kryo: Kryo, input: Input) = {
-
- // Number of slices
- val nslices = input.readInt(true)
-
- Array.tabulate(nslices) { _ =>
- input.readInt(true) -> kryo.readObject(input, classOf[Vector])
- }
- }
-
- private def writeSparseRowWise(kryo: Kryo, output: Output, mx: Matrix): Unit = {
-
- val nslices = mx.numSlices()
-
- output.writeInt(nslices, true)
-
- var actualNSlices = 0;
- for (row <- mx.iterateNonEmpty()) {
- output.writeInt(row.index(), true)
- kryo.writeObject(output, row.vector())
- actualNSlices += 1
- }
-
- require(nslices == actualNSlices, "Number of slices reported by Matrix.numSlices() was different from actual " +
- "slice iterator size.")
- }
-
- override def read(kryo: Kryo, input: Input, mxClass: Class[Matrix]): Matrix = {
-
- // Read structure hint
- val structure = TraversingStructureEnum.values()(input.readInt(true))
-
- // Read geometry
- val nrow = input.readInt(true)
- val ncol = input.readInt(true)
-
- debug(s"read matrix geometry: $nrow x $ncol.")
-
- structure match {
-
- // Sparse or dense column wise
- case TraversingStructureEnum.COLWISE =>
- val cols = readRows(kryo, input, ncol)
-
- if (!cols.isEmpty && cols.head.isDense)
- dense(cols).t
- else {
- debug("Deserializing as SparseRowMatrix.t (COLWISE).")
- new SparseRowMatrix(ncol, nrow, cols, true, false).t
- }
-
- // transposed SparseMatrix case
- case TraversingStructureEnum.SPARSECOLWISE =>
- val cols = readSparseRows(kryo, input)
- val javamap = new java.util.HashMap[Integer, Vector]((cols.size << 1) + 1)
- cols.foreach { case (idx, vec) => javamap.put(idx, vec)}
-
- debug("Deserializing as SparseMatrix.t (SPARSECOLWISE).")
- new SparseMatrix(ncol, nrow, javamap, true).t
-
- // Sparse Row-wise -- this will be created as a SparseMatrix.
- case TraversingStructureEnum.SPARSEROWWISE =>
- val rows = readSparseRows(kryo, input)
- val javamap = new java.util.HashMap[Integer, Vector]((rows.size << 1) + 1)
- rows.foreach { case (idx, vec) => javamap.put(idx, vec)}
-
- debug("Deserializing as SparseMatrix (SPARSEROWWISE).")
- new SparseMatrix(nrow, ncol, javamap, true)
- case TraversingStructureEnum.VECTORBACKED =>
-
- debug("Deserializing vector-backed...")
- readVectorBacked(kryo, input, nrow, ncol)
-
- // By default, read row-wise.
- case _ =>
- val cols = readRows(kryo, input, nrow)
- // this still copies a lot of stuff...
- if (!cols.isEmpty && cols.head.isDense) {
-
- debug("Deserializing as DenseMatrix.")
- dense(cols)
- } else {
-
- debug("Deserializing as SparseRowMatrix(default).")
- new SparseRowMatrix(nrow, ncol, cols, true, false)
- }
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/mahout/blob/f5a4a976/spark/src/main/scala/org/apache/mahout/sparkbindings/io/MahoutKryoRegistrator.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/io/MahoutKryoRegistrator.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/io/MahoutKryoRegistrator.scala
index 5806ff5..4e0e061 100644
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/io/MahoutKryoRegistrator.scala
+++ b/spark/src/main/scala/org/apache/mahout/sparkbindings/io/MahoutKryoRegistrator.scala
@@ -18,6 +18,7 @@
package org.apache.mahout.sparkbindings.io
import com.esotericsoftware.kryo.Kryo
+import org.apache.mahout.common.io.{VectorKryoSerializer, GenericMatrixKryoSerializer}
import org.apache.mahout.math._
import org.apache.spark.serializer.KryoRegistrator
import org.apache.mahout.logging._
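----------------------------------------------------------------------
The registrator itself only needs the import updated to the new
org.apache.mahout.common.io package; Spark applications keep enabling
it the usual way. A configuration sketch:

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .set("spark.kryo.registrator",
        "org.apache.mahout.sparkbindings.io.MahoutKryoRegistrator")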
http://git-wip-us.apache.org/repos/asf/mahout/blob/f5a4a976/spark/src/main/scala/org/apache/mahout/sparkbindings/io/VectorKryoSerializer.scala
----------------------------------------------------------------------
diff --git a/spark/src/main/scala/org/apache/mahout/sparkbindings/io/VectorKryoSerializer.scala b/spark/src/main/scala/org/apache/mahout/sparkbindings/io/VectorKryoSerializer.scala
deleted file mode 100644
index 175778f..0000000
--- a/spark/src/main/scala/org/apache/mahout/sparkbindings/io/VectorKryoSerializer.scala
+++ /dev/null
@@ -1,252 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.mahout.sparkbindings.io
-
-import org.apache.log4j.Logger
-import org.apache.mahout.logging._
-import org.apache.mahout.math._
-import org.apache.mahout.math.scalabindings._
-import RLikeOps._
-
-import com.esotericsoftware.kryo.io.{OutputChunked, Output, Input}
-import com.esotericsoftware.kryo.{Kryo, Serializer}
-
-import collection._
-import JavaConversions._
-
-
-object VectorKryoSerializer {
-
- final val FLAG_DENSE: Int = 0x01
- final val FLAG_SEQUENTIAL: Int = 0x02
- final val FLAG_NAMED: Int = 0x04
- final val FLAG_LAX_PRECISION: Int = 0x08
-
- private final implicit val log = getLog(classOf[VectorKryoSerializer])
-
-}
-
-class VectorKryoSerializer(val laxPrecision: Boolean = false) extends Serializer[Vector] {
-
- import VectorKryoSerializer._
-
- override def write(kryo: Kryo, output: Output, vector: Vector): Unit = {
-
- require(vector != null)
-
- trace(s"Serializing vector of ${vector.getClass.getName} class.")
-
- // Write length
- val len = vector.length
- output.writeInt(len, true)
-
- // Interrogate vec properties
- val dense = vector.isDense
- val sequential = vector.isSequentialAccess
- val named = vector.isInstanceOf[NamedVector]
-
- var flag = 0
-
- if (dense) {
- flag |= FLAG_DENSE
- } else if (sequential) {
- flag |= FLAG_SEQUENTIAL
- }
-
- if (vector.isInstanceOf[NamedVector]) {
- flag |= FLAG_NAMED
- }
-
- if (laxPrecision) flag |= FLAG_LAX_PRECISION
-
- // Write flags
- output.writeByte(flag)
-
- // Write name if needed
- if (named) output.writeString(vector.asInstanceOf[NamedVector].getName)
-
- dense match {
-
- // Dense vector.
- case true =>
-
- laxPrecision match {
- case true =>
- for (i <- 0 until vector.length) output.writeFloat(vector(i).toFloat)
- case _ =>
- for (i <- 0 until vector.length) output.writeDouble(vector(i))
- }
- case _ =>
-
- // Turns out getNumNonZeroElements must check every element if it is indeed non-zero. The
- // iterateNonZeros() on the other hand doesn't do that, so that's all inconsistent right
- // now. so we'll just auto-terminate.
- val iter = vector.nonZeroes.toIterator.filter(_.get() != 0.0)
-
- sequential match {
-
- // Delta encoding
- case true =>
-
- var idx = 0
- laxPrecision match {
- case true =>
- while (iter.hasNext) {
- val el = iter.next()
- output.writeFloat(el.toFloat)
- output.writeInt(el.index() - idx, true)
- idx = el.index
- }
- // Terminate delta encoding.
- output.writeFloat(0.0.toFloat)
- case _ =>
- while (iter.hasNext) {
- val el = iter.next()
- output.writeDouble(el.get())
- output.writeInt(el.index() - idx, true)
- idx = el.index
- }
- // Terminate delta encoding.
- output.writeDouble(0.0)
- }
-
- // Random access.
- case _ =>
-
- laxPrecision match {
-
- case true =>
- iter.foreach { el =>
- output.writeFloat(el.get().toFloat)
- output.writeInt(el.index(), true)
- }
- // Terminate random access with 0.0 value.
- output.writeFloat(0.0.toFloat)
- case _ =>
- iter.foreach { el =>
- output.writeDouble(el.get())
- output.writeInt(el.index(), true)
- }
- // Terminate random access with 0.0 value.
- output.writeDouble(0.0)
- }
-
- }
-
- }
- }
-
- override def read(kryo: Kryo, input: Input, vecClass: Class[Vector]): Vector = {
-
- val len = input.readInt(true)
- val flags = input.readByte().toInt
- val name = if ((flags & FLAG_NAMED) != 0) Some(input.readString()) else None
-
- val vec: Vector = flags match {
-
- // Dense
- case _: Int if ((flags & FLAG_DENSE) != 0) =>
-
- trace(s"Deserializing dense vector.")
-
- if ((flags & FLAG_LAX_PRECISION) != 0) {
- new DenseVector(len) := { _ => input.readFloat()}
- } else {
- new DenseVector(len) := { _ => input.readDouble()}
- }
-
- // Sparse case.
- case _ =>
-
- flags match {
-
- // Sequential.
- case _: Int if ((flags & FLAG_SEQUENTIAL) != 0) =>
-
- trace("Deserializing as sequential sparse vector.")
-
- val v = new SequentialAccessSparseVector(len)
- var idx = 0
- var stop = false
-
- if ((flags & FLAG_LAX_PRECISION) != 0) {
-
- while (!stop) {
- val value = input.readFloat()
- if (value == 0.0) {
- stop = true
- } else {
- idx += input.readInt(true)
- v(idx) = value
- }
- }
- } else {
- while (!stop) {
- val value = input.readDouble()
- if (value == 0.0) {
- stop = true
- } else {
- idx += input.readInt(true)
- v(idx) = value
- }
- }
- }
- v
-
- // Random access
- case _ =>
-
- trace("Deserializing as random access vector.")
-
- // Read pairs until we see 0.0 value. Prone to corruption attacks obviously.
- val v = new RandomAccessSparseVector(len)
- var stop = false
- if ((flags & FLAG_LAX_PRECISION) != 0) {
- while (! stop ) {
- val value = input.readFloat()
- if ( value == 0.0 ) {
- stop = true
- } else {
- val idx = input.readInt(true)
- v(idx) = value
- }
- }
- } else {
- while (! stop ) {
- val value = input.readDouble()
- if (value == 0.0) {
- stop = true
- } else {
- val idx = input.readInt(true)
- v(idx) = value
- }
- }
- }
- v
- }
- }
-
- name.map{name =>
-
- trace(s"Recovering named vector's name ${name}.")
-
- new NamedVector(vec, name)
- }
- .getOrElse(vec)
- }
-}
http://git-wip-us.apache.org/repos/asf/mahout/blob/f5a4a976/spark/src/test/scala/org/apache/mahout/sparkbindings/SparkBindingsSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/SparkBindingsSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/SparkBindingsSuite.scala
index 529d13c..61244a1 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/SparkBindingsSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/SparkBindingsSuite.scala
@@ -1,12 +1,12 @@
package org.apache.mahout.sparkbindings
import java.io.{Closeable, File}
-import java.util
import org.apache.mahout.sparkbindings.test.DistributedSparkSuite
import org.apache.mahout.util.IOUtilsScala
import org.scalatest.FunSuite
-import collection._
+
+import scala.collection._
/**
* @author dmitriy
@@ -14,7 +14,7 @@ import collection._
class SparkBindingsSuite extends FunSuite with DistributedSparkSuite {
// This test will succeed only when MAHOUT_HOME is set in the environment. So we keep it for
- // diagnorstic purposes around, but we probably don't want it to run in the Jenkins, so we'd
+ // diagnostic purposes around, but we probably don't want it to run in the Jenkins, so we'd
// let it to be ignored.
ignore("context jars") {
System.setProperty("mahout.home", new File("..").getAbsolutePath/*"/home/dmitriy/projects/github/mahout-commits"*/)
http://git-wip-us.apache.org/repos/asf/mahout/blob/f5a4a976/spark/src/test/scala/org/apache/mahout/sparkbindings/io/IOSuite.scala
----------------------------------------------------------------------
diff --git a/spark/src/test/scala/org/apache/mahout/sparkbindings/io/IOSuite.scala b/spark/src/test/scala/org/apache/mahout/sparkbindings/io/IOSuite.scala
index f3a9721..1814f17 100644
--- a/spark/src/test/scala/org/apache/mahout/sparkbindings/io/IOSuite.scala
+++ b/spark/src/test/scala/org/apache/mahout/sparkbindings/io/IOSuite.scala
@@ -116,11 +116,11 @@ class IOSuite extends FunSuite with MahoutSuite {
mxC(i, ::) := { _ => if (rnd.nextDouble() < .3) rnd.nextDouble() else 0.0}
val cnsl = mxC.numSlices()
- println(s"Number of slices in mxC: ${cnsl}")
+ println(s"Number of slices in mxC: $cnsl")
val ret = kryoClone(mxA, mxA.t, mxB, mxB.t, mxC, mxC.t, mxA)
- val (mxAA, mxAAt, mxBB, mxBBt, mxCC, mxCCt, mxAAA) = (ret(0), ret(1), ret(2), ret(3), ret(4), ret(5), ret(6))
+ val (mxAA, mxAAt, mxBB, mxBBt, mxCC, mxCCt, mxAAA) = (ret.head, ret(1), ret(2), ret(3), ret(4), ret(5), ret(6))
// ret.size shouldBe 7
@@ -163,7 +163,7 @@ class IOSuite extends FunSuite with MahoutSuite {
test("diag matrix") {
val mxD = diagv(dvec(1, 2, 3, 5))
- val mxDD = kryoClone(mxD)(0)
+ val mxDD = kryoClone(mxD).head
mxD === mxDD shouldBe true
mxDD.isInstanceOf[DiagonalMatrix] shouldBe true
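----------------------------------------------------------------------
The kryoClone helper these tests call is not shown in this hunk. A
plausible shape for it, inferred from the call sites (an assumption,
not the suite's actual code):

    import com.esotericsoftware.kryo.Kryo
    import com.esotericsoftware.kryo.io.{Input, Output}
    import org.apache.mahout.sparkbindings.io.MahoutKryoRegistrator

    // Round-trip each argument through a Kryo configured with the
    // Mahout registrator and return the clones in call order.
    def kryoClone[T <: AnyRef](items: T*): Seq[T] = {
      val kryo = new Kryo()
      new MahoutKryoRegistrator().registerClasses(kryo)
      items.map { item =>
        val out = new Output(1 << 16, -1)
        kryo.writeClassAndObject(out, item)
        out.close()
        kryo.readClassAndObject(new Input(out.toBytes)).asInstanceOf[T]
      }
    }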