Posted to reviews@spark.apache.org by patrickbrownsync <gi...@git.apache.org> on 2018/10/26 19:09:42 UTC

[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

GitHub user patrickbrownsync opened a pull request:

    https://github.com/apache/spark/pull/22855

    [SPARK-25839] [Core] Implement use of KryoPool in KryoSerializer

    ## What changes were proposed in this pull request?
    
    * Implement (optional) use of KryoPool in KryoSerializer, an alternative to the existing implementation of caching a Kryo instance inside KryoSerializerInstance
    * Add config key & documentation of spark.kryo.pool in order to turn this on
    * Add benchmark KryoSerializerBenchmark to compare new and old implementation
    * Add results of benchmark
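For readers new to the pooling idea, here is the general shape of a borrow/release pool with a factory and a `run` callback. It is loosely modeled on the `KryoFactory` / `KryoPool` usage that appears in the diffs in this thread. This is a stdlib-only illustration under stated assumptions, not Kryo's or Spark's code. `Factory`, `SimplePool`, and the use of `StringBuilder` as the pooled object are hypothetical stand-ins.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical factory interface, loosely modeled on the KryoFactory seen in the diffs.
trait Factory[T] {
  def create(): T
}

// Minimal thread-safe pool sketch: hand out a released instance if one is
// available, otherwise ask the factory for a new one.
final class SimplePool[T <: AnyRef](factory: Factory[T]) {
  private val free = new ConcurrentLinkedQueue[T]()

  def borrow(): T = {
    val t = free.poll()
    if (t eq null) factory.create() else t
  }

  def release(t: T): Unit = {
    free.offer(t)
    ()
  }

  // Borrow, run the callback, and always return the instance, even if the callback throws.
  def run[R](callback: T => R): R = {
    val t = borrow()
    try callback(t) finally release(t)
  }
}

// Usage: the second borrower gets the same StringBuilder, including its leftover contents.
val pool = new SimplePool[StringBuilder](new Factory[StringBuilder] {
  def create(): StringBuilder = new StringBuilder
})
val first = pool.run(sb => sb.append("a").length)  // 1
val second = pool.run(sb => sb.append("b").length) // 2, because "a" was still there
```

Because a pooled object keeps its state between borrowers, the second `run` sees the first caller's data. That leftover state is why the pooled path in this PR still calls `kryo.reset()` right after borrowing.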
    
    ## How was this patch tested?
    
    Added new tests to KryoSerializerSuite for the pool implementation, and added the pool option to the existing regression tests for SPARK-7766.
    
    
    This is my original work and I license the work to the project under the project’s open source license.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/Blyncs/spark kryo-pool

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22855.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #22855
    
----
commit ce5d13e0673f8574b5795d0d3df59da03118038a
Author: Patrick Brown <pa...@...>
Date:   2018-04-06T18:19:52Z

    use kryo pool for kryo serializer

commit a4ba88eed6d18d2df5fab609bc28210df9b5a716
Author: Patrick Brown <pa...@...>
Date:   2018-04-09T21:20:16Z

    fix pool serializable issue

commit 3f1c41ccc451af868a13616065676a5667d597fa
Author: Patrick Brown <pa...@...>
Date:   2018-10-26T18:52:07Z

    Add option to KryoSerializer to use new KryoPool based implementation
    Add tests for new implementation to KryoSerializerSuite.scala
    Add benchmark of new vs old implementation in KryoSerializerBenchmark.scala
    Add option in Benchmark base for afterAll() shutdown code to facilitate clean benchmark shutdown
    Add documentation for spark.kryo.pool configuration option

----


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22855: [SPARK-25839] [Core] Implement use of KryoPool in KryoSe...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22855
  
    **[Test build #4418 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4418/testReport)** for PR 22855 at commit [`3c00321`](https://github.com/apache/spark/commit/3c00321b6156b8673f5d81f859a3b092266f0181).



[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

Posted by patrickbrownsync <gi...@git.apache.org>.
Github user patrickbrownsync commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22855#discussion_r229763610
  
    --- Diff: core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala ---
    @@ -456,9 +458,63 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
     
       // Regression test for SPARK-7766, an issue where disabling auto-reset and enabling
       // reference-tracking would lead to corrupted output when serializer instances are re-used
    -  for (referenceTracking <- Set(true, false); autoReset <- Set(true, false)) {
    -    test(s"instance reuse with autoReset = $autoReset, referenceTracking = $referenceTracking") {
    -      testSerializerInstanceReuse(autoReset = autoReset, referenceTracking = referenceTracking)
    +  for {
    +    referenceTracking <- Set(true, false)
    --- End diff --
    
    Might as well fix while I'm here



[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22855#discussion_r231526925
  
    --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
    @@ -214,8 +230,14 @@ class KryoSerializer(conf: SparkConf)
         kryo
       }
     
    +  override def setDefaultClassLoader(classLoader: ClassLoader): Serializer = {
    +    defaultClassLoader = Some(classLoader)
    --- End diff --
    
    You can write `super.setDefaultClassLoader(classLoader)` here to inherit the behavior rather than duplicate it. It is just one line now, yes.
    
    Where is defaultClassLoader used in this implementation though? I wonder why it matters that you call `getPool` after this field is set. And if it does, isn't setting it in line 105 not actually helping?
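A minimal sketch of the suggested `super.setDefaultClassLoader(classLoader)` call, combined with rebuilding the pool whenever the loader changes. A later reply in this thread notes the pool is only recreated when a new classloader is set. The classes are hypothetical, stdlib-only stand-ins, not Spark's `Serializer` hierarchy. It also assumes, for illustration, that each pooled object captures the loader that was current when it was built. That assumption is one possible reason the pool should be built after `defaultClassLoader` is set.

```scala
// Hypothetical, stdlib-only stand-ins; these are not Spark's Serializer classes.
abstract class BaseSerializer {
  @volatile protected var defaultClassLoader: Option[ClassLoader] = None

  def setDefaultClassLoader(classLoader: ClassLoader): BaseSerializer = {
    defaultClassLoader = Some(classLoader)
    this
  }
}

// Assumption: each pooled object records the loader that was current when it was built.
final class LoaderBound(val loader: ClassLoader)

class PooledSerializer extends BaseSerializer {
  var poolBuilds = 0
  private var pooled: LoaderBound = buildPool()

  private def currentLoader: ClassLoader =
    defaultClassLoader.getOrElse(Thread.currentThread().getContextClassLoader())

  // Stand-in for creating a fresh pool; here the "pool" holds a single object.
  private def buildPool(): LoaderBound = {
    poolBuilds += 1
    new LoaderBound(currentLoader)
  }

  def borrow(): LoaderBound = pooled

  override def setDefaultClassLoader(classLoader: ClassLoader): BaseSerializer = {
    super.setDefaultClassLoader(classLoader) // inherit the parent's behavior
    pooled = buildPool()                      // discard objects built with the old loader
    this
  }
}
```

Delegating to `super` keeps the field update in one place. The override adds only the pool rebuild.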



[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

Posted by patrickbrownsync <gi...@git.apache.org>.
Github user patrickbrownsync commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22855#discussion_r230420266
  
    --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
    @@ -84,6 +85,7 @@ class KryoSerializer(conf: SparkConf)
       private val avroSchemas = conf.getAvroSchema
       // whether to use unsafe based IO for serialization
       private val useUnsafe = conf.getBoolean("spark.kryo.unsafe", false)
    +  private val usePool = conf.getBoolean("spark.kryo.pool", false)
    --- End diff --
    
    Okay, if I understand you correctly, I should set the default to true and remove the documentation. I'll go ahead and do that; if I misunderstood, let me know. Thanks!



[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

Posted by wangyum <gi...@git.apache.org>.
Github user wangyum commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22855#discussion_r232203634
  
    --- Diff: core/src/test/scala/org/apache/spark/serializer/KryoSerializerBenchmark.scala ---
    @@ -0,0 +1,90 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.serializer
    +
    +import scala.concurrent._
    +import scala.concurrent.ExecutionContext.Implicits.global
    +import scala.concurrent.duration._
    +
    +import org.apache.spark.{SparkConf, SparkContext}
    +import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
    +import org.apache.spark.serializer.KryoTest._
    +import org.apache.spark.util.ThreadUtils
    +
    +/**
    + * Benchmark for KryoPool vs old "pool of 1".
    + * To run this benchmark:
    + * {{{
    + *   1. without sbt:
    + *      bin/spark-submit --class <this class> --jars <spark core test jar>
    + *   2. build/sbt "core/test:runMain <this class>"
    + *   3. generate result:
    + *      SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "core/test:runMain <this class>"
    + *      Results will be written to "benchmarks/KryoSerializerBenchmark-results.txt".
    + * }}}
    + */
    +object KryoSerializerBenchmark extends BenchmarkBase {
    --- End diff --
    
    cc @dongjoon-hyun for Benchmark change.



[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

Posted by patrickbrownsync <gi...@git.apache.org>.
Github user patrickbrownsync commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22855#discussion_r232375758
  
    --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
    @@ -101,13 +101,29 @@ class KryoSerializer(conf: SparkConf)
         }
       }
     
    -  @transient
    -  var pool: KryoPool = getPool
    +  private class PoolWrapper extends KryoPool {
    +    private var pool: KryoPool = getPool
    +
    +    override def borrow(): Kryo = pool.borrow()
    +
    +    override def release(kryo: Kryo): Unit = pool.release(kryo)
    +
    +    override def run[T](kryoCallback: KryoCallback[T]): T = pool.run(kryoCallback)
    +
    +    def reset(): Unit = {
    +      pool = getPool
    --- End diff --
    
    The KryoPool interface exposes no way to free it, so I believe that is not necessary
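The point that the pool interface has no way to free it can be illustrated with a stdlib-only sketch of the delegating wrapper from the diff. Here `reset()` just swaps the reference. The old pool then becomes unreachable and is reclaimed by the garbage collector. `Pool`, `QueuePool`, and this `PoolWrapper` are hypothetical stand-ins, not Kryo's interfaces.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical pool interface; the diff's KryoPool also has run(), omitted here, and no free().
trait Pool[T] {
  def borrow(): T
  def release(t: T): Unit
}

// Simple unbounded pool used as the underlying implementation.
final class QueuePool[T <: AnyRef](create: () => T) extends Pool[T] {
  private val free = new ConcurrentLinkedQueue[T]()
  def borrow(): T = {
    val t = free.poll()
    if (t eq null) create() else t
  }
  def release(t: T): Unit = { free.offer(t); () }
}

// Delegating wrapper: callers keep one stable reference while the underlying pool can be swapped.
final class PoolWrapper[T](newPool: () => Pool[T]) extends Pool[T] {
  @volatile private var pool: Pool[T] = newPool()
  def borrow(): T = pool.borrow()
  def release(t: T): Unit = pool.release(t)
  // Nothing to close: the old pool simply becomes unreachable and is
  // garbage-collected once nothing else refers to it.
  def reset(): Unit = { pool = newPool() }
}
```

Callers hold only the wrapper, so a `reset()` after a class loader change is invisible to them.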



[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22855#discussion_r230421073
  
    --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
    @@ -84,6 +85,7 @@ class KryoSerializer(conf: SparkConf)
       private val avroSchemas = conf.getAvroSchema
       // whether to use unsafe based IO for serialization
       private val useUnsafe = conf.getBoolean("spark.kryo.unsafe", false)
    +  private val usePool = conf.getBoolean("spark.kryo.pool", false)
    --- End diff --
    
    Yeah I think that's a fine position to take, if we can't think of a reason to disable it other than the theoretical unknown unknown bug out there.



[GitHub] spark issue #22855: [SPARK-25839] [Core] Implement use of KryoPool in KryoSe...

Posted by patrickbrownsync <gi...@git.apache.org>.
Github user patrickbrownsync commented on the issue:

    https://github.com/apache/spark/pull/22855
  
    @srowen Thanks for taking the time to give me feedback! I'll get this stuff fixed



[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22855#discussion_r231213322
  
    --- Diff: core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala ---
    @@ -431,9 +434,11 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
         ser.deserialize[HashMap[Int, List[String]]](serializedMap)
       }
     
    -  private def testSerializerInstanceReuse(autoReset: Boolean, referenceTracking: Boolean): Unit = {
    +  private def testSerializerInstanceReuse(
    +     autoReset: Boolean, referenceTracking: Boolean, usePool: Boolean): Unit = {
    --- End diff --
    
    Total nit: there are a couple continuation indents that are 3 rather than 4 spaces. Don't bother, unless you need to make other updates.



[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

Posted by patrickbrownsync <gi...@git.apache.org>.
Github user patrickbrownsync commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22855#discussion_r231554649
  
    --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
    @@ -92,6 +94,20 @@ class KryoSerializer(conf: SparkConf)
           new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize))
         }
     
    +  @transient
    +  private lazy val factory: KryoFactory = new KryoFactory() {
    +    override def create: Kryo = {
    +      newKryo()
    +    }
    +  }
    +
    +  @transient
    --- End diff --
    
    Yes, you are correct, I don't think this will work



[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22855#discussion_r228716844
  
    --- Diff: core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala ---
    @@ -33,6 +33,7 @@ import org.apache.spark.serializer.KryoTest._
     import org.apache.spark.storage.BlockManagerId
     import org.apache.spark.util.Utils
     
    +
    --- End diff --
    
    nit: extra empty line.



[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

Posted by patrickbrownsync <gi...@git.apache.org>.
Github user patrickbrownsync commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22855#discussion_r228759425
  
    --- Diff: core/src/test/scala/org/apache/spark/serializer/KryoSerializerBenchmark.scala ---
    @@ -0,0 +1,90 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.serializer
    +
    +import scala.concurrent._
    +import scala.concurrent.ExecutionContext.Implicits.global
    +import scala.concurrent.duration._
    +
    +import org.apache.spark.{SparkConf, SparkContext}
    +import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
    +import org.apache.spark.serializer.KryoTest._
    +import org.apache.spark.util.ThreadUtils
    +
    +/**
    + * Benchmark for KryoPool vs old "pool of 1".
    + * To run this benchmark:
    + * {{{
    + *   1. without sbt:
    + *      bin/spark-submit --class <this class> --jars <spark core test jar>
    + *   2. build/sbt "core/test:runMain <this class>"
    + *   3. generate result:
    + *      SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "core/test:runMain <this class>"
    + *      Results will be written to "benchmarks/KryoSerializerBenchmark-results.txt".
    + * }}}
    + */
    +object KryoSerializerBenchmark extends BenchmarkBase {
    +
    +  var sc: SparkContext = null
    +  val N = 500
    +  override def runBenchmarkSuite(): Unit = {
    +    val name = "Benchmark KryoPool vs old\"pool of 1\" implementation"
    +    runBenchmark(name) {
    +      val benchmark = new Benchmark(name, N, 10, output = output)
    +      Seq(true, false).foreach(usePool => run(usePool, benchmark))
    +      benchmark.run()
    +    }
    +  }
    +
    +  private def run(usePool: Boolean, benchmark: Benchmark): Unit = {
    +    lazy val sc = createSparkContext(usePool)
    +
    +    benchmark.addCase(s"KryoPool:$usePool") { _ =>
    +      val futures = for (_ <- 0 until N) yield {
    +        Future {
    +          sc.parallelize(0 until 10).map(i => i + 1).count()
    +        }
    +      }
    +
    +      val future = Future.sequence(futures)
    +
    +      ThreadUtils.awaitResult(future, 10.minutes)
    +    }
    +  }
    +
    +  def createSparkContext(usePool: Boolean): SparkContext = {
    --- End diff --
    
    I'm not sure I understand the question here, this benchmark class doesn't inherit from `SqlBasedBenchmark` it inherits from `BenchmarkBase` which has no `getSparkSession` method.



[GitHub] spark issue #22855: [SPARK-25839] [Core] Implement use of KryoPool in KryoSe...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on the issue:

    https://github.com/apache/spark/pull/22855
  
    Ping @patrickbrownsync -- with this fixed I think we can merge it



[GitHub] spark issue #22855: [SPARK-25839] [Core] Implement use of KryoPool in KryoSe...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22855
  
    **[Test build #4410 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4410/testReport)** for PR 22855 at commit [`3433622`](https://github.com/apache/spark/commit/3433622d92957607ca3a4f83f199f86f52077627).
     * This patch **fails to build**.
     * This patch merges cleanly.
     * This patch adds no public classes.



[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22855#discussion_r229737307
  
    --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
    @@ -92,6 +94,16 @@ class KryoSerializer(conf: SparkConf)
           new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize))
         }
     
    +  @transient
    +  private lazy val factory: KryoFactory = new KryoFactory() {
    +    override def create: Kryo = {
    +      newKryo()
    +    }
    +  }
    +
    +  @transient
    +  lazy val pool = new KryoPool.Builder(factory).softReferences.build
    --- End diff --
    
    `private`?



[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22855#discussion_r229738553
  
    --- Diff: core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala ---
    @@ -456,9 +458,63 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
     
       // Regression test for SPARK-7766, an issue where disabling auto-reset and enabling
       // reference-tracking would lead to corrupted output when serializer instances are re-used
    -  for (referenceTracking <- Set(true, false); autoReset <- Set(true, false)) {
    -    test(s"instance reuse with autoReset = $autoReset, referenceTracking = $referenceTracking") {
    -      testSerializerInstanceReuse(autoReset = autoReset, referenceTracking = referenceTracking)
    +  for {
    +    referenceTracking <- Set(true, false)
    --- End diff --
    
    Not that it matters, but I think this should have originally been Seq not Set
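A small sketch of the SPARK-7766 test-matrix loop written with `Seq`, as suggested. A comprehension over `Seq` yields a `Seq` in a fixed order. One over `Set` yields a `Set`, which merges equal results and makes no general ordering promise. The helper is a hypothetical stand-in that only builds the test name. Its wrapped parameter list uses a 4-space continuation indent.

```scala
// Hypothetical stand-in for the suite's helper: it only builds the test name here,
// with the wrapped parameter list indented 4 spaces.
def testSerializerInstanceReuse(
    autoReset: Boolean, referenceTracking: Boolean, usePool: Boolean): String =
  s"instance reuse with autoReset = $autoReset, " +
    s"referenceTracking = $referenceTracking, usePool = $usePool"

// Every combination of the three flags, in a fixed order (8 cases).
val testNames: Seq[String] = for {
  referenceTracking <- Seq(true, false)
  autoReset <- Seq(true, false)
  usePool <- Seq(true, false)
} yield testSerializerInstanceReuse(autoReset, referenceTracking, usePool)
```

In the suite itself the loop body registers a `test(...)` rather than yielding a value. For registration, `Seq` mainly documents intent.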



[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

Posted by patrickbrownsync <gi...@git.apache.org>.
Github user patrickbrownsync commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22855#discussion_r228759466
  
    --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
    @@ -298,30 +312,40 @@ class KryoDeserializationStream(
       }
     }
     
    -private[spark] class KryoSerializerInstance(ks: KryoSerializer, useUnsafe: Boolean)
    +private[spark] class KryoSerializerInstance(
    +   ks: KryoSerializer, useUnsafe: Boolean, usePool: Boolean)
       extends SerializerInstance {
       /**
        * A re-used [[Kryo]] instance. Methods will borrow this instance by calling `borrowKryo()`, do
        * their work, then release the instance by calling `releaseKryo()`. Logically, this is a caching
        * pool of size one. SerializerInstances are not thread-safe, hence accesses to this field are
        * not synchronized.
        */
    -  @Nullable private[this] var cachedKryo: Kryo = borrowKryo()
    +  @Nullable private[this] var cachedKryo: Kryo = if (usePool) null else borrowKryo()
     
       /**
        * Borrows a [[Kryo]] instance. If possible, this tries to re-use a cached Kryo instance;
        * otherwise, it allocates a new instance.
        */
       private[serializer] def borrowKryo(): Kryo = {
    -    if (cachedKryo != null) {
    -      val kryo = cachedKryo
    -      // As a defensive measure, call reset() to clear any Kryo state that might have been modified
    -      // by the last operation to borrow this instance (see SPARK-7766 for discussion of this issue)
    +    if (usePool) {
    +      val kryo = ks.pool.borrow()
           kryo.reset()
    -      cachedKryo = null
           kryo
         } else {
    -      ks.newKryo()
    +      if (cachedKryo != null) {
    +        val kryo = cachedKryo
    +        /**
    +        * As a defensive measure, call reset() to clear any Kryo state that might have
    --- End diff --
    
    Sorry, I'm new to this. What is the specific style issue here?



[GitHub] spark issue #22855: [SPARK-25839] [Core] Implement use of KryoPool in KryoSe...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22855
  
    Can one of the admins verify this patch?



[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

Posted by patrickbrownsync <gi...@git.apache.org>.
Github user patrickbrownsync commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22855#discussion_r229762474
  
    --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
    @@ -92,6 +94,16 @@ class KryoSerializer(conf: SparkConf)
           new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize))
         }
     
    +  @transient
    +  private lazy val factory: KryoFactory = new KryoFactory() {
    +    override def create: Kryo = {
    +      newKryo()
    +    }
    +  }
    +
    +  @transient
    +  lazy val pool = new KryoPool.Builder(factory).softReferences.build
    --- End diff --
    
    This is used directly by KryoSerializerInstance



[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

Posted by patrickbrownsync <gi...@git.apache.org>.
Github user patrickbrownsync commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22855#discussion_r228970250
  
    --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
    @@ -298,30 +312,40 @@ class KryoDeserializationStream(
       }
     }
     
    -private[spark] class KryoSerializerInstance(ks: KryoSerializer, useUnsafe: Boolean)
    +private[spark] class KryoSerializerInstance(
    +   ks: KryoSerializer, useUnsafe: Boolean, usePool: Boolean)
       extends SerializerInstance {
       /**
        * A re-used [[Kryo]] instance. Methods will borrow this instance by calling `borrowKryo()`, do
        * their work, then release the instance by calling `releaseKryo()`. Logically, this is a caching
        * pool of size one. SerializerInstances are not thread-safe, hence accesses to this field are
        * not synchronized.
        */
    -  @Nullable private[this] var cachedKryo: Kryo = borrowKryo()
    +  @Nullable private[this] var cachedKryo: Kryo = if (usePool) null else borrowKryo()
     
       /**
        * Borrows a [[Kryo]] instance. If possible, this tries to re-use a cached Kryo instance;
        * otherwise, it allocates a new instance.
        */
       private[serializer] def borrowKryo(): Kryo = {
    -    if (cachedKryo != null) {
    -      val kryo = cachedKryo
    -      // As a defensive measure, call reset() to clear any Kryo state that might have been modified
    -      // by the last operation to borrow this instance (see SPARK-7766 for discussion of this issue)
    +    if (usePool) {
    +      val kryo = ks.pool.borrow()
           kryo.reset()
    -      cachedKryo = null
           kryo
         } else {
    -      ks.newKryo()
    +      if (cachedKryo != null) {
    +        val kryo = cachedKryo
    +        /**
    +        * As a defensive measure, call reset() to clear any Kryo state that might have
    --- End diff --
    
    Thanks, fixed.
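The two paths in the `KryoSerializerInstance.borrowKryo()` diff can be sketched without Spark or Kryo. `Codec`, `Owner`, and `Instance` are hypothetical stand-ins for a Kryo instance, `KryoSerializer`, and `KryoSerializerInstance`. The shared pool here is a plain `ConcurrentLinkedQueue`, not Kryo's `KryoPool`. The sketch counts constructions to show one way a shared pool can help when many short-lived serializer instances are created.

```scala
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical stand-in for a stateful, costly-to-build object such as a Kryo instance.
final class Codec {
  var scratch = 0                     // state a previous borrower may leave behind
  def reset(): Unit = scratch = 0
}

// Hypothetical stand-in for KryoSerializer: owns the factory and one shared pool.
final class Owner {
  var created = 0
  private val shared = new ConcurrentLinkedQueue[Codec]()
  def newCodec(): Codec = { created += 1; new Codec }
  def borrowShared(): Codec = Option(shared.poll()).getOrElse(newCodec())
  def releaseShared(c: Codec): Unit = { shared.offer(c); () }
}

// Hypothetical stand-in for KryoSerializerInstance with the two paths from the diff.
final class Instance(owner: Owner, usePool: Boolean) {
  // "Pool of 1": without the shared pool, each instance eagerly builds its own codec.
  private var cached: Codec = if (usePool) null else owner.newCodec()

  def borrow(): Codec =
    if (usePool) {
      val c = owner.borrowShared()
      c.reset()                       // defensive reset, as in the diff
      c
    } else if (cached != null) {
      val c = cached
      c.reset()
      cached = null
      c
    } else {
      owner.newCodec()
    }

  def release(c: Codec): Unit =
    if (usePool) owner.releaseShared(c)
    else if (cached == null) cached = c
}

// Builds `n` short-lived instances in sequence and reports how many codecs were created.
def constructions(usePool: Boolean, n: Int = 10): Int = {
  val owner = new Owner
  for (_ <- 1 to n) {
    val inst = new Instance(owner, usePool)
    val c = inst.borrow()
    c.scratch += 1
    inst.release(c)
  }
  owner.created
}
```

In this sketch, with `usePool = false` every new instance builds its own codec, so ten instances build ten codecs. With the shared pool, the sequential instances all reuse a single codec.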



[GitHub] spark issue #22855: [SPARK-25839] [Core] Implement use of KryoPool in KryoSe...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22855
  
    **[Test build #4415 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4415/testReport)** for PR 22855 at commit [`9069768`](https://github.com/apache/spark/commit/90697681ea39943b130977d7687757ce6a4db722).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.



[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22855#discussion_r232401327
  
    --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
    @@ -101,13 +101,29 @@ class KryoSerializer(conf: SparkConf)
         }
       }
     
    -  @transient
    -  var pool: KryoPool = getPool
    +  private class PoolWrapper extends KryoPool {
    +    private var pool: KryoPool = getPool
    +
    +    override def borrow(): Kryo = pool.borrow()
    +
    +    override def release(kryo: Kryo): Unit = pool.release(kryo)
    +
    +    override def run[T](kryoCallback: KryoCallback[T]): T = pool.run(kryoCallback)
    +
    +    def reset(): Unit = {
    +      pool = getPool
    --- End diff --
    
    OK, and it only gets recreated when a new classloader is set, it seems, so it is rare. 



[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

Posted by patrickbrownsync <gi...@git.apache.org>.
Github user patrickbrownsync commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22855#discussion_r232330079
  
    --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
    @@ -92,6 +94,16 @@ class KryoSerializer(conf: SparkConf)
           new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize))
         }
     
    +  @transient
    +  private lazy val factory: KryoFactory = new KryoFactory() {
    --- End diff --
    
    This was needed



[GitHub] spark issue #22855: [SPARK-25839] [Core] Implement use of KryoPool in KryoSe...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22855
  
    Can one of the admins verify this patch?


---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #22855: [SPARK-25839] [Core] Implement use of KryoPool in KryoSe...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22855
  
    **[Test build #4417 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4417/testReport)** for PR 22855 at commit [`60310c0`](https://github.com/apache/spark/commit/60310c0e18613f0c32f19b73e6ac25a49ba25e86).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---


[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

Posted by patrickbrownsync <gi...@git.apache.org>.
Github user patrickbrownsync commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22855#discussion_r229763892
  
    --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
    @@ -84,6 +85,7 @@ class KryoSerializer(conf: SparkConf)
       private val avroSchemas = conf.getAvroSchema
       // whether to use unsafe based IO for serialization
       private val useUnsafe = conf.getBoolean("spark.kryo.unsafe", false)
    +  private val usePool = conf.getBoolean("spark.kryo.pool", false)
    --- End diff --
    
    I had already added this to docs/configuration.md; should I remove it?


---


[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

Posted by patrickbrownsync <gi...@git.apache.org>.
Github user patrickbrownsync commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22855#discussion_r230422530
  
    --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
    @@ -84,6 +85,7 @@ class KryoSerializer(conf: SparkConf)
       private val avroSchemas = conf.getAvroSchema
       // whether to use unsafe based IO for serialization
       private val useUnsafe = conf.getBoolean("spark.kryo.unsafe", false)
    +  private val usePool = conf.getBoolean("spark.kryo.pool", false)
    --- End diff --
    
    Sounds good and done


---


[GitHub] spark issue #22855: [SPARK-25839] [Core] Implement use of KryoPool in KryoSe...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22855
  
    **[Test build #4415 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4415/testReport)** for PR 22855 at commit [`9069768`](https://github.com/apache/spark/commit/90697681ea39943b130977d7687757ce6a4db722).


---


[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/22855


---


[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22855#discussion_r229737692
  
    --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
    @@ -92,6 +94,16 @@ class KryoSerializer(conf: SparkConf)
           new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize))
         }
     
    +  @transient
    +  private lazy val factory: KryoFactory = new KryoFactory() {
    --- End diff --
    
    I think you're welcome to just write ...
    ```
    private lazy val factory = new KryoFactory() {
      override def create: Kryo = newKryo()
    }
    ```
    but it doesn't matter.


---


[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22855#discussion_r228716824
  
    --- Diff: core/src/test/scala/org/apache/spark/serializer/KryoSerializerBenchmark.scala ---
    @@ -0,0 +1,90 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.serializer
    +
    +import scala.concurrent._
    +import scala.concurrent.ExecutionContext.Implicits.global
    +import scala.concurrent.duration._
    +
    +import org.apache.spark.{SparkConf, SparkContext}
    +import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
    +import org.apache.spark.serializer.KryoTest._
    +import org.apache.spark.util.ThreadUtils
    +
    +/**
    + * Benchmark for KryoPool vs old "pool of 1".
    + * To run this benchmark:
    + * {{{
    + *   1. without sbt:
    + *      bin/spark-submit --class <this class> --jars <spark core test jar>
    + *   2. build/sbt "core/test:runMain <this class>"
    + *   3. generate result:
    + *      SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "core/test:runMain <this class>"
    + *      Results will be written to "benchmarks/KryoSerializerBenchmark-results.txt".
    + * }}}
    + */
    +object KryoSerializerBenchmark extends BenchmarkBase {
    +
    +  var sc: SparkContext = null
    +  val N = 500
    +  override def runBenchmarkSuite(): Unit = {
    +    val name = "Benchmark KryoPool vs old\"pool of 1\" implementation"
    +    runBenchmark(name) {
    +      val benchmark = new Benchmark(name, N, 10, output = output)
    +      Seq(true, false).foreach(usePool => run(usePool, benchmark))
    +      benchmark.run()
    +    }
    +  }
    +
    +  private def run(usePool: Boolean, benchmark: Benchmark): Unit = {
    +    lazy val sc = createSparkContext(usePool)
    +
    +    benchmark.addCase(s"KryoPool:$usePool") { _ =>
    +      val futures = for (_ <- 0 until N) yield {
    +        Future {
    +          sc.parallelize(0 until 10).map(i => i + 1).count()
    +        }
    +      }
    +
    +      val future = Future.sequence(futures)
    +
    +      ThreadUtils.awaitResult(future, 10.minutes)
    +    }
    +  }
    +
    +  def createSparkContext(usePool: Boolean): SparkContext = {
    --- End diff --
    
    We add this rather than overriding `getSparkSession` as in `SqlBasedBenchmark`. Is that because changing the conf in a SparkSession doesn't take effect?


---


[GitHub] spark issue #22855: [SPARK-25839] [Core] Implement use of KryoPool in KryoSe...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22855
  
    **[Test build #4410 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4410/testReport)** for PR 22855 at commit [`3433622`](https://github.com/apache/spark/commit/3433622d92957607ca3a4f83f199f86f52077627).


---


[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22855#discussion_r229738794
  
    --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
    @@ -30,6 +30,7 @@ import scala.util.control.NonFatal
     import com.esotericsoftware.kryo.{Kryo, KryoException, Serializer => KryoClassSerializer}
     import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput}
     import com.esotericsoftware.kryo.io.{UnsafeInput => KryoUnsafeInput, UnsafeOutput => KryoUnsafeOutput}
    +import com.esotericsoftware.kryo.pool._
    --- End diff --
    
    I'd spell out the imports for clarity, unless that's going to run to more than 2 lines or so.


---


[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22855#discussion_r229739685
  
    --- Diff: core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala ---
    @@ -456,9 +458,63 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
     
       // Regression test for SPARK-7766, an issue where disabling auto-reset and enabling
       // reference-tracking would lead to corrupted output when serializer instances are re-used
    -  for (referenceTracking <- Set(true, false); autoReset <- Set(true, false)) {
    -    test(s"instance reuse with autoReset = $autoReset, referenceTracking = $referenceTracking") {
    -      testSerializerInstanceReuse(autoReset = autoReset, referenceTracking = referenceTracking)
    +  for {
    +    referenceTracking <- Set(true, false)
    +    autoReset <- Set(true, false)
    +    usePool <- Set(true, false)
    +  } {
    +    test(s"instance reuse with autoReset = $autoReset, referenceTracking = $referenceTracking" +
    +      s", usePool = $usePool") {
    +      testSerializerInstanceReuse(
    +        autoReset = autoReset, referenceTracking = referenceTracking, usePool = usePool)
    --- End diff --
    
    Likewise, I see this is just how the code was written before, but the `foo = foo` style isn't adding anything IMHO. Feel free to not name the args.
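A minimal sketch of the three-flag test matrix above, calling the helper with positional arguments as suggested. `testSerializerInstanceReuse` here is a hypothetical stand-in that only returns its inputs, not the suite's real helper, and `Seq` is used instead of `Set` only to keep a stable order.

```scala
object ReuseMatrixSketch {
  // Hypothetical stand-in for the suite's helper: it just returns the flags it
  // was called with, so the generated matrix can be inspected.
  def testSerializerInstanceReuse(
      autoReset: Boolean,
      referenceTracking: Boolean,
      usePool: Boolean): (Boolean, Boolean, Boolean) =
    (autoReset, referenceTracking, usePool)

  // All 2 x 2 x 2 combinations of the three flags.
  def allCases(): Seq[(Boolean, Boolean, Boolean)] =
    for {
      referenceTracking <- Seq(true, false)
      autoReset <- Seq(true, false)
      usePool <- Seq(true, false)
    } yield {
      // Positional arguments: the parameter names already document each flag.
      testSerializerInstanceReuse(autoReset, referenceTracking, usePool)
    }

  def main(args: Array[String]): Unit =
    allCases().foreach { case (autoReset, referenceTracking, usePool) =>
      println(s"instance reuse with autoReset = $autoReset, " +
        s"referenceTracking = $referenceTracking, usePool = $usePool")
    }
}
```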


---


[GitHub] spark issue #22855: [SPARK-25839] [Core] Implement use of KryoPool in KryoSe...

Posted by patrickbrownsync <gi...@git.apache.org>.
Github user patrickbrownsync commented on the issue:

    https://github.com/apache/spark/pull/22855
  
    @srowen This should be fixed now. I rebased off master and changed KryoSerializerBenchmark to conform to how BenchmarkBase had changed on master.


---


[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22855#discussion_r232332531
  
    --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
    @@ -101,13 +101,29 @@ class KryoSerializer(conf: SparkConf)
         }
       }
     
    -  @transient
    -  var pool: KryoPool = getPool
    +  private class PoolWrapper extends KryoPool {
    +    private var pool: KryoPool = getPool
    +
    +    override def borrow(): Kryo = pool.borrow()
    +
    +    override def release(kryo: Kryo): Unit = pool.release(kryo)
    +
    +    override def run[T](kryoCallback: KryoCallback[T]): T = pool.run(kryoCallback)
    +
    +    def reset(): Unit = {
    +      pool = getPool
    --- End diff --
    
    OK. We could look at updating to Kryo 5 in Spark 3, too, if there were other benefits.
    Does the old pool need to get freed or anything here?


---


[GitHub] spark issue #22855: [SPARK-25839] [Core] Implement use of KryoPool in KryoSe...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22855
  
    **[Test build #4417 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4417/testReport)** for PR 22855 at commit [`60310c0`](https://github.com/apache/spark/commit/60310c0e18613f0c32f19b73e6ac25a49ba25e86).


---


[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22855#discussion_r228716995
  
    --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
    @@ -298,30 +312,40 @@ class KryoDeserializationStream(
       }
     }
     
    -private[spark] class KryoSerializerInstance(ks: KryoSerializer, useUnsafe: Boolean)
    +private[spark] class KryoSerializerInstance(
    +   ks: KryoSerializer, useUnsafe: Boolean, usePool: Boolean)
       extends SerializerInstance {
       /**
        * A re-used [[Kryo]] instance. Methods will borrow this instance by calling `borrowKryo()`, do
        * their work, then release the instance by calling `releaseKryo()`. Logically, this is a caching
        * pool of size one. SerializerInstances are not thread-safe, hence accesses to this field are
        * not synchronized.
        */
    -  @Nullable private[this] var cachedKryo: Kryo = borrowKryo()
    +  @Nullable private[this] var cachedKryo: Kryo = if (usePool) null else borrowKryo()
     
       /**
        * Borrows a [[Kryo]] instance. If possible, this tries to re-use a cached Kryo instance;
        * otherwise, it allocates a new instance.
        */
       private[serializer] def borrowKryo(): Kryo = {
    -    if (cachedKryo != null) {
    -      val kryo = cachedKryo
    -      // As a defensive measure, call reset() to clear any Kryo state that might have been modified
    -      // by the last operation to borrow this instance (see SPARK-7766 for discussion of this issue)
    +    if (usePool) {
    +      val kryo = ks.pool.borrow()
           kryo.reset()
    -      cachedKryo = null
           kryo
         } else {
    -      ks.newKryo()
    +      if (cachedKryo != null) {
    +        val kryo = cachedKryo
    +        /**
    +        * As a defensive measure, call reset() to clear any Kryo state that might have
    --- End diff --
    
    nit for the style.


---


[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22855#discussion_r229740163
  
    --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
    @@ -84,6 +85,7 @@ class KryoSerializer(conf: SparkConf)
       private val avroSchemas = conf.getAvroSchema
       // whether to use unsafe based IO for serialization
       private val useUnsafe = conf.getBoolean("spark.kryo.unsafe", false)
    +  private val usePool = conf.getBoolean("spark.kryo.pool", false)
    --- End diff --
    
    Yep, I'd leave this undocumented


---


[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22855#discussion_r230052150
  
    --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
    @@ -84,6 +85,7 @@ class KryoSerializer(conf: SparkConf)
       private val avroSchemas = conf.getAvroSchema
       // whether to use unsafe based IO for serialization
       private val useUnsafe = conf.getBoolean("spark.kryo.unsafe", false)
    +  private val usePool = conf.getBoolean("spark.kryo.pool", false)
    --- End diff --
    
    I would not document it. This is just a safety valve. In theory, there's no reason to disable this nor would a caller know why to disable it.


---


[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

Posted by patrickbrownsync <gi...@git.apache.org>.
Github user patrickbrownsync commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22855#discussion_r232330015
  
    --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
    @@ -92,6 +94,36 @@ class KryoSerializer(conf: SparkConf)
           new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize))
         }
     
    +  @transient
    +  private lazy val factory: KryoFactory = new KryoFactory() {
    +    override def create: Kryo = {
    +      newKryo()
    +    }
    +  }
    +
    +  private class PoolWrapper extends KryoPool {
    --- End diff --
    
    This wrapper can be removed when Kryo is updated to 5.0, as the new pool implementation exposes the method needed to clear the pool.


---


[GitHub] spark issue #22855: [SPARK-25839] [Core] Implement use of KryoPool in KryoSe...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22855
  
    **[Test build #4421 has finished](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4421/testReport)** for PR 22855 at commit [`3bfc4eb`](https://github.com/apache/spark/commit/3bfc4ebbf214b6b0fadbaa10aa832303a59de97d).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---


[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22855#discussion_r231213606
  
    --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
    @@ -298,30 +309,40 @@ class KryoDeserializationStream(
       }
     }
     
    -private[spark] class KryoSerializerInstance(ks: KryoSerializer, useUnsafe: Boolean)
    +private[spark] class KryoSerializerInstance(
    +   ks: KryoSerializer, useUnsafe: Boolean, usePool: Boolean)
       extends SerializerInstance {
       /**
        * A re-used [[Kryo]] instance. Methods will borrow this instance by calling `borrowKryo()`, do
        * their work, then release the instance by calling `releaseKryo()`. Logically, this is a caching
        * pool of size one. SerializerInstances are not thread-safe, hence accesses to this field are
        * not synchronized.
        */
    -  @Nullable private[this] var cachedKryo: Kryo = borrowKryo()
    +  @Nullable private[this] var cachedKryo: Kryo = if (usePool) null else borrowKryo()
     
       /**
        * Borrows a [[Kryo]] instance. If possible, this tries to re-use a cached Kryo instance;
        * otherwise, it allocates a new instance.
        */
       private[serializer] def borrowKryo(): Kryo = {
    -    if (cachedKryo != null) {
    -      val kryo = cachedKryo
    -      // As a defensive measure, call reset() to clear any Kryo state that might have been modified
    -      // by the last operation to borrow this instance (see SPARK-7766 for discussion of this issue)
    +    if (usePool) {
    +      val kryo = ks.pool.borrow()
           kryo.reset()
    -      cachedKryo = null
           kryo
         } else {
    -      ks.newKryo()
    +      if (cachedKryo != null) {
    +        val kryo = cachedKryo
    +        /**
    +         * As a defensive measure, call reset() to clear any Kryo state that might have
    --- End diff --
    
    Another total nit, not worth touching unless you make other changes, but this is scaladoc-style. Multi-line comments are often just commented with `//` as before.
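For reference, the non-pooled path in the diff behaves like a caching pool of size one. A minimal sketch of that pattern, written with `//` comments as suggested; `Resource` and `PoolOfOne` are hypothetical names (a `Resource` stands in for a Kryo instance), not Spark classes.

```scala
object PoolOfOneSketch {
  // Hypothetical stand-in for a Kryo instance: it carries state that must be
  // cleared before each reuse.
  final class Resource(val id: Int) {
    var dirty: Boolean = false
    def reset(): Unit = { dirty = false }
  }

  // Caches at most one instance. Like SerializerInstance, it is not
  // thread-safe, so access to the cached field is not synchronized.
  final class PoolOfOne(create: () => Resource) {
    private[this] var cached: Resource = create()

    def borrow(): Resource = {
      if (cached != null) {
        val r = cached
        // As a defensive measure, call reset() to clear any state that the
        // previous borrower may have modified (see SPARK-7766).
        r.reset()
        cached = null
        r
      } else {
        create()
      }
    }

    def release(r: Resource): Unit = {
      // Only keep the returned instance if the cache slot is empty.
      if (cached == null) cached = r
    }
  }

  // Returns (instances created, id of the reused instance, its dirty flag).
  def demo(): (Int, Int, Boolean) = {
    var created = 0
    val pool = new PoolOfOne(() => { created += 1; new Resource(created) })
    val a = pool.borrow() // the instance created up front (id 1)
    a.dirty = true
    val b = pool.borrow() // cache is empty, so a second instance is created
    pool.release(a)       // fills the empty cache slot
    pool.release(b)       // dropped: the slot is already taken
    val c = pool.borrow() // `a` again, with its state reset
    (created, c.id, c.dirty)
  }
}
```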


---


[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

Posted by xuanyuanking <gi...@git.apache.org>.
Github user xuanyuanking commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22855#discussion_r228716497
  
    --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
    @@ -92,6 +94,18 @@ class KryoSerializer(conf: SparkConf)
           new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize))
         }
     
    +
    --- End diff --
    
    nit: extra empty line.


---


[GitHub] spark issue #22855: [SPARK-25839] [Core] Implement use of KryoPool in KryoSe...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/22855
  
    Can one of the admins verify this patch?


---


[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22855#discussion_r229738657
  
    --- Diff: core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala ---
    @@ -456,9 +458,63 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext {
     
       // Regression test for SPARK-7766, an issue where disabling auto-reset and enabling
       // reference-tracking would lead to corrupted output when serializer instances are re-used
    -  for (referenceTracking <- Set(true, false); autoReset <- Set(true, false)) {
    -    test(s"instance reuse with autoReset = $autoReset, referenceTracking = $referenceTracking") {
    -      testSerializerInstanceReuse(autoReset = autoReset, referenceTracking = referenceTracking)
    +  for {
    +    referenceTracking <- Set(true, false)
    +    autoReset <- Set(true, false)
    +    usePool <- Set(true, false)
    +  } {
    +    test(s"instance reuse with autoReset = $autoReset, referenceTracking = $referenceTracking" +
    +      s", usePool = $usePool") {
    +      testSerializerInstanceReuse(
    +        autoReset = autoReset, referenceTracking = referenceTracking, usePool = usePool)
    +    }
    +  }
    +
    +  test("SPARK-25839 KryoPool implementation works correctly in multi-threaded environment") {
    +    import java.util.concurrent.Executors
    --- End diff --
    
    I'd import at the top of the file.


---


[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

Posted by patrickbrownsync <gi...@git.apache.org>.
Github user patrickbrownsync commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22855#discussion_r229762873
  
    --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
    @@ -92,6 +94,16 @@ class KryoSerializer(conf: SparkConf)
           new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize))
         }
     
    +  @transient
    +  private lazy val factory: KryoFactory = new KryoFactory() {
    --- End diff --
    
    Thanks, I'll remove it. I had a problem previously, but it looks like the @transient on the pool is enough by itself.


---


[GitHub] spark issue #22855: [SPARK-25839] [Core] Implement use of KryoPool in KryoSe...

Posted by patrickbrownsync <gi...@git.apache.org>.
Github user patrickbrownsync commented on the issue:

    https://github.com/apache/spark/pull/22855
  
    @srowen I went ahead and fixed those issues where I could find them, mostly for my own learning experience. Thanks for taking the time to review this so many times; I really appreciate it.


---


[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22855#discussion_r231526283
  
    --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
    @@ -92,6 +94,20 @@ class KryoSerializer(conf: SparkConf)
           new KryoOutput(bufferSize, math.max(bufferSize, maxBufferSize))
         }
     
    +  @transient
    +  private lazy val factory: KryoFactory = new KryoFactory() {
    +    override def create: Kryo = {
    +      newKryo()
    +    }
    +  }
    +
    +  @transient
    --- End diff --
    
    Back on this item now, if this is no longer a lazy val, and it's transient, how does it get set again after this object itself is serialized? I'm mostly wondering if this is required now.
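On how a transient field gets set again: Java deserialization does not run the class's constructor, so a plain `@transient val` comes back `null`, while a `@transient lazy val` is recomputed on first access (the Scala compiler keeps its initialization state transient too). A minimal, self-contained sketch; `Factory` and `Holder` are hypothetical stand-ins, not Spark or Kryo classes.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

object TransientSketch {
  // Hypothetical non-serializable dependency, standing in for something like a KryoFactory.
  final class Factory { def create(): String = "instance" }

  final class Holder extends java.io.Serializable {
    // Set only by the constructor, which deserialization does not run.
    @transient val eager: Factory = new Factory
    // Set on first access; a deserialized copy runs the initializer again.
    @transient lazy val lazyOne: Factory = new Factory
  }

  // Java-serializes `value` to bytes and reads back a copy.
  def roundTrip[T <: java.io.Serializable](value: T): T = {
    val bytes = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(bytes)
    try out.writeObject(value) finally out.close()
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
    try in.readObject().asInstanceOf[T] finally in.close()
  }

  // Returns (eager field is null after the round trip, lazy field is usable).
  def check(): (Boolean, Boolean) = {
    val original = new Holder
    require(original.lazyOne != null) // force initialization before serializing
    val copy = roundTrip(original)
    (copy.eager == null, copy.lazyOne != null)
  }

  def main(args: Array[String]): Unit = println(check())
}
```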


---


[GitHub] spark issue #22855: [SPARK-25839] [Core] Implement use of KryoPool in KryoSe...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on the issue:

    https://github.com/apache/spark/pull/22855
  
    Merged to master


---


[GitHub] spark issue #22855: [SPARK-25839] [Core] Implement use of KryoPool in KryoSe...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/22855
  
    **[Test build #4421 has started](https://amplab.cs.berkeley.edu/jenkins/job/NewSparkPullRequestBuilder/4421/testReport)** for PR 22855 at commit [`3bfc4eb`](https://github.com/apache/spark/commit/3bfc4ebbf214b6b0fadbaa10aa832303a59de97d).


---


[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

Posted by patrickbrownsync <gi...@git.apache.org>.
Github user patrickbrownsync commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22855#discussion_r231558355
  
    --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
    @@ -214,8 +230,14 @@ class KryoSerializer(conf: SparkConf)
         kryo
       }
     
    +  override def setDefaultClassLoader(classLoader: ClassLoader): Serializer = {
    +    defaultClassLoader = Some(classLoader)
    --- End diff --
    
    defaultClassLoader is used in newKryo.
    
    I called `getPool` after setting the defaultClassLoader to make sure we don't accidentally create a newKryo before the defaultClassLoader is updated. I also set it on line 105 because I don't believe `setDefaultClassLoader` is required to be called.
    
    The issue that I unfortunately didn't notice until these tests failed is that the tests specify that you can `setDefaultClassLoader` after serializing an object (I'm not sure this functionality is actually used), causing an "incorrect" Kryo instance to be in the pool. Unfortunately the pool doesn't expose a way to clear itself out, thus the var, which clearly doesn't work.
    
    I will work on a solution
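A minimal sketch of the reset-by-replacement idea discussed in this thread, using only the JDK. `SimplePool` and `PoolWrapper` are hypothetical stand-ins (not Kryo's `KryoPool`), and a `String` recording a fake loader name stands in for a Kryo instance built with a particular classloader. In this sketch nothing needs to be freed explicitly: once the old pool is unreferenced, its idle instances are ordinary garbage.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.atomic.AtomicReference

object ResettablePoolSketch {
  // Minimal thread-safe pool: hands out released instances first and
  // creates new ones on demand.
  final class SimplePool[T <: AnyRef](create: () => T) {
    private val free = new ConcurrentLinkedQueue[T]()

    def borrow(): T = {
      val t = free.poll()
      if (t != null) t else create()
    }

    def release(t: T): Unit = { free.offer(t); () }
  }

  // Delegates to a pool that can be replaced wholesale; reset() drops the old one.
  // Caveat: an instance borrowed before reset() and released afterwards ends up
  // in the new pool, so this sketch assumes nothing is borrowed during a reset.
  final class PoolWrapper[T <: AnyRef](newPool: () => SimplePool[T]) {
    private val current = new AtomicReference[SimplePool[T]](newPool())

    def borrow(): T = current.get().borrow()
    def release(t: T): Unit = current.get().release(t)
    def reset(): Unit = current.set(newPool())
  }

  def demo(): (String, String) = {
    var loader = "loaderA" // hypothetical stand-in for the default classloader
    val wrapper = new PoolWrapper[String](() =>
      new SimplePool[String](() => s"built with $loader"))
    val first = wrapper.borrow()
    wrapper.release(first) // now idle in the current pool
    loader = "loaderB"     // analogous to calling setDefaultClassLoader
    wrapper.reset()        // discard the pool holding the stale instance
    val second = wrapper.borrow()
    (first, second)
  }

  def main(args: Array[String]): Unit = println(demo())
}
```

Without the `reset()` call, the second borrow would return the cached instance still tagged with the old loader, which is the situation the failing tests exposed.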


---


[GitHub] spark issue #22855: [SPARK-25839] [Core] Implement use of KryoPool in KryoSe...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on the issue:

    https://github.com/apache/spark/pull/22855
  
    Oops, something to do with the benchmark class:
    
    ```
    [error] /home/jenkins/workspace/NewSparkPullRequestBuilder/core/src/test/scala/org/apache/spark/serializer/KryoSerializerBenchmark.scala:41: object creation impossible, since method runBenchmarkSuite in class BenchmarkBase of type (mainArgs: Array[String])Unit is not defined
    [error] object KryoSerializerBenchmark extends BenchmarkBase {
    [error]        ^
    [error] /home/jenkins/workspace/NewSparkPullRequestBuilder/core/src/test/scala/org/apache/spark/serializer/KryoSerializerBenchmark.scala:45: method runBenchmarkSuite overrides nothing.
    [error] Note: the super classes of object KryoSerializerBenchmark contain the following, non final members named runBenchmarkSuite:
    [error] def runBenchmarkSuite(mainArgs: Array[String]): Unit
    [error]   override def runBenchmarkSuite(): Unit = {
    [error]                ^
    ```
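    
    The fix the errors point to is an override whose parameter list matches the parent's exactly. A minimal sketch, assuming a simplified, hypothetical stand-in for `BenchmarkBase` that declares only the method named in the compiler error; its `main` simply forwards the arguments, which is an assumption and not a description of Spark's real class.
    
    ```scala
    // Simplified, hypothetical stand-in for BenchmarkBase: only the abstract method named in
    // the compiler error, plus a main that forwards its arguments (an assumption).
    abstract class BenchmarkBase {
      def runBenchmarkSuite(mainArgs: Array[String]): Unit
    
      def main(args: Array[String]): Unit = runBenchmarkSuite(args)
    }
    
    object KryoSerializerBenchmark extends BenchmarkBase {
      // Records what the suite received so the override is observable.
      var receivedArgs: Seq[String] = Nil
    
      // Must take mainArgs: a parameterless runBenchmarkSuite() is a different method
      // and "overrides nothing", which is exactly the second error above.
      override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
        receivedArgs = mainArgs.toSeq
        // the actual Kryo serializer benchmark cases would run here
      }
    }
    ```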


---



[GitHub] spark pull request #22855: [SPARK-25839] [Core] Implement use of KryoPool in...

Posted by 10110346 <gi...@git.apache.org>.
Github user 10110346 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22855#discussion_r228844982
  
    --- Diff: core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---
    @@ -298,30 +312,40 @@ class KryoDeserializationStream(
       }
     }
     
    -private[spark] class KryoSerializerInstance(ks: KryoSerializer, useUnsafe: Boolean)
    +private[spark] class KryoSerializerInstance(
    +   ks: KryoSerializer, useUnsafe: Boolean, usePool: Boolean)
       extends SerializerInstance {
       /**
        * A re-used [[Kryo]] instance. Methods will borrow this instance by calling `borrowKryo()`, do
        * their work, then release the instance by calling `releaseKryo()`. Logically, this is a caching
        * pool of size one. SerializerInstances are not thread-safe, hence accesses to this field are
        * not synchronized.
        */
    -  @Nullable private[this] var cachedKryo: Kryo = borrowKryo()
    +  @Nullable private[this] var cachedKryo: Kryo = if (usePool) null else borrowKryo()
     
       /**
        * Borrows a [[Kryo]] instance. If possible, this tries to re-use a cached Kryo instance;
        * otherwise, it allocates a new instance.
        */
       private[serializer] def borrowKryo(): Kryo = {
    -    if (cachedKryo != null) {
    -      val kryo = cachedKryo
    -      // As a defensive measure, call reset() to clear any Kryo state that might have been modified
    -      // by the last operation to borrow this instance (see SPARK-7766 for discussion of this issue)
    +    if (usePool) {
    +      val kryo = ks.pool.borrow()
           kryo.reset()
    -      cachedKryo = null
           kryo
         } else {
    -      ks.newKryo()
    +      if (cachedKryo != null) {
    +        val kryo = cachedKryo
    +        /**
    +        * As a defensive measure, call reset() to clear any Kryo state that might have
    --- End diff --
    
    In the doc comment, every `*` after the first line must be aligned with the first `*` of the first line.
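    
    The alignment rule is easiest to see in context. A minimal sketch of the two-mode `borrowKryo` from the diff, with every doc-comment `*` aligned under the first `*` of its opening line; `FakeKryo` and `SketchInstance` are hypothetical stand-ins (no real Kryo is used), and the shared pool is a plain `ConcurrentLinkedQueue` rather than `ks.pool`.
    
    ```scala
    import java.util.concurrent.ConcurrentLinkedQueue
    
    // Hypothetical stand-in for Kryo; it counts reset() calls so the behavior is observable.
    final class FakeKryo {
      var resets = 0
      def reset(): Unit = resets += 1
    }
    
    // Hypothetical sketch of the instance logic in the diff, not Spark's KryoSerializerInstance.
    final class SketchInstance(pool: ConcurrentLinkedQueue[FakeKryo], usePool: Boolean) {
      /**
       * A re-used instance when pooling is off: logically a caching pool of size one.
       * Every `*` in this comment sits under the first `*` of the opening line.
       */
      private[this] var cachedKryo: FakeKryo = if (usePool) null else new FakeKryo
    
      def borrowKryo(): FakeKryo = {
        if (usePool) {
          val pooled = pool.poll()
          val kryo = if (pooled != null) pooled else new FakeKryo
          kryo.reset()
          kryo
        } else if (cachedKryo != null) {
          val kryo = cachedKryo
          /**
           * As a defensive measure, call reset() to clear any state left behind by the
           * previous borrower (see SPARK-7766).
           */
          kryo.reset()
          cachedKryo = null
          kryo
        } else {
          new FakeKryo
        }
      }
    
      def releaseKryo(kryo: FakeKryo): Unit = {
        if (usePool) {
          pool.offer(kryo)
        } else if (cachedKryo == null) {
          cachedKryo = kryo
        }
      }
    }
    ```
    
    Keeping the original `//` line comments, as in the removed lines of the diff, would avoid the alignment question entirely.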


---
