You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/11/10 11:43:33 UTC
spark git commit: [SPARK-22450][CORE][MLLIB] safely register class
for mllib
Repository: spark
Updated Branches:
refs/heads/master 9b9827759 -> 1c923d7d6
[SPARK-22450][CORE][MLLIB] safely register class for mllib
## What changes were proposed in this pull request?
Some algorithms, such as KMeans, are still based on mllib. Currently, many common mllib classes (such as Vector, DenseVector, SparseVector, Matrix, DenseMatrix, SparseMatrix) are not registered with Kryo, so serializing or deserializing those objects incurs a performance penalty.
Previously discussed: https://github.com/apache/spark/pull/19586
## How was this patch tested?
New test case.
Author: Xianyang Liu <xi...@intel.com>
Closes #19661 from ConeyLiu/register_vector.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1c923d7d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1c923d7d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1c923d7d
Branch: refs/heads/master
Commit: 1c923d7d65dd94996f0fe2cf9851a1ae738c5c0c
Parents: 9b98277
Author: Xianyang Liu <xi...@intel.com>
Authored: Fri Nov 10 12:43:29 2017 +0100
Committer: Wenchen Fan <we...@databricks.com>
Committed: Fri Nov 10 12:43:29 2017 +0100
----------------------------------------------------------------------
.../spark/serializer/KryoSerializer.scala | 26 +++++++++++
.../apache/spark/ml/feature/InstanceSuit.scala | 47 ++++++++++++++++++++
.../spark/mllib/linalg/MatricesSuite.scala | 28 +++++++++++-
.../spark/mllib/linalg/VectorsSuite.scala | 19 +++++++-
4 files changed, 118 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/1c923d7d/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index 58483c9..2259d1a 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -25,6 +25,7 @@ import javax.annotation.Nullable
import scala.collection.JavaConverters._
import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag
+import scala.util.control.NonFatal
import com.esotericsoftware.kryo.{Kryo, KryoException, Serializer => KryoClassSerializer}
import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
@@ -178,6 +179,31 @@ class KryoSerializer(conf: SparkConf)
kryo.register(Utils.classForName("scala.collection.immutable.Map$EmptyMap$"))
kryo.register(classOf[ArrayBuffer[Any]])
+ // These classes are looked up by name, not referenced directly, to avoid unnecessary jar
+ // dependencies. Each one is registered if it can be loaded; otherwise it is skipped.
+ Seq("org.apache.spark.mllib.linalg.Vector",
+ "org.apache.spark.mllib.linalg.DenseVector",
+ "org.apache.spark.mllib.linalg.SparseVector",
+ "org.apache.spark.mllib.linalg.Matrix",
+ "org.apache.spark.mllib.linalg.DenseMatrix",
+ "org.apache.spark.mllib.linalg.SparseMatrix",
+ "org.apache.spark.ml.linalg.Vector",
+ "org.apache.spark.ml.linalg.DenseVector",
+ "org.apache.spark.ml.linalg.SparseVector",
+ "org.apache.spark.ml.linalg.Matrix",
+ "org.apache.spark.ml.linalg.DenseMatrix",
+ "org.apache.spark.ml.linalg.SparseMatrix",
+ "org.apache.spark.ml.feature.Instance",
+ "org.apache.spark.ml.feature.OffsetInstance"
+ ).foreach { name =>
+ try {
+ val clazz = Utils.classForName(name)
+ kryo.register(clazz)
+ } catch {
+ case NonFatal(_) => // loading or registering this class failed; skip it
+ }
+ }
+
kryo.setClassLoader(classLoader)
kryo
}
http://git-wip-us.apache.org/repos/asf/spark/blob/1c923d7d/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuit.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuit.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuit.scala
new file mode 100644
index 0000000..88c85a9
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuit.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.feature
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.ml.linalg.Vectors
+import org.apache.spark.serializer.KryoSerializer
+
+/** Verifies that `Instance` and `OffsetInstance` values survive a Kryo round trip. */
+class InstanceSuit extends SparkFunSuite {
+  test("Kryo class register") {
+    // With registration required, serializing an unregistered class is an error.
+    val conf = new SparkConf(false).set("spark.kryo.registrationRequired", "true")
+    val serInstance = new KryoSerializer(conf).newInstance()
+
+    // Round-trips `t` through the serializer and asserts equality with the input.
+    def check[T: ClassTag](t: T): Unit = {
+      assert(serInstance.deserialize[T](serInstance.serialize(t)) === t)
+    }
+
+    val instance1 = Instance(19.0, 2.0, Vectors.dense(1.0, 7.0))
+    val instance2 = Instance(17.0, 1.0, Vectors.dense(0.0, 5.0).toSparse)
+    val oInstance1 = OffsetInstance(0.2, 1.0, 2.0, Vectors.dense(0.0, 5.0))
+    val oInstance2 = OffsetInstance(0.2, 1.0, 2.0, Vectors.dense(0.0, 5.0).toSparse)
+    check(instance1)
+    check(instance2)
+    check(oInstance1)
+    check(oInstance2)
+  }
+}
http://git-wip-us.apache.org/repos/asf/spark/blob/1c923d7d/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala
index c8ac92e..d76edb9 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala
@@ -20,16 +20,42 @@ package org.apache.spark.mllib.linalg
import java.util.Random
import scala.collection.mutable.{Map => MutableMap}
+import scala.reflect.ClassTag
import breeze.linalg.{CSCMatrix, Matrix => BM}
import org.mockito.Mockito.when
import org.scalatest.mockito.MockitoSugar._
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.ml.{linalg => newlinalg}
import org.apache.spark.mllib.util.TestingUtils._
+import org.apache.spark.serializer.KryoSerializer
class MatricesSuite extends SparkFunSuite {
+  test("kryo class register") {
+    // Registration is required so that an unregistered class fails instead of passing.
+    val kryoConf = new SparkConf(false).set("spark.kryo.registrationRequired", "true")
+    val kryo = new KryoSerializer(kryoConf).newInstance()
+
+    // Serializes and deserializes `value`, requiring the copy to equal the original.
+    def check[T: ClassTag](value: T): Unit = {
+      val restored = kryo.deserialize[T](kryo.serialize(value))
+      assert(restored === value)
+    }
+
+    val (rows, cols) = (3, 2)
+    val denseValues = Array(0.0, 1.0, 2.0, 3.0, 4.0, 5.0)
+    val dense = Matrices.dense(rows, cols, denseValues).asInstanceOf[DenseMatrix]
+
+    val sparseValues = Array(1.0, 2.0, 4.0, 5.0)
+    val (colPtrs, rowIndices) = (Array(0, 2, 4), Array(1, 2, 1, 2))
+    val sparse =
+      Matrices.sparse(rows, cols, colPtrs, rowIndices, sparseValues).asInstanceOf[SparseMatrix]
+
+    check(dense)
+    check(sparse)
+  }
+
test("dense matrix construction") {
val m = 3
val n = 2
http://git-wip-us.apache.org/repos/asf/spark/blob/1c923d7d/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala
index a1e3ee5..4074bea 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala
@@ -17,15 +17,17 @@
package org.apache.spark.mllib.linalg
+import scala.reflect.ClassTag
import scala.util.Random
import breeze.linalg.{squaredDistance => breezeSquaredDistance, DenseMatrix => BDM}
import org.json4s.jackson.JsonMethods.{parse => parseJson}
-import org.apache.spark.{SparkException, SparkFunSuite}
+import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
import org.apache.spark.internal.Logging
import org.apache.spark.ml.{linalg => newlinalg}
import org.apache.spark.mllib.util.TestingUtils._
+import org.apache.spark.serializer.KryoSerializer
class VectorsSuite extends SparkFunSuite with Logging {
@@ -34,6 +36,21 @@ class VectorsSuite extends SparkFunSuite with Logging {
val indices = Array(0, 2, 3)
val values = Array(0.1, 0.3, 0.4)
+  test("kryo class register") {
+    // Registration is required so that an unregistered class fails instead of passing.
+    val kryoConf = new SparkConf(false).set("spark.kryo.registrationRequired", "true")
+    val kryo = new KryoSerializer(kryoConf).newInstance()
+
+    // Serializes and deserializes `value`, requiring the copy to equal the original.
+    def check[T: ClassTag](value: T): Unit = {
+      val restored = kryo.deserialize[T](kryo.serialize(value))
+      assert(restored === value)
+    }
+
+    check(Vectors.dense(arr).asInstanceOf[DenseVector])
+    check(Vectors.sparse(n, indices, values).asInstanceOf[SparseVector])
+  }
+
test("dense vector construction with varargs") {
val vec = Vectors.dense(arr).asInstanceOf[DenseVector]
assert(vec.size === arr.length)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org