You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/11/10 11:43:33 UTC

spark git commit: [SPARK-22450][CORE][MLLIB] safely register class for mllib

Repository: spark
Updated Branches:
  refs/heads/master 9b9827759 -> 1c923d7d6


[SPARK-22450][CORE][MLLIB] safely register class for mllib

## What changes were proposed in this pull request?

Some algorithms, such as KMeans, are still based on mllib. Currently, many common mllib classes (such as Vector, DenseVector, SparseVector, Matrix, DenseMatrix, and SparseMatrix) are not registered with Kryo, which causes performance issues when serializing or deserializing those objects.
Previously discussed in: https://github.com/apache/spark/pull/19586

## How was this patch tested?

New test cases.

Author: Xianyang Liu <xi...@intel.com>

Closes #19661 from ConeyLiu/register_vector.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/1c923d7d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/1c923d7d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/1c923d7d

Branch: refs/heads/master
Commit: 1c923d7d65dd94996f0fe2cf9851a1ae738c5c0c
Parents: 9b98277
Author: Xianyang Liu <xi...@intel.com>
Authored: Fri Nov 10 12:43:29 2017 +0100
Committer: Wenchen Fan <we...@databricks.com>
Committed: Fri Nov 10 12:43:29 2017 +0100

----------------------------------------------------------------------
 .../spark/serializer/KryoSerializer.scala       | 26 +++++++++++
 .../apache/spark/ml/feature/InstanceSuit.scala  | 47 ++++++++++++++++++++
 .../spark/mllib/linalg/MatricesSuite.scala      | 28 +++++++++++-
 .../spark/mllib/linalg/VectorsSuite.scala       | 19 +++++++-
 4 files changed, 118 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/1c923d7d/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
index 58483c9..2259d1a 100644
--- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
+++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala
@@ -25,6 +25,7 @@ import javax.annotation.Nullable
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 import scala.reflect.ClassTag
+import scala.util.control.NonFatal
 
 import com.esotericsoftware.kryo.{Kryo, KryoException, Serializer => KryoClassSerializer}
 import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
@@ -178,6 +179,31 @@ class KryoSerializer(conf: SparkConf)
     kryo.register(Utils.classForName("scala.collection.immutable.Map$EmptyMap$"))
     kryo.register(classOf[ArrayBuffer[Any]])
 
