Posted to reviews@spark.apache.org by aramesh117 <gi...@git.apache.org> on 2017/02/22 05:24:50 UTC

[GitHub] spark pull request #17024: [SPARK-19525][CORE] Compressing checkpoints.

GitHub user aramesh117 opened a pull request:

    https://github.com/apache/spark/pull/17024

    [SPARK-19525][CORE] Compressing checkpoints.

    Spark's performance improves greatly if we enable compression of
    checkpoints.
    
    ## What changes were proposed in this pull request?
    
    - Compress each partition before writing it to the persistent file system.
    - Decompress each partition when reading it back from the persistent file system.
    - Keep compression disabled by default.
    - Log checkpoint durations so that runs with and without compression can be compared (A/B testing).
    
    ## How was this patch tested?
    
    This was tested using existing unit tests for backwards compatibility and with new tests for this functionality. It has also been used in our production system for almost a year.
    
    Please review http://spark.apache.org/contributing.html before opening a pull request.
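
The compress-on-write and decompress-on-read behaviour listed in the description above can be illustrated with a minimal, self-contained sketch. It is not the code from this pull request: `CheckpointCompressionSketch` and its methods are hypothetical names, and GZIP from the JDK stands in for Spark's `CompressionCodec`, which is `private[spark]` and not callable from user code.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, OutputStream}
import java.nio.charset.StandardCharsets
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

// Illustrative stand-in for a checkpoint writer/reader (assumption: not Spark's implementation).
object CheckpointCompressionSketch {

  // Wrap the raw output stream only when compression is enabled; the default is off.
  def wrapForWrite(raw: OutputStream, compress: Boolean = false): OutputStream =
    if (compress) new GZIPOutputStream(raw) else raw

  // The reader has to apply the same setting that the writer used.
  def wrapForRead(raw: InputStream, compress: Boolean = false): InputStream =
    if (compress) new GZIPInputStream(raw) else raw

  // Serialize one "partition" (here simply lines of text) into bytes.
  def writePartition(records: Seq[String], compress: Boolean): Array[Byte] = {
    val sink = new ByteArrayOutputStream()
    val out = wrapForWrite(sink, compress)
    try out.write(records.mkString("\n").getBytes(StandardCharsets.UTF_8))
    finally out.close() // closing also writes the compressor's trailer
    sink.toByteArray
  }

  // Read the partition back, decompressing while reading if requested.
  def readPartition(bytes: Array[Byte], compress: Boolean): Seq[String] = {
    val in = wrapForRead(new ByteArrayInputStream(bytes), compress)
    try {
      val buffer = new ByteArrayOutputStream()
      val chunk = new Array[Byte](4096)
      var n = in.read(chunk)
      while (n != -1) { buffer.write(chunk, 0, n); n = in.read(chunk) }
      new String(buffer.toByteArray, StandardCharsets.UTF_8).split("\n").toSeq
    } finally in.close()
  }
}
```

Calling `writePartition(records, compress = true)` and then `readPartition(bytes, compress = true)` should return the original records; reading with a different flag than the one used for writing fails or yields garbage, which is why both sides must read the same configuration.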


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/aramesh117/spark master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/17024.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #17024
    
----
commit 7837b0c6052fa20bd1a6cf823947e95379d6d3b8
Author: Aaditya Ramesh <ar...@conviva.com>
Date:   2017-02-22T05:05:48Z

    [SPARK-19525][CORE] Compressing checkpoints.
    
    Spark's performance improves greatly if we enable compression of
    checkpoints.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17024: [SPARK-19525][CORE] Compressing checkpoints.

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on the issue:

    https://github.com/apache/spark/pull/17024
  
    @aramesh117 Unfortunately, since this heavily affects streaming, I cannot sign off on it until someone more familiar with Spark Streaming reviews it as well.




[GitHub] spark issue #17024: [SPARK-19525][CORE] Compressing checkpoints.

Posted by aramesh117 <gi...@git.apache.org>.
Github user aramesh117 commented on the issue:

    https://github.com/apache/spark/pull/17024
  
    @mridulm I've added a new commit. Thank you for the review! :)




[GitHub] spark pull request #17024: [SPARK-19525][CORE] Compressing checkpoints.

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17024#discussion_r102420463
  
    --- Diff: core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala ---
    @@ -169,14 +174,24 @@ private[spark] object ReliableCheckpointRDD extends Logging {
         val bufferSize = env.conf.getInt("spark.buffer.size", 65536)
     
         val fileOutputStream = if (blockSize < 0) {
    -      fs.create(tempOutputPath, false, bufferSize)
    +      val checkpointCodec = env.conf.get("spark.checkpoint.compress.codec", "none")
    +      val fileStream = fs.create(tempOutputPath, false, bufferSize)
    +      if (CompressionCodec.ALL_COMPRESSION_CODECS_SHORT.contains(checkpointCodec)) {
    +        val compressionCodec = CompressionCodec.createCodec(env.conf, checkpointCodec)
    +        logInfo(s"Compressing using $checkpointCodec.")
    +        compressionCodec.compressedOutputStream(fileStream)
    +      } else {
    +        fileStream
    --- End diff --
    
    This repeated pattern can be rewritten as indicated above https://github.com/apache/spark/pull/17024/files#r102418860




[GitHub] spark pull request #17024: [SPARK-19525][CORE] Compressing checkpoints.

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17024#discussion_r111833659
  
    --- Diff: core/src/test/scala/org/apache/spark/CheckpointSuite.scala ---
    @@ -238,6 +241,42 @@ trait RDDCheckpointTester { self: SparkFunSuite =>
       protected def generateFatPairRDD(): RDD[(Int, Int)] = {
         new FatPairRDD(sparkContext.makeRDD(1 to 100, 4), partitioner).mapValues(x => x)
       }
    +
    +  protected def testBasicCheckpoint(sc: SparkContext, reliableCheckpoint: Boolean): Unit = {
    --- End diff --
    
    nit: does this one test any special logic? If it's already covered by other tests, there is no need to add it and increase the test time.




[GitHub] spark pull request #17024: [SPARK-19525][CORE] Compressing checkpoints.

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17024#discussion_r102418860
  
    --- Diff: core/src/main/scala/org/apache/spark/io/CompressionCodec.scala ---
    @@ -95,6 +95,7 @@ private[spark] object CompressionCodec {
       val FALLBACK_COMPRESSION_CODEC = "snappy"
       val DEFAULT_COMPRESSION_CODEC = "lz4"
       val ALL_COMPRESSION_CODECS = shortCompressionCodecNames.values.toSeq
    +  val ALL_COMPRESSION_CODECS_SHORT: Set[String] = shortCompressionCodecNames.keySet
    --- End diff --
    
    Instead of exposing this and supporting only short codec names for checkpoints, the pattern should be the same as in the rest of the Spark code that deals with codecs.
    ```
    env.conf.getOption("spark.checkpoint.compress.codec").map { c =>
      logInfo(s"Compressing checkpoint using $c.")
      // Wrap the file stream so that both branches yield an OutputStream.
      CompressionCodec.createCodec(env.conf, c).compressedOutputStream(fileStream)
    }.getOrElse(fileStream)
    ```
    This will ensure that support for checkpoint compression is in line with the rest of Spark (short names and full class names, no need to introduce 'none').
    
    Note: you will need to change fileStream to a `lazy val` so that, if codec creation throws an exception, we don't leave dangling streams around (with the scope of fileStream limited to that block).
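
To make the `lazy val` point in the comment above concrete, here is a small self-contained sketch. It is an illustration under stated assumptions, not Spark code: `CodecSelectionSketch`, `createCodec` and `openCheckpointStream` are hypothetical names, the JDK's GZIP and Deflater streams stand in for Spark's codecs, and the by-name parameter `openFile` stands in for `fs.create(...)`.

```scala
import java.io.OutputStream
import java.util.zip.{DeflaterOutputStream, GZIPOutputStream}

object CodecSelectionSketch {

  // Hypothetical codec lookup: an unknown name fails, much as an unknown codec class would.
  def createCodec(name: String): OutputStream => OutputStream = name match {
    case "gzip"    => (s: OutputStream) => new GZIPOutputStream(s)
    case "deflate" => (s: OutputStream) => new DeflaterOutputStream(s)
    case other     => throw new IllegalArgumentException(s"Unknown codec: $other")
  }

  // `openFile` is passed by name so that nothing is opened until the stream is really needed.
  def openCheckpointStream(codecName: Option[String], openFile: => OutputStream): OutputStream = {
    lazy val fileStream: OutputStream = openFile
    codecName
      .map { name =>
        val wrap = createCodec(name) // may throw; the file has not been opened yet
        wrap(fileStream)             // first access opens the file
      }
      .getOrElse(fileStream)         // no codec configured: plain file stream
  }
}
```

With a plain `val`, the file would already be open when codec creation fails, and nobody would close it. With `lazy val`, an invalid codec name raises the exception before any stream exists.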




[GitHub] spark issue #17024: [SPARK-19525][CORE] Compressing checkpoints.

Posted by aramesh117 <gi...@git.apache.org>.
Github user aramesh117 commented on the issue:

    https://github.com/apache/spark/pull/17024
  
    @mridulm Sure, I can add a file extension based on the codec being used. But is there a specific use case that adding an extension would solve?




[GitHub] spark pull request #17024: [SPARK-19525][CORE] Compressing checkpoints.

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17024#discussion_r111829095
  
    --- Diff: core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala ---
    @@ -27,8 +27,11 @@ import org.apache.hadoop.fs.Path
     import org.apache.spark._
     import org.apache.spark.broadcast.Broadcast
     import org.apache.spark.internal.Logging
    +import org.apache.spark.io.CompressionCodec
     import org.apache.spark.util.{SerializableConfiguration, Utils}
     
    +
    --- End diff --
    
    nit: please remove unnecessary space changes




[GitHub] spark pull request #17024: [SPARK-19525][CORE] Compressing checkpoints.

Posted by aramesh117 <gi...@git.apache.org>.
Github user aramesh117 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17024#discussion_r102592127
  
    --- Diff: core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala ---
    @@ -169,14 +174,24 @@ private[spark] object ReliableCheckpointRDD extends Logging {
         val bufferSize = env.conf.getInt("spark.buffer.size", 65536)
     
         val fileOutputStream = if (blockSize < 0) {
    -      fs.create(tempOutputPath, false, bufferSize)
    +      val checkpointCodec = env.conf.get("spark.checkpoint.compress.codec", "none")
    +      val fileStream = fs.create(tempOutputPath, false, bufferSize)
    +      if (CompressionCodec.ALL_COMPRESSION_CODECS_SHORT.contains(checkpointCodec)) {
    +        val compressionCodec = CompressionCodec.createCodec(env.conf, checkpointCodec)
    +        logInfo(s"Compressing using $checkpointCodec.")
    +        compressionCodec.compressedOutputStream(fileStream)
    +      } else {
    +        fileStream
    +      }
         } else {
           // This is mainly for testing purpose
           fs.create(tempOutputPath, false, bufferSize,
             fs.getDefaultReplication(fs.getWorkingDirectory), blockSize)
         }
         val serializer = env.serializer.newInstance()
         val serializeStream = serializer.serializeStream(fileOutputStream)
    +    logInfo(s"Starting to write to checkpoint file $tempOutputPath.")
    --- End diff --
    
    My thought was that since checkpointing shouldn't be done too frequently anyway, this won't make the executor logs too verbose, and it may be helpful for debugging after issues with checkpointing have already occurred. I'll make it logDebug for now; is that okay?




[GitHub] spark pull request #17024: [SPARK-19525][CORE] Compressing checkpoints.

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17024#discussion_r111830351
  
    --- Diff: core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala ---
    @@ -169,14 +177,23 @@ private[spark] object ReliableCheckpointRDD extends Logging {
         val bufferSize = env.conf.getInt("spark.buffer.size", 65536)
     
         val fileOutputStream = if (blockSize < 0) {
    -      fs.create(tempOutputPath, false, bufferSize)
    +      lazy val fileStream: OutputStream = fs.create(tempOutputPath, false, bufferSize)
    +      env.conf.getOption("spark.checkpoint.compress.codec").fold(fileStream) {
    +        codec => {
    +          logDebug(s"Compressing using $codec.")
    +          CompressionCodec.createCodec(env.conf, codec)
    +            .compressedOutputStream(fileStream)
    +        }
    +      }
         } else {
           // This is mainly for testing purpose
           fs.create(tempOutputPath, false, bufferSize,
             fs.getDefaultReplication(fs.getWorkingDirectory), blockSize)
         }
         val serializer = env.serializer.newInstance()
         val serializeStream = serializer.serializeStream(fileOutputStream)
    +    logTrace(s"Starting to write to checkpoint file $tempOutputPath.")
    +    val startTimeMs = System.currentTimeMillis()
    --- End diff --
    
    same as above




[GitHub] spark issue #17024: [SPARK-19525][CORE] Compressing checkpoints.

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the issue:

    https://github.com/apache/spark/pull/17024
  
    @aramesh117 I just opened #17789 to finish the remaining work. All credit will go to you when the new PR is merged.




[GitHub] spark pull request #17024: [SPARK-19525][CORE] Compressing checkpoints.

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/17024




[GitHub] spark issue #17024: [SPARK-19525][CORE] Compressing checkpoints.

Posted by aramesh117 <gi...@git.apache.org>.
Github user aramesh117 commented on the issue:

    https://github.com/apache/spark/pull/17024
  
    @mridulm Thank you so much! I will definitely update with your suggestions.




[GitHub] spark issue #17024: [SPARK-19525][CORE] Compressing checkpoints.

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on the issue:

    https://github.com/apache/spark/pull/17024
  
    I wonder if adding an extension to the file, based on the codec, would help ...




[GitHub] spark pull request #17024: [SPARK-19525][CORE] Compressing checkpoints.

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17024#discussion_r111830401
  
    --- Diff: core/src/test/scala/org/apache/spark/CheckpointSuite.scala ---
    @@ -21,12 +21,15 @@ import java.io.File
     
     import scala.reflect.ClassTag
     
    +import com.google.common.io.ByteStreams
     import org.apache.hadoop.fs.Path
     
    +import org.apache.spark.io.CompressionCodec
     import org.apache.spark.rdd._
     import org.apache.spark.storage.{BlockId, StorageLevel, TestBlockId}
     import org.apache.spark.util.Utils
     
    +
    --- End diff --
    
    nit: please remove unnecessary changes.




[GitHub] spark issue #17024: [SPARK-19525][CORE] Compressing checkpoints.

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on the issue:

    https://github.com/apache/spark/pull/17024
  
    @aramesh117 looks good!
    I would also like someone working on streaming to chime in, since that is a common use case for checkpointing.
    
    +CC @tdas, @zsxwing




[GitHub] spark pull request #17024: [SPARK-19525][CORE] Compressing checkpoints.

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17024#discussion_r102420285
  
    --- Diff: core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala ---
    @@ -197,6 +212,7 @@ private[spark] object ReliableCheckpointRDD extends Logging {
             }
           }
         }
    +    logInfo(s"Checkpointing took ${System.currentTimeMillis() - startTimeMs} ms.")
    --- End diff --
    
    Add codec (if used) here.




[GitHub] spark pull request #17024: [SPARK-19525][CORE] Compressing checkpoints.

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17024#discussion_r102419998
  
    --- Diff: core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala ---
    @@ -133,9 +134,13 @@ private[spark] object ReliableCheckpointRDD extends Logging {
         val broadcastedConf = sc.broadcast(
           new SerializableConfiguration(sc.hadoopConfiguration))
         // TODO: This is expensive because it computes the RDD again unnecessarily (SPARK-8582)
    +    logInfo(s"The checkpoint compression codec is " +
    --- End diff --
    
    This should be logged only if compression is enabled ('none' is not a supported compression codec).
    It could also be rolled into the timing info log message below.




[GitHub] spark pull request #17024: [SPARK-19525][CORE] Compressing checkpoints.

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17024#discussion_r111830180
  
    --- Diff: core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala ---
    @@ -133,9 +136,14 @@ private[spark] object ReliableCheckpointRDD extends Logging {
         val broadcastedConf = sc.broadcast(
           new SerializableConfiguration(sc.hadoopConfiguration))
         // TODO: This is expensive because it computes the RDD again unnecessarily (SPARK-8582)
    +    val startTime = System.currentTimeMillis()
    --- End diff --
    
    nit: please use `nanoTime` to measure the duration. See https://github.com/databricks/scala-style-guide/tree/f6cce9ab32e7b288638f2f1615d20a3b6d16ef2e#misc_currentTimeMillis_vs_nanoTime
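
The reason for preferring `nanoTime` is that `System.currentTimeMillis()` follows the wall clock, which can jump when the system time is adjusted, whereas `System.nanoTime()` is meant for measuring elapsed time. A minimal sketch of a duration measurement in that style follows; `TimingSketch`, `timed` and `checkpointLogLine` are hypothetical helpers, and including the codec in the message follows the suggestion made elsewhere in this thread rather than any existing Spark log format.

```scala
import java.util.concurrent.TimeUnit

object TimingSketch {

  // Runs `body` and returns its result together with the elapsed time in milliseconds.
  def timed[A](body: => A): (A, Long) = {
    val startNs = System.nanoTime()
    val result = body
    val elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startNs)
    (result, elapsedMs)
  }

  // Builds the log line; the codec is mentioned only when one is configured.
  def checkpointLogLine(elapsedMs: Long, codec: Option[String]): String = {
    val codecPart = codec.map(c => s" (compression codec: $c)").getOrElse("")
    s"Checkpointing took $elapsedMs ms$codecPart."
  }
}
```

A caller could wrap the checkpoint job in `TimingSketch.timed { ... }` and pass the resulting duration to `checkpointLogLine`.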




[GitHub] spark pull request #17024: [SPARK-19525][CORE] Compressing checkpoints.

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17024#discussion_r111832087
  
    --- Diff: core/src/test/scala/org/apache/spark/CheckpointSuite.scala ---
    @@ -266,13 +309,44 @@ class CheckpointSuite extends SparkFunSuite with RDDCheckpointTester with LocalS
       override def sparkContext: SparkContext = sc
     
       runTest("basic checkpointing") { reliableCheckpoint: Boolean =>
    -    val parCollection = sc.makeRDD(1 to 4)
    -    val flatMappedRDD = parCollection.flatMap(x => 1 to x)
    -    checkpoint(flatMappedRDD, reliableCheckpoint)
    -    assert(flatMappedRDD.dependencies.head.rdd === parCollection)
    -    val result = flatMappedRDD.collect()
    -    assert(flatMappedRDD.dependencies.head.rdd != parCollection)
    -    assert(flatMappedRDD.collect() === result)
    +    startSparkContext()
    +    testBasicCheckpoint(sc, reliableCheckpoint)
    +  }
    +
    +  runTest("compression with snappy", skipLocalCheckpoint = true) { _: Boolean =>
    --- End diff --
    
    After you change the config to `spark.checkpoint.compress`, you don't need to test all compression codecs. Just write one test for the default codec. Others should be covered in `CompressionCodecSuite`.




[GitHub] spark pull request #17024: [SPARK-19525][CORE] Compressing checkpoints.

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17024#discussion_r102420521
  
    --- Diff: core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala ---
    @@ -273,9 +289,16 @@ private[spark] object ReliableCheckpointRDD extends Logging {
         val env = SparkEnv.get
         val fs = path.getFileSystem(broadcastedConf.value.value)
         val bufferSize = env.conf.getInt("spark.buffer.size", 65536)
    -    val fileInputStream = fs.open(path, bufferSize)
    +    val checkpointCodec = env.conf.get("spark.checkpoint.compress.codec", "none")
    +    val fileStream = fs.open(path, bufferSize)
    +    val inputStream =
    +      if (CompressionCodec.ALL_COMPRESSION_CODECS_SHORT.contains(checkpointCodec)) {
    +        CompressionCodec.createCodec(env.conf, checkpointCodec).compressedInputStream(fileStream)
    +      } else {
    +        fileStream
    +      }
    --- End diff --
    
    Use https://github.com/apache/spark/pull/17024/files#r102418860




[GitHub] spark issue #17024: [SPARK-19525][CORE] Compressing checkpoints.

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on the issue:

    https://github.com/apache/spark/pull/17024
  
    It makes it possible to identify what the data within the file is (compressed or not) for the user's perusal; it is true that it does not change anything for the application.
    But before you change code, I would like comments from streaming developers on this change.
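
As a rough illustration of the extension idea discussed above, the sketch below appends the codec name to a part file name. Everything here is hypothetical: `CheckpointFileNameSketch` and `partFileName` are made-up names, and the `part-%05d` base name is an assumption about the naming convention, not something confirmed in this thread.

```scala
object CheckpointFileNameSketch {

  // Hypothetical naming scheme: "part-00003" when uncompressed, "part-00003.lz4" with a codec.
  def partFileName(partitionId: Int, codec: Option[String]): String = {
    val base = f"part-$partitionId%05d"
    codec.fold(base)(c => s"$base.$c")
  }
}
```

Such a suffix would only help a person inspecting the checkpoint directory; the reader would still have to decide how to decompress from configuration or from the name itself.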




[GitHub] spark pull request #17024: [SPARK-19525][CORE] Compressing checkpoints.

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17024#discussion_r102607469
  
    --- Diff: core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala ---
    @@ -169,14 +174,24 @@ private[spark] object ReliableCheckpointRDD extends Logging {
         val bufferSize = env.conf.getInt("spark.buffer.size", 65536)
     
         val fileOutputStream = if (blockSize < 0) {
    -      fs.create(tempOutputPath, false, bufferSize)
    +      val checkpointCodec = env.conf.get("spark.checkpoint.compress.codec", "none")
    +      val fileStream = fs.create(tempOutputPath, false, bufferSize)
    +      if (CompressionCodec.ALL_COMPRESSION_CODECS_SHORT.contains(checkpointCodec)) {
    +        val compressionCodec = CompressionCodec.createCodec(env.conf, checkpointCodec)
    +        logInfo(s"Compressing using $checkpointCodec.")
    +        compressionCodec.compressedOutputStream(fileStream)
    +      } else {
    +        fileStream
    +      }
         } else {
           // This is mainly for testing purpose
           fs.create(tempOutputPath, false, bufferSize,
             fs.getDefaultReplication(fs.getWorkingDirectory), blockSize)
         }
         val serializer = env.serializer.newInstance()
         val serializeStream = serializer.serializeStream(fileOutputStream)
    +    logInfo(s"Starting to write to checkpoint file $tempOutputPath.")
    --- End diff --
    
    logDebug should be fine too.




[GitHub] spark pull request #17024: [SPARK-19525][CORE] Compressing checkpoints.

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17024#discussion_r111829941
  
    --- Diff: core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala ---
    @@ -133,9 +136,14 @@ private[spark] object ReliableCheckpointRDD extends Logging {
         val broadcastedConf = sc.broadcast(
           new SerializableConfiguration(sc.hadoopConfiguration))
         // TODO: This is expensive because it computes the RDD again unnecessarily (SPARK-8582)
    +    val startTime = System.currentTimeMillis()
         sc.runJob(originalRDD,
           writePartitionToCheckpointFile[T](checkpointDirPath.toString, broadcastedConf) _)
     
    +    logInfo(s"Checkpointing took ${System.currentTimeMillis() - startTime} ms.")
    +    sc.conf.getOption("spark.checkpoint.compress.codec").foreach(codec => {
    --- End diff --
    
    For consistency, I suggest we just add a new config `spark.checkpoint.compress` that controls whether checkpoint compression is enabled. See https://github.com/apache/spark/blob/b56ad2b1ec19fd60fa9d4926d12244fd3f56aca4/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala#L74 for an example.
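
As an illustration of the suggestion above, here is a minimal sketch of a boolean flag that gates compression on both the write and the read path. It is not the code from the PR: the `Conf` map is a hypothetical stand-in for `SparkConf`, GZIP from the JDK stands in for whichever codec Spark would pick, and the object and helper names are invented for this example.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, InputStream, OutputStream}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

object CheckpointCompressSketch {
  // Hypothetical stand-in for SparkConf: a plain key/value map.
  type Conf = Map[String, String]

  // Compression stays off unless the flag is explicitly set to "true".
  def compressEnabled(conf: Conf): Boolean =
    conf.getOrElse("spark.checkpoint.compress", "false").toBoolean

  // Wrap the raw stream only when the flag is set (GZIP is an assumption here).
  def wrapForWrite(conf: Conf, raw: OutputStream): OutputStream =
    if (compressEnabled(conf)) new GZIPOutputStream(raw) else raw

  // The read path must consult the same flag, or the data cannot be read back.
  def wrapForRead(conf: Conf, raw: InputStream): InputStream =
    if (compressEnabled(conf)) new GZIPInputStream(raw) else raw

  // Write a payload through the (possibly compressing) stream and return the stored bytes.
  def write(conf: Conf, payload: Array[Byte]): Array[Byte] = {
    val sink = new ByteArrayOutputStream()
    val out = wrapForWrite(conf, sink)
    try out.write(payload) finally out.close()
    sink.toByteArray
  }

  // Read stored bytes back through the (possibly decompressing) stream.
  def read(conf: Conf, stored: Array[Byte]): Array[Byte] = {
    val in = wrapForRead(conf, new ByteArrayInputStream(stored))
    try {
      val buf = new ByteArrayOutputStream()
      val chunk = new Array[Byte](4096)
      var n = in.read(chunk)
      while (n != -1) { buf.write(chunk, 0, n); n = in.read(chunk) }
      buf.toByteArray
    } finally in.close()
  }
}
```

With a single on/off flag, the choice of codec can be left to an existing codec setting, which appears to be the consistency argument made in the comment.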




[GitHub] spark issue #17024: [SPARK-19525][CORE] Compressing checkpoints.

Posted by aramesh117 <gi...@git.apache.org>.
Github user aramesh117 commented on the issue:

    https://github.com/apache/spark/pull/17024
  
    @mridulm Waiting for @tdas and @zsxwing has taken more than a month now. Is there any other way we can reach them or is there anyone else that can take a look at this merge request? This is a critical change that is needed for Conviva's use case if we are to upgrade to later versions of Spark.




[GitHub] spark pull request #17024: [SPARK-19525][CORE] Compressing checkpoints.

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17024#discussion_r111836700
  
    --- Diff: core/src/test/scala/org/apache/spark/CheckpointSuite.scala ---
    @@ -266,13 +309,44 @@ class CheckpointSuite extends SparkFunSuite with RDDCheckpointTester with LocalS
       override def sparkContext: SparkContext = sc
     
       runTest("basic checkpointing") { reliableCheckpoint: Boolean =>
    -    val parCollection = sc.makeRDD(1 to 4)
    -    val flatMappedRDD = parCollection.flatMap(x => 1 to x)
    -    checkpoint(flatMappedRDD, reliableCheckpoint)
    -    assert(flatMappedRDD.dependencies.head.rdd === parCollection)
    -    val result = flatMappedRDD.collect()
    -    assert(flatMappedRDD.dependencies.head.rdd != parCollection)
    -    assert(flatMappedRDD.collect() === result)
    +    startSparkContext()
    +    testBasicCheckpoint(sc, reliableCheckpoint)
    +  }
    +
    +  runTest("compression with snappy", skipLocalCheckpoint = true) { _: Boolean =>
    --- End diff --
    
    For the new functionality, I think one simple test is enough. If we put it into a new suite (e.g., the example below), we don't need to touch the existing code.
    ```
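    // Assumes this file lives in package org.apache.spark, like CheckpointSuite.
    import java.io.File

    import com.google.common.io.ByteStreams
    import org.apache.hadoop.fs.Path

    import org.apache.spark.io.CompressionCodec
    import org.apache.spark.util.Utils

    class CheckpointCompressionSuite extends SparkFunSuite with LocalSparkContext {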
    
      test("checkpoint compression") {
        val checkpointDir = File.createTempFile("temp", "", Utils.createTempDir())
        try {
          val conf = new SparkConf().set("spark.checkpoint.compress", "true")
          sc = new SparkContext("local", "test", conf)
          sc.setCheckpointDir(checkpointDir.toString)
          val rdd = sc.makeRDD(1 to 20, numSlices = 1)
          rdd.checkpoint()
          assert(rdd.collect().toSeq === (1 to 20))
          val checkpointPath = new Path(rdd.getCheckpointFile.get)
          val fs = checkpointPath.getFileSystem(sc.hadoopConfiguration)
          val checkpointFile =
            fs.listStatus(checkpointPath).map(_.getPath).find(_.getName.startsWith("part-")).get
    
          // Verify the checkpoint file can be decompressed
          val compressedInputStream = CompressionCodec.createCodec(conf)
            .compressedInputStream(fs.open(checkpointFile))
          ByteStreams.toByteArray(compressedInputStream)
    
          // Verify that the compressed content can be read back
          assert(rdd.collect().toSeq === (1 to 20))
        } finally {
          Utils.deleteRecursively(checkpointDir)
        }
      }
    }
    ```
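
The "can be decompressed" check in the example above presumably works because a decompressing stream rejects data that was written uncompressed. A minimal, Spark-free sketch of that idea follows; it assumes GZIP from the JDK as the codec, and `DecompressCheckSketch`, `gzip`, and `tryDecompress` are names invented for this illustration.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}
import scala.util.Try

object DecompressCheckSketch {
  // Compress a byte array with GZIP (stand-in for the configured Spark codec).
  def gzip(bytes: Array[Byte]): Array[Byte] = {
    val sink = new ByteArrayOutputStream()
    val out = new GZIPOutputStream(sink)
    try out.write(bytes) finally out.close()
    sink.toByteArray
  }

  // Success with the decompressed bytes, or Failure if `stored` is not valid GZIP data.
  def tryDecompress(stored: Array[Byte]): Try[Array[Byte]] = Try {
    val in = new GZIPInputStream(new ByteArrayInputStream(stored))
    try {
      val buf = new ByteArrayOutputStream()
      val chunk = new Array[Byte](4096)
      var n = in.read(chunk)
      while (n != -1) { buf.write(chunk, 0, n); n = in.read(chunk) }
      buf.toByteArray
    } finally in.close()
  }
}
```

A test built this way fails if the writer silently skips compression, which is the regression the suggested suite is meant to catch.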




[GitHub] spark issue #17024: [SPARK-19525][CORE] Compressing checkpoints.

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on the issue:

    https://github.com/apache/spark/pull/17024
  
    +CC @tdas





[GitHub] spark pull request #17024: [SPARK-19525][CORE] Compressing checkpoints.

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17024#discussion_r111830443
  
    --- Diff: core/src/test/scala/org/apache/spark/CheckpointSuite.scala ---
    @@ -251,10 +290,14 @@ class CheckpointSuite extends SparkFunSuite with RDDCheckpointTester with LocalS
         super.beforeEach()
         checkpointDir = File.createTempFile("temp", "", Utils.createTempDir())
         checkpointDir.delete()
    +  }
    +
    +  private def startSparkContext(): Unit = {
         sc = new SparkContext("local", "test")
         sc.setCheckpointDir(checkpointDir.toString)
       }
     
    +
    --- End diff --
    
    nit: please remove unnecessary changes.




[GitHub] spark issue #17024: [SPARK-19525][CORE] Compressing checkpoints.

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17024
  
    Can one of the admins verify this patch?




[GitHub] spark pull request #17024: [SPARK-19525][CORE] Compressing checkpoints.

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17024#discussion_r102420200
  
    --- Diff: core/src/main/scala/org/apache/spark/rdd/ReliableCheckpointRDD.scala ---
    @@ -169,14 +174,24 @@ private[spark] object ReliableCheckpointRDD extends Logging {
         val bufferSize = env.conf.getInt("spark.buffer.size", 65536)
     
         val fileOutputStream = if (blockSize < 0) {
    -      fs.create(tempOutputPath, false, bufferSize)
    +      val checkpointCodec = env.conf.get("spark.checkpoint.compress.codec", "none")
    +      val fileStream = fs.create(tempOutputPath, false, bufferSize)
    +      if (CompressionCodec.ALL_COMPRESSION_CODECS_SHORT.contains(checkpointCodec)) {
    +        val compressionCodec = CompressionCodec.createCodec(env.conf, checkpointCodec)
    +        logInfo(s"Compressing using $checkpointCodec.")
    +        compressionCodec.compressedOutputStream(fileStream)
    +      } else {
    +        fileStream
    +      }
         } else {
           // This is mainly for testing purpose
           fs.create(tempOutputPath, false, bufferSize,
             fs.getDefaultReplication(fs.getWorkingDirectory), blockSize)
         }
         val serializer = env.serializer.newInstance()
         val serializeStream = serializer.serializeStream(fileOutputStream)
    +    logInfo(s"Starting to write to checkpoint file $tempOutputPath.")
    --- End diff --
    
    This will make the logs verbose.
    If it does help with debugging, you could make it logTrace - or remove it entirely.
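
To illustrate why moving a per-partition message to a lower level is cheap, here is a small sketch of a logger whose methods take the message by name, so the string is never built when the level is disabled. Spark's `Logging` trait declares its `logInfo`/`logDebug`/`logTrace` methods with by-name messages as well; `MiniLogger` and the `Level` hierarchy below are hypothetical and only mimic that shape.

```scala
object LogLevelSketch {
  // Hypothetical log levels, ordered from most to least verbose.
  sealed abstract class Level(val rank: Int)
  case object Trace extends Level(0)
  case object Debug extends Level(1)
  case object Info extends Level(2)

  // Records messages at or above `threshold`; everything else is dropped unevaluated.
  final class MiniLogger(threshold: Level) {
    val emitted = scala.collection.mutable.ArrayBuffer.empty[String]

    // `msg` is by-name: it is evaluated only if the level check passes.
    private def log(level: Level, msg: => String): Unit =
      if (level.rank >= threshold.rank) emitted += msg

    def logInfo(msg: => String): Unit = log(Info, msg)
    def logDebug(msg: => String): Unit = log(Debug, msg)
    def logTrace(msg: => String): Unit = log(Trace, msg)
  }
}
```

With the threshold at `Info`, a `logTrace` call made once per partition neither appears in the output nor pays for string interpolation.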




[GitHub] spark issue #17024: [SPARK-19525][CORE] Compressing checkpoints.

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on the issue:

    https://github.com/apache/spark/pull/17024
  
    @aramesh117 do you have time to work on this PR soon? We need to merge this PR ASAP in order to get it into 2.2.0.