+    // We can't load those class directly in order to avoid unnecessary jar dependencies.
+    // We load them safely, ignore it if the class not found.
+    Seq("org.apache.spark.mllib.linalg.Vector",
+      "org.apache.spark.mllib.linalg.DenseVector",
+      "org.apache.spark.mllib.linalg.SparseVector",
+      "org.apache.spark.mllib.linalg.Matrix",
+      "org.apache.spark.mllib.linalg.DenseMatrix",
+      "org.apache.spark.mllib.linalg.SparseMatrix",
+      "org.apache.spark.ml.linalg.Vector",
+      "org.apache.spark.ml.linalg.DenseVector",
+      "org.apache.spark.ml.linalg.SparseVector",
+      "org.apache.spark.ml.linalg.Matrix",
+      "org.apache.spark.ml.linalg.DenseMatrix",
+      "org.apache.spark.ml.linalg.SparseMatrix",
+      "org.apache.spark.ml.feature.Instance",
+      "org.apache.spark.ml.feature.OffsetInstance"
+    ).foreach { name =>
+      try {
+        val clazz = Utils.classForName(name)
+        kryo.register(clazz)
+      } catch {
+        case NonFatal(_) => // do nothing
+      }
+    }
+
     kryo.setClassLoader(classLoader)
     kryo
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/1c923d7d/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuit.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuit.scala b/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuit.scala
new file mode 100644
index 0000000..88c85a9
--- /dev/null
+++ b/mllib/src/test/scala/org/apache/spark/ml/feature/InstanceSuit.scala
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.ml.feature
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.{SparkConf, SparkFunSuite}
+import org.apache.spark.ml.linalg.Vectors
+import org.apache.spark.serializer.KryoSerializer
+
+class InstanceSuit extends SparkFunSuite {
+  test("Kryo class register") {
+    val conf = new SparkConf(false)
+    // Make Kryo fail on any class that reaches it without being registered.
+    conf.set("spark.kryo.registrationRequired", "true")
+    val ser = new KryoSerializer(conf).newInstance()
+
+    // Serializes `t`, deserializes the bytes, and asserts the result equals `t`.
+    def check[T: ClassTag](t: T): Unit = {
+      assert(ser.deserialize[T](ser.serialize(t)) === t)
+    }
+
+    val instance1 = Instance(19.0, 2.0, Vectors.dense(1.0, 7.0))
+    val instance2 = Instance(17.0, 1.0, Vectors.dense(0.0, 5.0).toSparse)
+    val oInstance1 = OffsetInstance(0.2, 1.0, 2.0, Vectors.dense(0.0, 5.0))
+    val oInstance2 = OffsetInstance(0.2, 1.0, 2.0, Vectors.dense(0.0, 5.0).toSparse)
+    check(instance1)
+    check(instance2)
+    check(oInstance1)
+    check(oInstance2)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/1c923d7d/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala
index c8ac92e..d76edb9 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/MatricesSuite.scala
@@ -20,16 +20,42 @@ package org.apache.spark.mllib.linalg
 import java.util.Random
 
 import scala.collection.mutable.{Map => MutableMap}
+import scala.reflect.ClassTag
 
 import breeze.linalg.{CSCMatrix, Matrix => BM}
 import org.mockito.Mockito.when
 import org.scalatest.mockito.MockitoSugar._
 
-import org.apache.spark.SparkFunSuite
+import org.apache.spark.{SparkConf, SparkFunSuite}
 import org.apache.spark.ml.{linalg => newlinalg}
 import org.apache.spark.mllib.util.TestingUtils._
+import org.apache.spark.serializer.KryoSerializer
 
 class MatricesSuite extends SparkFunSuite {
+  test("kryo class register") {
+    val conf = new SparkConf(false)
+    // Make Kryo fail on any class that reaches it without being registered.
+    conf.set("spark.kryo.registrationRequired", "true")
+    val ser = new KryoSerializer(conf).newInstance()
+
+    // Serializes `t`, deserializes the bytes, and asserts the result equals `t`.
+    def check[T: ClassTag](t: T): Unit = {
+      assert(ser.deserialize[T](ser.serialize(t)) === t)
+    }
+
+    val m = 3
+    val n = 2
+    val denseValues = Array(0.0, 1.0, 2.0, 3.0, 4.0, 5.0)
+    val denseMat = Matrices.dense(m, n, denseValues).asInstanceOf[DenseMatrix]
+    val sparseValues = Array(1.0, 2.0, 4.0, 5.0)
+    val colPtrs = Array(0, 2, 4)
+    val rowIndices = Array(1, 2, 1, 2)
+    val sparseMat =
+      Matrices.sparse(m, n, colPtrs, rowIndices, sparseValues).asInstanceOf[SparseMatrix]
+    check(denseMat)
+    check(sparseMat)
+  }
+
   test("dense matrix construction") {
     val m = 3
     val n = 2

http://git-wip-us.apache.org/repos/asf/spark/blob/1c923d7d/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala
index a1e3ee5..4074bea 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/linalg/VectorsSuite.scala
@@ -17,15 +17,17 @@
 
 package org.apache.spark.mllib.linalg
 
+import scala.reflect.ClassTag
 import scala.util.Random
 
 import breeze.linalg.{squaredDistance => breezeSquaredDistance, DenseMatrix => BDM}
 import org.json4s.jackson.JsonMethods.{parse => parseJson}
 
-import org.apache.spark.{SparkException, SparkFunSuite}
+import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
 import org.apache.spark.internal.Logging
 import org.apache.spark.ml.{linalg => newlinalg}
 import org.apache.spark.mllib.util.TestingUtils._
+import org.apache.spark.serializer.KryoSerializer
 
 class VectorsSuite extends SparkFunSuite with Logging {
 
@@ -34,6 +36,21 @@ class VectorsSuite extends SparkFunSuite with Logging {
   val indices = Array(0, 2, 3)
   val values = Array(0.1, 0.3, 0.4)
 
+  test("kryo class register") {
+    val conf = new SparkConf(false)
+    // Make Kryo fail on any class that reaches it without being registered.
+    conf.set("spark.kryo.registrationRequired", "true")
+    val ser = new KryoSerializer(conf).newInstance()
+    // Serializes `t`, deserializes the bytes, and asserts the result equals `t`.
+    def check[T: ClassTag](t: T): Unit = {
+      assert(ser.deserialize[T](ser.serialize(t)) === t)
+    }
+    val denseVec = Vectors.dense(arr).asInstanceOf[DenseVector]
+    val sparseVec = Vectors.sparse(n, indices, values).asInstanceOf[SparseVector]
+    check(denseVec)
+    check(sparseVec)
+  }
+
   test("dense vector construction with varargs") {
     val vec = Vectors.dense(arr).asInstanceOf[DenseVector]
     assert(vec.size === arr.length)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org