You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by vanzin <gi...@git.apache.org> on 2017/03/14 21:39:57 UTC

[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

GitHub user vanzin opened a pull request:

    https://github.com/apache/spark/pull/17295

    [SPARK-19556][core] Do not encrypt block manager data in memory.

    This change modifies the way block data is encrypted to make the more
    common cases faster, while penalizing an edge case. As a side effect
    of the change, all data that goes through the block manager is now
    encrypted only when needed, including the previous path (broadcast
    variables) where that did not happen.
    
    The way the change works is by not encrypting data that is stored in
    memory; so if a serialized block is in memory, it will only be encrypted
    once it is evicted to disk.
    
    The penalty comes when transferring that encrypted data from disk. If the
    data ends up in memory again, it is as efficient as before; but if the
    evicted block needs to be transferred directly to a remote executor, then
    there's now a performance penalty, since the code now uses a custom
    FileRegion implementation to decrypt the data before transferring.
    
    This also means that block data transferred between executors now is
    not encrypted (and thus relies on the network library encryption support
    for secrecy). Shuffle blocks are still transferred in encrypted form,
    since they're handled in a slightly different way by the code. This also
    keeps compatibility with existing external shuffle services, which transfer
    encrypted shuffle blocks, and avoids having to make the external service
    aware of encryption at all.
    
    Another change in the disk store is that it now stores a tiny metadata
    file next to the file holding the block data; this is needed to accurately
    account for the decrypted block size, which may be significantly different
    from the size of the encrypted file on disk.
    
    The serialization and deserialization APIs in the SerializerManager now
    do not do encryption automatically; callers need to explicitly wrap their
    streams with an appropriate crypto stream before using those.
    
    As a result of these changes, some of the workarounds added in SPARK-19520
    are removed here.
    
    Testing: a new trait ("EncryptionFunSuite") was added that provides an easy
    way to run a test twice, with encryption on and off; broadcast, block manager
    and caching tests were modified to use this new trait so that the existing
    tests exercise both encrypted and non-encrypted paths. I also ran some
    applications with encryption turned on to verify that they still work,
    including streaming tests that failed without the fix for SPARK-19520.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/vanzin/spark SPARK-19556

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/17295.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #17295
    
----
commit 3aa752f9becdfe0e35a47d731736d942e3e5b3bf
Author: Marcelo Vanzin <va...@cloudera.com>
Date:   2017-02-10T23:59:51Z

    [SPARK-19556][core] Do not encrypt block manager data in memory.
    
    This change modifies the way block data is encrypted to make the more
    common cases faster, while penalizing an edge case. As a side effect
    of the change, all data that goes through the block manager is now
    encrypted only when needed, including the previous path (broadcast
    variables) where that did not happen.
    
    The way the change works is by not encrypting data that is stored in
    memory; so if a serialized block is in memory, it will only be encrypted
    once it is evicted to disk.
    
    The penalty comes when transferring that encrypted data from disk. If the
    data ends up in memory again, it is as efficient as before; but if the
    evicted block needs to be transferred directly to a remote executor, then
    there's now a performance penalty, since the code now uses a custom
    FileRegion implementation to decrypt the data before transferring.
    
    This also means that block data transferred between executors now is
    not encrypted (and thus relies on the network library encryption support
    for secrecy). Shuffle blocks are still transferred in encrypted form,
    since they're handled in a slightly different way by the code. This also
    keeps compatibility with existing external shuffle services, which transfer
    encrypted shuffle blocks, and avoids having to make the external service
    aware of encryption at all.
    
    Another change in the disk store is that it now stores a tiny metadata
    file next to the file holding the block data; this is needed to accurately
    account for the decrypted block size, which may be significantly different
    from the size of the encrypted file on disk.
    
    The serialization and deserialization APIs in the SerializerManager now
    do not do encryption automatically; callers need to explicitly wrap their
    streams with an appropriate crypto stream before using those.
    
    As a result of these changes, some of the workarounds added in SPARK-19520
    are removed here.
    
    Testing: a new trait ("EncryptionFunSuite") was added that provides an easy
    way to run a test twice, with encryption on and off; broadcast, block manager
    and caching tests were modified to use this new trait so that the existing
    tests exercise both encrypted and non-encrypted paths. I also ran some
    applications with encryption turned on to verify that they still work,
    including streaming tests that failed without the fix for SPARK-19520.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17295: [SPARK-19556][core] Do not encrypt block manager data in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17295
  
    **[Test build #75120 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/75120/testReport)** for PR 17295 at commit [`00b6d00`](https://github.com/apache/spark/commit/00b6d00b35fef1df4492288919ae54302afae8cb).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17295#discussion_r106587428
  
    --- Diff: core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala ---
    @@ -102,4 +150,34 @@ private[spark] object CryptoStreamUtils extends Logging {
         }
         iv
       }
    +
    +  /**
    +   * This class is a workaround for CRYPTO-125, that forces all bytes to be written to the
    +   * underlying channel. Since the callers of this API are using blocking I/O, there are no
    +   * concerns with regards to CPU usage here.
    --- End diff --
    
    is it a separated bug fix?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17295: [SPARK-19556][core] Do not encrypt block manager data in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17295
  
    **[Test build #75323 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/75323/testReport)** for PR 17295 at commit [`4a39cb2`](https://github.com/apache/spark/commit/4a39cb23dae86d4289bf529c27dae21680620cab).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17295: [SPARK-19556][core] Do not encrypt block manager data in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17295
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/74905/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17295#discussion_r107787099
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerManagedBuffer.scala ---
    @@ -31,17 +35,31 @@ import org.apache.spark.util.io.ChunkedByteBuffer
     private[storage] class BlockManagerManagedBuffer(
         blockInfoManager: BlockInfoManager,
         blockId: BlockId,
    -    chunkedBuffer: ChunkedByteBuffer) extends NettyManagedBuffer(chunkedBuffer.toNetty) {
    +    data: BlockData,
    +    dispose: Boolean) extends ManagedBuffer {
    --- End diff --
    
    Hmm, I prefer `dispose`, because it's not about needing to dispose the buffer, but wanting to dispose the buffer.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17295#discussion_r107323833
  
    --- Diff: core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala ---
    @@ -167,30 +167,26 @@ private[spark] class SerializerManager(
         val byteStream = new BufferedOutputStream(outputStream)
         val autoPick = !blockId.isInstanceOf[StreamBlockId]
         val ser = getSerializer(implicitly[ClassTag[T]], autoPick).newInstance()
    -    ser.serializeStream(wrapStream(blockId, byteStream)).writeAll(values).close()
    --- End diff --
    
    the `wrapStream` and `wrapForEncryption` methods can be removed from this class


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17295: [SPARK-19556][core] Do not encrypt block manager data in...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the issue:

    https://github.com/apache/spark/pull/17295
  
    > Is there any case where it is transfered in encrypted form in supported cases ?
    
    No, with these changes, only shuffle data is transferred in encrypted form.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17295: [SPARK-19556][core] Do not encrypt block manager data in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17295
  
    **[Test build #74911 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74911/testReport)** for PR 17295 at commit [`1b2a3e4`](https://github.com/apache/spark/commit/1b2a3e48e07118b03e9607bf0dc99cf53ae4d678).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17295#discussion_r107786384
  
    --- Diff: core/src/main/scala/org/apache/spark/serializer/SerializerManager.scala ---
    @@ -167,30 +167,26 @@ private[spark] class SerializerManager(
         val byteStream = new BufferedOutputStream(outputStream)
         val autoPick = !blockId.isInstanceOf[StreamBlockId]
         val ser = getSerializer(implicitly[ClassTag[T]], autoPick).newInstance()
    -    ser.serializeStream(wrapStream(blockId, byteStream)).writeAll(values).close()
    --- End diff --
    
    They're still used in a bunch of places.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17295#discussion_r107327362
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -56,6 +57,44 @@ private[spark] class BlockResult(
         val bytes: Long)
     
     /**
    + * Abstracts away how blocks are stored and provides different ways to read the underlying block
    + * data. Callers should call [[dispose()]] when they're done with the block.
    + */
    +private[spark] trait BlockData {
    +
    +  def toInputStream(): InputStream
    +
    +  def toNetty(): Object
    --- End diff --
    
    why the return type is `Object`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17295#discussion_r106779317
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---
    @@ -73,55 +86,219 @@ private[spark] class DiskStore(conf: SparkConf, diskManager: DiskBlockManager) e
       }
     
       def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = {
    -    put(blockId) { fileOutputStream =>
    -      val channel = fileOutputStream.getChannel
    -      Utils.tryWithSafeFinally {
    -        bytes.writeFully(channel)
    -      } {
    -        channel.close()
    -      }
    +    put(blockId) { channel =>
    +      bytes.writeFully(channel)
         }
       }
     
    -  def getBytes(blockId: BlockId): ChunkedByteBuffer = {
    +  def getBytes(blockId: BlockId): BlockData = {
         val file = diskManager.getFile(blockId.name)
    -    val channel = new RandomAccessFile(file, "r").getChannel
    -    Utils.tryWithSafeFinally {
    -      // For small files, directly read rather than memory map
    -      if (file.length < minMemoryMapBytes) {
    -        val buf = ByteBuffer.allocate(file.length.toInt)
    -        channel.position(0)
    -        while (buf.remaining() != 0) {
    -          if (channel.read(buf) == -1) {
    -            throw new IOException("Reached EOF before filling buffer\n" +
    -              s"offset=0\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
    +    val blockSize = getSize(blockId)
    +
    +    securityManager.getIOEncryptionKey() match {
    +      case Some(key) =>
    +        // Encrypted blocks cannot be memory mapped; return a special object that does decryption
    +        // and provides InputStream / FileRegion implementations for reading the data.
    +        new EncryptedBlockData(file, blockSize, conf, key)
    +
    +      case _ =>
    +        val channel = new FileInputStream(file).getChannel()
    +        if (blockSize < minMemoryMapBytes) {
    +          // For small files, directly read rather than memory map.
    +          Utils.tryWithSafeFinally {
    +            val buf = ByteBuffer.allocate(blockSize.toInt)
    +            while (buf.remaining() > 0) {
    +              channel.read(buf)
    --- End diff --
    
    We need to handle case where read() returns EOF (-1) in case of data corruption, file removal from underneath, etc : we will end up in infinite loop otherwise.
    
    I might have missed more places where this pattern exists in this change.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17295: [SPARK-19556][core] Do not encrypt block manager data in...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the issue:

    https://github.com/apache/spark/pull/17295
  
    > What's the actual difference? previously we transfer encrypted data?
    
    Yes. The previous version of the code would transfer the encrypted file over to the receiver, and the encrypted data for serialized blocks would also be stored in `MemoryStore` (and then decrypted on every use). That means the files could just be mmap'ed for transfer, which is faster than the `ReadableByteChannel` path even without encryption in the picture. (If you consider the previous code had to decrypt from the `MemoryStore` on every read, you can end up with better performance overall with this patch.)
    
    But this caused all the other issues with making the `BlockManager` harder to use when encryption was on, so I think this is a better solution.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17295: [SPARK-19556][core] Do not encrypt block manager data in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17295
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/74989/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17295#discussion_r106778932
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---
    @@ -73,55 +86,219 @@ private[spark] class DiskStore(conf: SparkConf, diskManager: DiskBlockManager) e
       }
     
       def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = {
    -    put(blockId) { fileOutputStream =>
    -      val channel = fileOutputStream.getChannel
    -      Utils.tryWithSafeFinally {
    -        bytes.writeFully(channel)
    -      } {
    -        channel.close()
    -      }
    +    put(blockId) { channel =>
    +      bytes.writeFully(channel)
    --- End diff --
    
    Utils.tryWithSafeFinally with close is required ?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17295: [SPARK-19556][core] Do not encrypt block manager data in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17295
  
    **[Test build #74911 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74911/testReport)** for PR 17295 at commit [`1b2a3e4`](https://github.com/apache/spark/commit/1b2a3e48e07118b03e9607bf0dc99cf53ae4d678).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17295: [SPARK-19556][core] Do not encrypt block manager data in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17295
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/74991/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17295#discussion_r108087419
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerManagedBuffer.scala ---
    @@ -17,31 +17,52 @@
     
     package org.apache.spark.storage
     
    -import org.apache.spark.network.buffer.{ManagedBuffer, NettyManagedBuffer}
    +import java.io.InputStream
    +import java.nio.ByteBuffer
    +import java.util.concurrent.atomic.AtomicInteger
    +
    +import org.apache.spark.network.buffer.ManagedBuffer
     import org.apache.spark.util.io.ChunkedByteBuffer
     
     /**
    - * This [[ManagedBuffer]] wraps a [[ChunkedByteBuffer]] retrieved from the [[BlockManager]]
    + * This [[ManagedBuffer]] wraps a [[BlockData]] instance retrieved from the [[BlockManager]]
      * so that the corresponding block's read lock can be released once this buffer's references
      * are released.
      *
    + * If `dispose` is set to try, the [[BlockData]]will be disposed when the buffer's reference
    --- End diff --
    
    `is set to try` -> `is set to true`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17295#discussion_r106963546
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---
    @@ -73,55 +86,219 @@ private[spark] class DiskStore(conf: SparkConf, diskManager: DiskBlockManager) e
       }
     
       def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = {
    -    put(blockId) { fileOutputStream =>
    -      val channel = fileOutputStream.getChannel
    -      Utils.tryWithSafeFinally {
    -        bytes.writeFully(channel)
    -      } {
    -        channel.close()
    -      }
    +    put(blockId) { channel =>
    +      bytes.writeFully(channel)
    --- End diff --
    
    The channel is owned by the code in the `put` method, which does that.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17295: [SPARK-19556][core] Do not encrypt block manager data in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17295
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17295: [SPARK-19556][core] Do not encrypt block manager data in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17295
  
    **[Test build #74905 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74905/testReport)** for PR 17295 at commit [`1428fcd`](https://github.com/apache/spark/commit/1428fcd952ddcdb29a561d8c1c90d4f820955c15).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17295: [SPARK-19556][core] Do not encrypt block manager data in...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the issue:

    https://github.com/apache/spark/pull/17295
  
    >  shall we also transfer shuffle blocks after decryption?
    
    No. That's explained in the PR description.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17295: [SPARK-19556][core] Do not encrypt block manager data in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17295
  
    **[Test build #74905 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74905/testReport)** for PR 17295 at commit [`1428fcd`](https://github.com/apache/spark/commit/1428fcd952ddcdb29a561d8c1c90d4f820955c15).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17295#discussion_r106962403
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---
    @@ -17,48 +17,61 @@
     
     package org.apache.spark.storage
     
    -import java.io.{FileOutputStream, IOException, RandomAccessFile}
    +import java.io._
     import java.nio.ByteBuffer
    +import java.nio.channels.{Channels, ReadableByteChannel, WritableByteChannel}
     import java.nio.channels.FileChannel.MapMode
    +import java.nio.charset.StandardCharsets.UTF_8
     
    -import com.google.common.io.Closeables
    +import scala.collection.mutable.ListBuffer
     
    -import org.apache.spark.SparkConf
    +import com.google.common.io.{ByteStreams, Closeables, Files}
    +import io.netty.channel.FileRegion
    +import io.netty.util.AbstractReferenceCounted
    +
    +import org.apache.spark.{SecurityManager, SparkConf}
     import org.apache.spark.internal.Logging
    -import org.apache.spark.util.Utils
    +import org.apache.spark.network.buffer.ManagedBuffer
    +import org.apache.spark.security.CryptoStreamUtils
    +import org.apache.spark.util.{ByteBufferInputStream, Utils}
     import org.apache.spark.util.io.ChunkedByteBuffer
     
     /**
      * Stores BlockManager blocks on disk.
      */
    -private[spark] class DiskStore(conf: SparkConf, diskManager: DiskBlockManager) extends Logging {
    +private[spark] class DiskStore(
    +    conf: SparkConf,
    +    diskManager: DiskBlockManager,
    +    securityManager: SecurityManager) extends Logging {
     
       private val minMemoryMapBytes = conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m")
     
       def getSize(blockId: BlockId): Long = {
    -    diskManager.getFile(blockId.name).length
    +    val file = diskManager.getMetadataFile(blockId)
    +    Files.toString(file, UTF_8).toLong
       }
     
       /**
        * Invokes the provided callback function to write the specific block.
        *
        * @throws IllegalStateException if the block already exists in the disk store.
        */
    -  def put(blockId: BlockId)(writeFunc: FileOutputStream => Unit): Unit = {
    +  def put(blockId: BlockId)(writeFunc: WritableByteChannel => Unit): Unit = {
         if (contains(blockId)) {
           throw new IllegalStateException(s"Block $blockId is already present in the disk store")
         }
         logDebug(s"Attempting to put block $blockId")
         val startTime = System.currentTimeMillis
         val file = diskManager.getFile(blockId)
    -    val fileOutputStream = new FileOutputStream(file)
    +    val out = new CountingWritableChannel(openForWrite(file))
         var threwException: Boolean = true
         try {
    -      writeFunc(fileOutputStream)
    +      writeFunc(out)
    +      Files.write(out.getCount().toString(), diskManager.getMetadataFile(blockId), UTF_8)
           threwException = false
         } finally {
           try {
    -        Closeables.close(fileOutputStream, threwException)
    +        Closeables.close(out, threwException)
    --- End diff --
    
    This was the previous behavior, but well, doesn't hurt to fix it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17295#discussion_r107324269
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerManagedBuffer.scala ---
    @@ -17,11 +17,15 @@
     
     package org.apache.spark.storage
     
    -import org.apache.spark.network.buffer.{ManagedBuffer, NettyManagedBuffer}
    +import java.io.InputStream
    +import java.nio.ByteBuffer
    +import java.util.concurrent.atomic.AtomicInteger
    +
    +import org.apache.spark.network.buffer.ManagedBuffer
     import org.apache.spark.util.io.ChunkedByteBuffer
     
     /**
    - * This [[ManagedBuffer]] wraps a [[ChunkedByteBuffer]] retrieved from the [[BlockManager]]
    + * This [[ManagedBuffer]] wraps a ManagedBuffer retrieved from the [[BlockManager]]
    --- End diff --
    
    `wraps a [[BlockData]]`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17295#discussion_r107786072
  
    --- Diff: core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala ---
    @@ -63,12 +84,27 @@ private[spark] object CryptoStreamUtils extends Logging {
           is: InputStream,
           sparkConf: SparkConf,
           key: Array[Byte]): InputStream = {
    -    val properties = toCryptoConf(sparkConf)
         val iv = new Array[Byte](IV_LENGTH_IN_BYTES)
    -    is.read(iv, 0, iv.length)
    -    val transformationStr = sparkConf.get(IO_CRYPTO_CIPHER_TRANSFORMATION)
    -    new CryptoInputStream(transformationStr, properties, is,
    -      new SecretKeySpec(key, "AES"), new IvParameterSpec(iv))
    +    ByteStreams.readFully(is, iv)
    +    val params = new CryptoParams(key, sparkConf)
    +    new CryptoInputStream(params.transformation, params.conf, is, params.keySpec,
    +      new IvParameterSpec(iv))
    +  }
    +
    +  /**
    +   * Wrap a `ReadableByteChannel` for decryption.
    +   */
    +  def createReadableChannel(
    +      channel: ReadableByteChannel,
    +      sparkConf: SparkConf,
    +      key: Array[Byte]): ReadableByteChannel = {
    +    val iv = new Array[Byte](IV_LENGTH_IN_BYTES)
    +    val buf = ByteBuffer.wrap(iv)
    +    JavaUtils.readFully(channel, buf)
    --- End diff --
    
    There's no `ByteStreams.readFully` for `ReadableByteChannel` that I'm aware of.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17295#discussion_r107324480
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerManagedBuffer.scala ---
    @@ -31,17 +35,31 @@ import org.apache.spark.util.io.ChunkedByteBuffer
     private[storage] class BlockManagerManagedBuffer(
         blockInfoManager: BlockInfoManager,
         blockId: BlockId,
    -    chunkedBuffer: ChunkedByteBuffer) extends NettyManagedBuffer(chunkedBuffer.toNetty) {
    +    data: BlockData,
    +    dispose: Boolean) extends ManagedBuffer {
    +
    +  private val refCount = new AtomicInteger(1)
    --- End diff --
    
    maybe we should mention it in the class doc that the `BlockData` will be disposed automatically via reference count.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17295: [SPARK-19556][core] Do not encrypt block manager data in...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the issue:

    https://github.com/apache/spark/pull/17295
  
    > Isn't it not simpler to transmit block contents in encrypted format without decryption?
    
    First, keep in mind that there's no metadata that tells the receiver whether a block is encrypted or not. This means that methods like `BlockManager.get`, which can read block data from either local or remote sources, need to return data that is either always encrypted or always not encrypted for the same block ID.
    
    This leaves two choices:
    
    - encrypt the data in all stores (memory & disk); this is what the current code does, and it requires all code that uses the BlockManager to have to deal with encryption. This is what caused SPARK-19520, and I filed SPARK-19556 to cover yet another case of a code path that did not do the right thing when encryption is enabled.
    
    - make all non-shuffle block data read from the BlockManager not encrypted. This means non-shuffle code calling the BlockManager does not have to care about encryption, since it will always read unencrypted data, and can always put unencrypted data in the BlockManager and it will be encrypted when needed (a.k.a. when writing to disk).
    
    > Remote fetch of RDD blocks is not uncommon
    
    That's fine. This change makes the data read from the BlockManager instance not encrypted. But when transmitting the data over to another executor, there's RPC-level encryption (`spark.authenticate.enableSaslEncryption` or `spark.network.crypto.enabled`), which means the data is still encrypted on the wire.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17295: [SPARK-19556][core] Do not encrypt block manager data in...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the issue:

    https://github.com/apache/spark/pull/17295
  
    retest this please


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17295#discussion_r106779457
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---
    @@ -73,55 +86,219 @@ private[spark] class DiskStore(conf: SparkConf, diskManager: DiskBlockManager) e
       }
     
       def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = {
    -    put(blockId) { fileOutputStream =>
    -      val channel = fileOutputStream.getChannel
    -      Utils.tryWithSafeFinally {
    -        bytes.writeFully(channel)
    -      } {
    -        channel.close()
    -      }
    +    put(blockId) { channel =>
    +      bytes.writeFully(channel)
         }
       }
     
    -  def getBytes(blockId: BlockId): ChunkedByteBuffer = {
    +  def getBytes(blockId: BlockId): BlockData = {
         val file = diskManager.getFile(blockId.name)
    -    val channel = new RandomAccessFile(file, "r").getChannel
    -    Utils.tryWithSafeFinally {
    -      // For small files, directly read rather than memory map
    -      if (file.length < minMemoryMapBytes) {
    -        val buf = ByteBuffer.allocate(file.length.toInt)
    -        channel.position(0)
    -        while (buf.remaining() != 0) {
    -          if (channel.read(buf) == -1) {
    -            throw new IOException("Reached EOF before filling buffer\n" +
    -              s"offset=0\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
    +    val blockSize = getSize(blockId)
    +
    +    securityManager.getIOEncryptionKey() match {
    +      case Some(key) =>
    +        // Encrypted blocks cannot be memory mapped; return a special object that does decryption
    +        // and provides InputStream / FileRegion implementations for reading the data.
    +        new EncryptedBlockData(file, blockSize, conf, key)
    +
    +      case _ =>
    +        val channel = new FileInputStream(file).getChannel()
    +        if (blockSize < minMemoryMapBytes) {
    +          // For small files, directly read rather than memory map.
    +          Utils.tryWithSafeFinally {
    +            val buf = ByteBuffer.allocate(blockSize.toInt)
    +            while (buf.remaining() > 0) {
    +              channel.read(buf)
    +            }
    +            buf.flip()
    +            new ByteBufferBlockData(new ChunkedByteBuffer(buf))
    +          } {
    +            channel.close()
    +          }
    +        } else {
    +          Utils.tryWithSafeFinally {
    +            new ByteBufferBlockData(
    +              new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, file.length)))
    +          } {
    +            channel.close()
               }
             }
    -        buf.flip()
    -        new ChunkedByteBuffer(buf)
    -      } else {
    -        new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, file.length))
    -      }
    -    } {
    -      channel.close()
         }
       }
     
       def remove(blockId: BlockId): Boolean = {
         val file = diskManager.getFile(blockId.name)
    -    if (file.exists()) {
    -      val ret = file.delete()
    -      if (!ret) {
    -        logWarning(s"Error deleting ${file.getPath()}")
    +    val meta = diskManager.getMetadataFile(blockId)
    +
    +    def delete(f: File): Boolean = {
    +      if (f.exists()) {
    +        val ret = f.delete()
    +        if (!ret) {
    +          logWarning(s"Error deleting ${file.getPath()}")
    +        }
    +
    +        ret
    +      } else {
    +        false
           }
    -      ret
    -    } else {
    -      false
         }
    +
    +    delete(file) & delete(meta)
       }
     
       def contains(blockId: BlockId): Boolean = {
         val file = diskManager.getFile(blockId.name)
         file.exists()
       }
    +
    +  private def openForWrite(file: File): WritableByteChannel = {
    +    val out = new FileOutputStream(file).getChannel()
    +    try {
    +      securityManager.getIOEncryptionKey().map { key =>
    +        CryptoStreamUtils.createWritableChannel(out, conf, key)
    +      }.getOrElse(out)
    +    } catch {
    +      case e: Exception =>
    +        out.close()
    +        throw e
    +    }
    +  }
    +
    +}
    +
    +private class EncryptedBlockData(
    +    file: File,
    +    blockSize: Long,
    +    conf: SparkConf,
    +    key: Array[Byte]) extends BlockData {
    +
    +  override def toInputStream(): InputStream = Channels.newInputStream(open())
    +
    +  override def toManagedBuffer(): ManagedBuffer = new EncryptedManagedBuffer()
    +
    +  override def toByteBuffer(allocator: Int => ByteBuffer): ChunkedByteBuffer = {
    +    val source = open()
    +    try {
    +      var remaining = blockSize
    +      val chunks = new ListBuffer[ByteBuffer]()
    +      while (remaining > 0) {
    +        val chunkSize = math.min(remaining, Int.MaxValue)
    +        val chunk = allocator(chunkSize.toInt)
    +        remaining -= chunkSize
    +
    +        while (chunk.remaining() > 0) {
    +          source.read(chunk)
    --- End diff --
    
    as mentioned above, needs EOF error handling.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17295#discussion_r107952007
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -1065,7 +1084,7 @@ private[spark] class BlockManager(
               try {
                 replicate(blockId, bytesToReplicate, level, remoteClassTag)
               } finally {
    -            bytesToReplicate.unmap()
    +            bytesToReplicate.dispose()
    --- End diff --
    
    `BlockData.dispose` calls `ChunkedByteBuffer.unmap`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17295: [SPARK-19556][core] Do not encrypt block manager data in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17295
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17295#discussion_r106587687
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -1235,7 +1251,7 @@ private[spark] class BlockManager(
               peer.port,
               peer.executorId,
               blockId,
    -          new NettyManagedBuffer(data.toNetty),
    +          new BlockManagerManagedBuffer(blockInfoManager, blockId, data.toManagedBuffer()),
    --- End diff --
    
    why this change?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17295#discussion_r107322905
  
    --- Diff: core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---
    @@ -219,18 +219,22 @@ private[spark] class TorrentBroadcast[T: ClassTag](obj: T, id: Long)
             case None =>
               logInfo("Started reading broadcast variable " + id)
               val startTimeMs = System.currentTimeMillis()
    -          val blocks = readBlocks().flatMap(_.getChunks())
    +          val blocks = readBlocks()
               logInfo("Reading broadcast variable " + id + " took" + Utils.getUsedTimeMs(startTimeMs))
     
    -          val obj = TorrentBroadcast.unBlockifyObject[T](
    -            blocks, SparkEnv.get.serializer, compressionCodec)
    -          // Store the merged copy in BlockManager so other tasks on this executor don't
    -          // need to re-fetch it.
    -          val storageLevel = StorageLevel.MEMORY_AND_DISK
    -          if (!blockManager.putSingle(broadcastId, obj, storageLevel, tellMaster = false)) {
    -            throw new SparkException(s"Failed to store $broadcastId in BlockManager")
    +          try {
    +            val obj = TorrentBroadcast.unBlockifyObject[T](
    +              blocks.map(_.toInputStream()), SparkEnv.get.serializer, compressionCodec)
    +            // Store the merged copy in BlockManager so other tasks on this executor don't
    +            // need to re-fetch it.
    +            val storageLevel = StorageLevel.MEMORY_AND_DISK
    +            if (!blockManager.putSingle(broadcastId, obj, storageLevel, tellMaster = false)) {
    +              throw new SparkException(s"Failed to store $broadcastId in BlockManager")
    +            }
    +            obj
    +          } finally {
    +            blocks.foreach(_.dispose())
    --- End diff --
    
    ah good catch! we should dispose the blocks here


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17295#discussion_r107326715
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -1065,7 +1084,7 @@ private[spark] class BlockManager(
               try {
                 replicate(blockId, bytesToReplicate, level, remoteClassTag)
               } finally {
    -            bytesToReplicate.unmap()
    +            bytesToReplicate.dispose()
    --- End diff --
    
    why change `unmap` to `dispose`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17295: [SPARK-19556][core] Do not encrypt block manager data in...

Posted by mallman <gi...@git.apache.org>.
Github user mallman commented on the issue:

    https://github.com/apache/spark/pull/17295
  
    > LGTM, cc @mallman to check the unmap part
    
    LGTM, too. Sorry for the late reply... I've been away the past two weeks.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17295#discussion_r106778688
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---
    @@ -17,48 +17,61 @@
     
     package org.apache.spark.storage
     
    -import java.io.{FileOutputStream, IOException, RandomAccessFile}
    +import java.io._
     import java.nio.ByteBuffer
    +import java.nio.channels.{Channels, ReadableByteChannel, WritableByteChannel}
     import java.nio.channels.FileChannel.MapMode
    +import java.nio.charset.StandardCharsets.UTF_8
     
    -import com.google.common.io.Closeables
    +import scala.collection.mutable.ListBuffer
     
    -import org.apache.spark.SparkConf
    +import com.google.common.io.{ByteStreams, Closeables, Files}
    +import io.netty.channel.FileRegion
    +import io.netty.util.AbstractReferenceCounted
    +
    +import org.apache.spark.{SecurityManager, SparkConf}
     import org.apache.spark.internal.Logging
    -import org.apache.spark.util.Utils
    +import org.apache.spark.network.buffer.ManagedBuffer
    +import org.apache.spark.security.CryptoStreamUtils
    +import org.apache.spark.util.{ByteBufferInputStream, Utils}
     import org.apache.spark.util.io.ChunkedByteBuffer
     
     /**
      * Stores BlockManager blocks on disk.
      */
    -private[spark] class DiskStore(conf: SparkConf, diskManager: DiskBlockManager) extends Logging {
    +private[spark] class DiskStore(
    +    conf: SparkConf,
    +    diskManager: DiskBlockManager,
    +    securityManager: SecurityManager) extends Logging {
     
       private val minMemoryMapBytes = conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m")
     
       def getSize(blockId: BlockId): Long = {
    -    diskManager.getFile(blockId.name).length
    +    val file = diskManager.getMetadataFile(blockId)
    +    Files.toString(file, UTF_8).toLong
    --- End diff --
    
    Metadata file should be used only when required - otherwise we should avoid their use.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17295: [SPARK-19556][core] Do not encrypt block manager data in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17295
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/74997/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17295: [SPARK-19556][core] Do not encrypt block manager data in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17295
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17295#discussion_r106962007
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala ---
    @@ -34,6 +34,8 @@ import org.apache.spark.util.{ShutdownHookManager, Utils}
      */
     private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolean) extends Logging {
     
    +  private val METADATA_FILE_SUFFIX = ".meta"
    --- End diff --
    
    Hmm, good point... there's currently no metadata kept in the `DiskStore` class, but then this shouldn't be a lot of data.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17295: [SPARK-19556][core] Do not encrypt block manager data in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17295
  
    **[Test build #74555 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74555/testReport)** for PR 17295 at commit [`3aa752f`](https://github.com/apache/spark/commit/3aa752f9becdfe0e35a47d731736d942e3e5b3bf).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17295#discussion_r107787884
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -56,6 +57,44 @@ private[spark] class BlockResult(
         val bytes: Long)
     
     /**
    + * Abstracts away how blocks are stored and provides different ways to read the underlying block
    + * data. Callers should call [[dispose()]] when they're done with the block.
    + */
    +private[spark] trait BlockData {
    +
    +  def toInputStream(): InputStream
    +
    +  def toNetty(): Object
    --- End diff --
    
    See `ManagedBuffer.convertToNetty()`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17295: [SPARK-19556][core] Do not encrypt block manager data in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17295
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/75226/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17295: [SPARK-19556][core] Do not encrypt block manager data in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17295
  
    **[Test build #75267 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/75267/testReport)** for PR 17295 at commit [`ab4b5dd`](https://github.com/apache/spark/commit/ab4b5dd7191c9f73f5419180cedd3051621bd83a).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17295#discussion_r106063310
  
    --- Diff: core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala ---
    @@ -48,12 +50,30 @@ private[spark] object CryptoStreamUtils extends Logging {
           os: OutputStream,
           sparkConf: SparkConf,
           key: Array[Byte]): OutputStream = {
    -    val properties = toCryptoConf(sparkConf)
    -    val iv = createInitializationVector(properties)
    +    val params = new CryptoParams(key, sparkConf)
    +    val iv = createInitializationVector(params.conf)
         os.write(iv)
    -    val transformationStr = sparkConf.get(IO_CRYPTO_CIPHER_TRANSFORMATION)
    -    new CryptoOutputStream(transformationStr, properties, os,
    -      new SecretKeySpec(key, "AES"), new IvParameterSpec(iv))
    +    new CryptoOutputStream(params.transformation, params.conf, os, params.keySpec,
    +      new IvParameterSpec(iv))
    +  }
    +
    +  /**
    +   * Wrap a `WritableByteChannel` for encryption.
    +   */
    +  def createWritableChannel(
    +      channel: WritableByteChannel,
    +      sparkConf: SparkConf,
    +      key: Array[Byte]): WritableByteChannel = {
    +    val params = new CryptoParams(key, sparkConf)
    +    val iv = createInitializationVector(params.conf)
    +    val buf = ByteBuffer.wrap(iv)
    +    while (buf.remaining() > 0) {
    --- End diff --
    
    nit: buf.hasRemaining for this pattern of use


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17295#discussion_r107324613
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManagerManagedBuffer.scala ---
    @@ -31,17 +35,31 @@ import org.apache.spark.util.io.ChunkedByteBuffer
     private[storage] class BlockManagerManagedBuffer(
         blockInfoManager: BlockInfoManager,
         blockId: BlockId,
    -    chunkedBuffer: ChunkedByteBuffer) extends NettyManagedBuffer(chunkedBuffer.toNetty) {
    +    data: BlockData,
    +    dispose: Boolean) extends ManagedBuffer {
    --- End diff --
    
    `needDispose` may be a better name


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17295: [SPARK-19556][core] Do not encrypt block manager data in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17295
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/74555/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17295: [SPARK-19556][core] Do not encrypt block manager data in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17295
  
    **[Test build #74555 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74555/testReport)** for PR 17295 at commit [`3aa752f`](https://github.com/apache/spark/commit/3aa752f9becdfe0e35a47d731736d942e3e5b3bf).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17295#discussion_r106779546
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---
    @@ -73,55 +86,219 @@ private[spark] class DiskStore(conf: SparkConf, diskManager: DiskBlockManager) e
       }
     
       def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = {
    -    put(blockId) { fileOutputStream =>
    -      val channel = fileOutputStream.getChannel
    -      Utils.tryWithSafeFinally {
    -        bytes.writeFully(channel)
    -      } {
    -        channel.close()
    -      }
    +    put(blockId) { channel =>
    +      bytes.writeFully(channel)
         }
       }
     
    -  def getBytes(blockId: BlockId): ChunkedByteBuffer = {
    +  def getBytes(blockId: BlockId): BlockData = {
         val file = diskManager.getFile(blockId.name)
    -    val channel = new RandomAccessFile(file, "r").getChannel
    -    Utils.tryWithSafeFinally {
    -      // For small files, directly read rather than memory map
    -      if (file.length < minMemoryMapBytes) {
    -        val buf = ByteBuffer.allocate(file.length.toInt)
    -        channel.position(0)
    -        while (buf.remaining() != 0) {
    -          if (channel.read(buf) == -1) {
    -            throw new IOException("Reached EOF before filling buffer\n" +
    -              s"offset=0\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
    +    val blockSize = getSize(blockId)
    +
    +    securityManager.getIOEncryptionKey() match {
    +      case Some(key) =>
    +        // Encrypted blocks cannot be memory mapped; return a special object that does decryption
    +        // and provides InputStream / FileRegion implementations for reading the data.
    +        new EncryptedBlockData(file, blockSize, conf, key)
    +
    +      case _ =>
    +        val channel = new FileInputStream(file).getChannel()
    +        if (blockSize < minMemoryMapBytes) {
    +          // For small files, directly read rather than memory map.
    +          Utils.tryWithSafeFinally {
    +            val buf = ByteBuffer.allocate(blockSize.toInt)
    +            while (buf.remaining() > 0) {
    +              channel.read(buf)
    +            }
    +            buf.flip()
    +            new ByteBufferBlockData(new ChunkedByteBuffer(buf))
    +          } {
    +            channel.close()
    +          }
    +        } else {
    +          Utils.tryWithSafeFinally {
    +            new ByteBufferBlockData(
    +              new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, file.length)))
    +          } {
    +            channel.close()
               }
             }
    -        buf.flip()
    -        new ChunkedByteBuffer(buf)
    -      } else {
    -        new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, file.length))
    -      }
    -    } {
    -      channel.close()
         }
       }
     
       def remove(blockId: BlockId): Boolean = {
         val file = diskManager.getFile(blockId.name)
    -    if (file.exists()) {
    -      val ret = file.delete()
    -      if (!ret) {
    -        logWarning(s"Error deleting ${file.getPath()}")
    +    val meta = diskManager.getMetadataFile(blockId)
    +
    +    def delete(f: File): Boolean = {
    +      if (f.exists()) {
    +        val ret = f.delete()
    +        if (!ret) {
    +          logWarning(s"Error deleting ${file.getPath()}")
    +        }
    +
    +        ret
    +      } else {
    +        false
           }
    -      ret
    -    } else {
    -      false
         }
    +
    +    delete(file) & delete(meta)
       }
     
       def contains(blockId: BlockId): Boolean = {
         val file = diskManager.getFile(blockId.name)
         file.exists()
       }
    +
    +  private def openForWrite(file: File): WritableByteChannel = {
    +    val out = new FileOutputStream(file).getChannel()
    +    try {
    +      securityManager.getIOEncryptionKey().map { key =>
    +        CryptoStreamUtils.createWritableChannel(out, conf, key)
    +      }.getOrElse(out)
    +    } catch {
    +      case e: Exception =>
    +        out.close()
    +        throw e
    +    }
    +  }
    +
    +}
    +
    +private class EncryptedBlockData(
    +    file: File,
    +    blockSize: Long,
    +    conf: SparkConf,
    +    key: Array[Byte]) extends BlockData {
    +
    +  override def toInputStream(): InputStream = Channels.newInputStream(open())
    +
    +  override def toManagedBuffer(): ManagedBuffer = new EncryptedManagedBuffer()
    +
    +  override def toByteBuffer(allocator: Int => ByteBuffer): ChunkedByteBuffer = {
    +    val source = open()
    +    try {
    +      var remaining = blockSize
    +      val chunks = new ListBuffer[ByteBuffer]()
    +      while (remaining > 0) {
    +        val chunkSize = math.min(remaining, Int.MaxValue)
    +        val chunk = allocator(chunkSize.toInt)
    +        remaining -= chunkSize
    +
    +        while (chunk.remaining() > 0) {
    +          source.read(chunk)
    +        }
    +        chunk.flip()
    +        chunks += chunk
    +      }
    +
    +      new ChunkedByteBuffer(chunks.toArray)
    +    } finally {
    +      source.close()
    +    }
    +  }
    +
    +  override def size: Long = blockSize
    +
    +  override def dispose(): Unit = { }
    +
    +  private def open(): ReadableByteChannel = {
    +    val channel = new FileInputStream(file).getChannel()
    +    try {
    +      CryptoStreamUtils.createReadableChannel(channel, conf, key)
    +    } catch {
    +      case e: Exception =>
    +        Closeables.close(channel, true)
    +        throw e
    +    }
    +  }
    +
    +  private class EncryptedManagedBuffer extends ManagedBuffer {
    +
    +    override def size(): Long = blockSize
    +
    +    override def nioByteBuffer(): ByteBuffer = {
    +      // This is used by the block transfer service to replicate blocks. The upload code reads
    +      // all bytes into memory to send the block to the remote executor, so it's ok to do this
    +      // as long as the block fits in a Java array.
    +      assert(blockSize <= Int.MaxValue, "Block is too large to be wrapped in a byte buffer.")
    +      val is = toInputStream()
    +      try {
    +        ByteBuffer.wrap(ByteStreams.toByteArray(is))
    +      } finally {
    +        Closeables.close(is, true)
    +      }
    +    }
    +
    +    override def createInputStream(): InputStream = toInputStream()
    +
    +    override def convertToNetty(): Object = new ReadableChannelFileRegion(open(), blockSize)
    +
    +    override def retain(): ManagedBuffer = this
    +
    +    override def release(): ManagedBuffer = this
    +
    +  }
    +
    +}
    +
    +private class ReadableChannelFileRegion(source: ReadableByteChannel, blockSize: Long)
    +  extends AbstractReferenceCounted with FileRegion {
    +
    +  private var _transferred = 0L
    +
    +  private val buffer = ByteBuffer.allocateDirect(64 * 1024)
    +  buffer.flip()
    +
    +  override def count(): Long = blockSize
    +
    +  override def position(): Long = 0
    +
    +  override def transfered(): Long = _transferred
    +
    +  override def transferTo(target: WritableByteChannel, pos: Long): Long = {
    +    assert(pos == transfered(), "Invalid position.")
    +
    +    var written = 0L
    +    var lastWrite = -1L
    +    while (lastWrite != 0) {
    +      if (buffer.remaining() == 0) {
    +        buffer.clear()
    +        source.read(buffer)
    +        buffer.flip()
    +      }
    +      if (buffer.remaining() > 0) {
    +        lastWrite = target.write(buffer)
    +        written += lastWrite
    +      } else {
    +        lastWrite = 0
    +      }
    +    }
    +
    +    _transferred += written
    +    written
    +  }
    +
    +  override def deallocate(): Unit = source.close()
    --- End diff --
    
    release buffer as well.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

Posted by zsxwing <gi...@git.apache.org>.
Github user zsxwing commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17295#discussion_r106749569
  
    --- Diff: core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala ---
    @@ -63,12 +83,40 @@ private[spark] object CryptoStreamUtils extends Logging {
           is: InputStream,
           sparkConf: SparkConf,
           key: Array[Byte]): InputStream = {
    -    val properties = toCryptoConf(sparkConf)
         val iv = new Array[Byte](IV_LENGTH_IN_BYTES)
    -    is.read(iv, 0, iv.length)
    -    val transformationStr = sparkConf.get(IO_CRYPTO_CIPHER_TRANSFORMATION)
    -    new CryptoInputStream(transformationStr, properties, is,
    -      new SecretKeySpec(key, "AES"), new IvParameterSpec(iv))
    +    var read = 0
    +    while (read < iv.length) {
    --- End diff --
    
    Yeah, you can just use `ByteStreams.readFully(is, iv)`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17295: [SPARK-19556][core] Do not encrypt block manager data in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17295
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17295: [SPARK-19556][core] Do not encrypt block manager data in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17295
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17295: [SPARK-19556][core] Do not encrypt block manager data in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17295
  
    **[Test build #74997 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74997/testReport)** for PR 17295 at commit [`6bda670`](https://github.com/apache/spark/commit/6bda6701bf0c266047a5fa81fd29f4fb826728c7).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17295: [SPARK-19556][core] Do not encrypt block manager data in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17295
  
    **[Test build #74989 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74989/testReport)** for PR 17295 at commit [`6848a59`](https://github.com/apache/spark/commit/6848a592df16f778afebf26b19f81f6f23b80aa4).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17295: [SPARK-19556][core] Do not encrypt block manager data in...

Posted by sameeragarwal <gi...@git.apache.org>.
Github user sameeragarwal commented on the issue:

    https://github.com/apache/spark/pull/17295
  
    cc @cloud-fan @ueshin 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17295: [SPARK-19556][core] Do not encrypt block manager data in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17295
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17295: [SPARK-19556][core] Do not encrypt block manager data in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17295
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17295#discussion_r108046997
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -56,6 +57,49 @@ private[spark] class BlockResult(
         val bytes: Long)
     
     /**
    + * Abstracts away how blocks are stored and provides different ways to read the underlying block
    + * data. Callers should call [[dispose()]] when they're done with the block.
    + */
    +private[spark] trait BlockData {
    +
    +  def toInputStream(): InputStream
    +
    +  /**
    +   * Returns a Netty-friendly wrapper for the block's data.
    +   *
    +   * @see [[ManagedBuffer#convertToNetty()]]
    +   */
    +  def toNetty(): Object
    +
    +  def toChunkedByteBuffer(allocator: Int => ByteBuffer): ChunkedByteBuffer
    +
    +  def toByteBuffer(): ByteBuffer
    +
    +  def size: Long
    +
    +  def dispose(): Unit
    +
    +}
    +
    +private[spark] class ByteBufferBlockData(val buffer: ChunkedByteBuffer) extends BlockData {
    +
    +  override def toInputStream(): InputStream = buffer.toInputStream(dispose = false)
    +
    +  override def toNetty(): Object = buffer.toNetty
    +
    +  override def toChunkedByteBuffer(allocator: Int => ByteBuffer): ChunkedByteBuffer = {
    +    buffer.copy(allocator)
    +  }
    +
    +  override def toByteBuffer(): ByteBuffer = buffer.toByteBuffer
    +
    +  override def size: Long = buffer.size
    +
    +  override def dispose(): Unit = buffer.unmap()
    --- End diff --
    
    BTW I'm really starting to think the fix in #16499, while technically correct, is more confusing that it should be. The problem is not that the code was disposing of off-heap buffers; the problem is that buffers read from the memory store should not be disposed of, while buffers read from the disk store should.
    
    So it's not really a matter of dispose vs. unmap, but a matter of where the buffer come from. (Which is kinda what I had in this patch with the `autoDispose` parameter to `ByteBufferBlockData`. Perhaps I should revive that and get rid of `StorageUtils.unmap`, which is just confusing.)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17295: [SPARK-19556][core] Do not encrypt block manager data in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17295
  
    **[Test build #75267 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/75267/testReport)** for PR 17295 at commit [`ab4b5dd`](https://github.com/apache/spark/commit/ab4b5dd7191c9f73f5419180cedd3051621bd83a).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17295#discussion_r107785760
  
    --- Diff: core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala ---
    @@ -48,12 +51,30 @@ private[spark] object CryptoStreamUtils extends Logging {
           os: OutputStream,
           sparkConf: SparkConf,
           key: Array[Byte]): OutputStream = {
    -    val properties = toCryptoConf(sparkConf)
    -    val iv = createInitializationVector(properties)
    +    val params = new CryptoParams(key, sparkConf)
    +    val iv = createInitializationVector(params.conf)
         os.write(iv)
    -    val transformationStr = sparkConf.get(IO_CRYPTO_CIPHER_TRANSFORMATION)
    -    new CryptoOutputStream(transformationStr, properties, os,
    -      new SecretKeySpec(key, "AES"), new IvParameterSpec(iv))
    +    new CryptoOutputStream(params.transformation, params.conf, os, params.keySpec,
    +      new IvParameterSpec(iv))
    +  }
    +
    +  /**
    +   * Wrap a `WritableByteChannel` for encryption.
    +   */
    +  def createWritableChannel(
    +      channel: WritableByteChannel,
    +      sparkConf: SparkConf,
    +      key: Array[Byte]): WritableByteChannel = {
    +    val params = new CryptoParams(key, sparkConf)
    +    val iv = createInitializationVector(params.conf)
    +    val buf = ByteBuffer.wrap(iv)
    +    while (buf.hasRemaining()) {
    --- End diff --
    
    No, there's no infinite loop here, because a failure would cause an exception. Yeah, using the helper should work too.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/17295


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17295#discussion_r106691642
  
    --- Diff: core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala ---
    @@ -63,12 +83,40 @@ private[spark] object CryptoStreamUtils extends Logging {
           is: InputStream,
           sparkConf: SparkConf,
           key: Array[Byte]): InputStream = {
    -    val properties = toCryptoConf(sparkConf)
         val iv = new Array[Byte](IV_LENGTH_IN_BYTES)
    -    is.read(iv, 0, iv.length)
    -    val transformationStr = sparkConf.get(IO_CRYPTO_CIPHER_TRANSFORMATION)
    -    new CryptoInputStream(transformationStr, properties, is,
    -      new SecretKeySpec(key, "AES"), new IvParameterSpec(iv))
    +    var read = 0
    +    while (read < iv.length) {
    --- End diff --
    
    It avoids issues with short reads. It's unlikely to happen but I always write read code like this to be safe.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17295: [SPARK-19556][core] Do not encrypt block manager data in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17295
  
    **[Test build #74989 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74989/testReport)** for PR 17295 at commit [`6848a59`](https://github.com/apache/spark/commit/6848a592df16f778afebf26b19f81f6f23b80aa4).
     * This patch **fails to build**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17295#discussion_r108035391
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -56,6 +57,49 @@ private[spark] class BlockResult(
         val bytes: Long)
     
     /**
    + * Abstracts away how blocks are stored and provides different ways to read the underlying block
    + * data. Callers should call [[dispose()]] when they're done with the block.
    + */
    +private[spark] trait BlockData {
    +
    +  def toInputStream(): InputStream
    +
    +  /**
    +   * Returns a Netty-friendly wrapper for the block's data.
    +   *
    +   * @see [[ManagedBuffer#convertToNetty()]]
    +   */
    +  def toNetty(): Object
    +
    +  def toChunkedByteBuffer(allocator: Int => ByteBuffer): ChunkedByteBuffer
    +
    +  def toByteBuffer(): ByteBuffer
    +
    +  def size: Long
    +
    +  def dispose(): Unit
    +
    +}
    +
    +private[spark] class ByteBufferBlockData(val buffer: ChunkedByteBuffer) extends BlockData {
    +
    +  override def toInputStream(): InputStream = buffer.toInputStream(dispose = false)
    +
    +  override def toNetty(): Object = buffer.toNetty
    +
    +  override def toChunkedByteBuffer(allocator: Int => ByteBuffer): ChunkedByteBuffer = {
    +    buffer.copy(allocator)
    +  }
    +
    +  override def toByteBuffer(): ByteBuffer = buffer.toByteBuffer
    +
    +  override def size: Long = buffer.size
    +
    +  override def dispose(): Unit = buffer.unmap()
    --- End diff --
    
    can we define the semantic of the `BlockData.dispose` clearly? It's quite confusing here that the `dispose` method call `buffer.unmap` while `ChunkedByteBuffer` also has a `dispose` method.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17295: [SPARK-19556][core] Do not encrypt block manager data in...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on the issue:

    https://github.com/apache/spark/pull/17295
  
    Just to be clear, I would prefer if we consistently did things - either encrypt all blocks while transferring (irrespective of sasl being enabled or not); or depend only on sasl for channel encryption.
    But given this was what it currently is, I am not sure if it was by design or accident; and what the tradeoff's for ensuring consistency is.
    
    (The workaround is, what I mentioned above, tagging)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17295: [SPARK-19556][core] Do not encrypt block manager data in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17295
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/74906/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17295: [SPARK-19556][core] Do not encrypt block manager data in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17295
  
    **[Test build #74997 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74997/testReport)** for PR 17295 at commit [`6bda670`](https://github.com/apache/spark/commit/6bda6701bf0c266047a5fa81fd29f4fb826728c7).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17295: [SPARK-19556][core] Do not encrypt block manager data in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17295
  
    **[Test build #74991 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74991/testReport)** for PR 17295 at commit [`107e3e7`](https://github.com/apache/spark/commit/107e3e72e81d2c7813d832d3e9c2beab89e01379).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17295#discussion_r106691384
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---
    @@ -73,55 +86,219 @@ private[spark] class DiskStore(conf: SparkConf, diskManager: DiskBlockManager) e
       }
     
       def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = {
    -    put(blockId) { fileOutputStream =>
    -      val channel = fileOutputStream.getChannel
    -      Utils.tryWithSafeFinally {
    -        bytes.writeFully(channel)
    -      } {
    -        channel.close()
    -      }
    +    put(blockId) { channel =>
    +      bytes.writeFully(channel)
         }
       }
     
    -  def getBytes(blockId: BlockId): ChunkedByteBuffer = {
    +  def getBytes(blockId: BlockId): BlockData = {
         val file = diskManager.getFile(blockId.name)
    -    val channel = new RandomAccessFile(file, "r").getChannel
    -    Utils.tryWithSafeFinally {
    -      // For small files, directly read rather than memory map
    -      if (file.length < minMemoryMapBytes) {
    -        val buf = ByteBuffer.allocate(file.length.toInt)
    -        channel.position(0)
    -        while (buf.remaining() != 0) {
    -          if (channel.read(buf) == -1) {
    -            throw new IOException("Reached EOF before filling buffer\n" +
    -              s"offset=0\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
    +    val blockSize = getSize(blockId)
    +
    +    securityManager.getIOEncryptionKey() match {
    +      case Some(key) =>
    +        // Encrypted blocks cannot be memory mapped; return a special object that does decryption
    +        // and provides InputStream / FileRegion implementations for reading the data.
    +        new EncryptedBlockData(file, blockSize, conf, key)
    +
    +      case _ =>
    +        val channel = new FileInputStream(file).getChannel()
    +        if (blockSize < minMemoryMapBytes) {
    +          // For small files, directly read rather than memory map.
    +          Utils.tryWithSafeFinally {
    +            val buf = ByteBuffer.allocate(blockSize.toInt)
    +            while (buf.remaining() > 0) {
    +              channel.read(buf)
    +            }
    +            buf.flip()
    +            new ByteBufferBlockData(new ChunkedByteBuffer(buf))
    +          } {
    +            channel.close()
    +          }
    +        } else {
    +          Utils.tryWithSafeFinally {
    +            new ByteBufferBlockData(
    +              new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, file.length)))
    +          } {
    +            channel.close()
               }
             }
    -        buf.flip()
    -        new ChunkedByteBuffer(buf)
    -      } else {
    -        new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, file.length))
    -      }
    -    } {
    -      channel.close()
         }
       }
     
       def remove(blockId: BlockId): Boolean = {
         val file = diskManager.getFile(blockId.name)
    -    if (file.exists()) {
    -      val ret = file.delete()
    -      if (!ret) {
    -        logWarning(s"Error deleting ${file.getPath()}")
    +    val meta = diskManager.getMetadataFile(blockId)
    +
    +    def delete(f: File): Boolean = {
    +      if (f.exists()) {
    +        val ret = f.delete()
    +        if (!ret) {
    +          logWarning(s"Error deleting ${file.getPath()}")
    +        }
    +
    +        ret
    +      } else {
    +        false
           }
    -      ret
    -    } else {
    -      false
         }
    +
    +    delete(file) & delete(meta)
       }
     
       def contains(blockId: BlockId): Boolean = {
         val file = diskManager.getFile(blockId.name)
         file.exists()
       }
    +
    +  private def openForWrite(file: File): WritableByteChannel = {
    +    val out = new FileOutputStream(file).getChannel()
    +    try {
    +      securityManager.getIOEncryptionKey().map { key =>
    +        CryptoStreamUtils.createWritableChannel(out, conf, key)
    +      }.getOrElse(out)
    +    } catch {
    +      case e: Exception =>
    +        out.close()
    +        throw e
    +    }
    +  }
    +
    +}
    +
    +private class EncryptedBlockData(
    +    file: File,
    +    blockSize: Long,
    +    conf: SparkConf,
    +    key: Array[Byte]) extends BlockData {
    +
    +  override def toInputStream(): InputStream = Channels.newInputStream(open())
    +
    +  override def toManagedBuffer(): ManagedBuffer = new EncryptedManagedBuffer()
    +
    +  override def toByteBuffer(allocator: Int => ByteBuffer): ChunkedByteBuffer = {
    +    val source = open()
    +    try {
    +      var remaining = blockSize
    +      val chunks = new ListBuffer[ByteBuffer]()
    +      while (remaining > 0) {
    +        val chunkSize = math.min(remaining, Int.MaxValue)
    +        val chunk = allocator(chunkSize.toInt)
    +        remaining -= chunkSize
    +
    +        while (chunk.remaining() > 0) {
    +          source.read(chunk)
    +        }
    +        chunk.flip()
    +        chunks += chunk
    +      }
    +
    +      new ChunkedByteBuffer(chunks.toArray)
    +    } finally {
    +      source.close()
    +    }
    +  }
    +
    +  override def size: Long = blockSize
    +
    +  override def dispose(): Unit = { }
    +
    +  private def open(): ReadableByteChannel = {
    +    val channel = new FileInputStream(file).getChannel()
    +    try {
    +      CryptoStreamUtils.createReadableChannel(channel, conf, key)
    +    } catch {
    +      case e: Exception =>
    +        Closeables.close(channel, true)
    +        throw e
    +    }
    +  }
    +
    +  private class EncryptedManagedBuffer extends ManagedBuffer {
    +
    +    override def size(): Long = blockSize
    +
    +    override def nioByteBuffer(): ByteBuffer = {
    +      // This is used by the block transfer service to replicate blocks. The upload code reads
    +      // all bytes into memory to send the block to the remote executor, so it's ok to do this
    +      // as long as the block fits in a Java array.
    +      assert(blockSize <= Int.MaxValue, "Block is too large to be wrapped in a byte buffer.")
    +      val is = toInputStream()
    +      try {
    +        ByteBuffer.wrap(ByteStreams.toByteArray(is))
    --- End diff --
    
    There's a comment explaining it a few lines above...


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17295: [SPARK-19556][core] Do not encrypt block manager data in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17295
  
    **[Test build #74991 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74991/testReport)** for PR 17295 at commit [`107e3e7`](https://github.com/apache/spark/commit/107e3e72e81d2c7813d832d3e9c2beab89e01379).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17295#discussion_r107323983
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -56,6 +57,44 @@ private[spark] class BlockResult(
         val bytes: Long)
     
     /**
    + * Abstracts away how blocks are stored and provides different ways to read the underlying block
    + * data. Callers should call [[dispose()]] when they're done with the block.
    + */
    +private[spark] trait BlockData {
    +
    +  def toInputStream(): InputStream
    +
    +  def toNetty(): Object
    +
    +  def toChunkedByteBuffer(allocator: Int => ByteBuffer): ChunkedByteBuffer
    +
    +  def toByteBuffer(): ByteBuffer
    --- End diff --
    
    it will be great to add some document for these 4 methods about when they will be called.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17295#discussion_r106691863
  
    --- Diff: core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala ---
    @@ -102,4 +150,34 @@ private[spark] object CryptoStreamUtils extends Logging {
         }
         iv
       }
    +
    +  /**
    +   * This class is a workaround for CRYPTO-125, that forces all bytes to be written to the
    +   * underlying channel. Since the callers of this API are using blocking I/O, there are no
    +   * concerns with regards to CPU usage here.
    --- End diff --
    
    No. As the comment states, it's a workaround for a bug in the commons-crypto library, which would affect the code being added.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17295#discussion_r106778650
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala ---
    @@ -79,6 +81,11 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
     
       def getFile(blockId: BlockId): File = getFile(blockId.name)
     
    +  /** The path of the metadata file for the given block. */
    +  def getMetadataFile(blockId: BlockId): File = {
    +    new File(getFile(blockId).getAbsolutePath() + METADATA_FILE_SUFFIX)
    --- End diff --
    
    Would be good to add a note that actual filename for metadata does not match directory hashing used (but depends on the source block).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17295: [SPARK-19556][core] Do not encrypt block manager data in...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/17295
  
    thanks, merging to master!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17295#discussion_r106778888
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala ---
    @@ -94,7 +101,11 @@ private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolea
           }
         }.filter(_ != null).flatMap { dir =>
           val files = dir.listFiles()
    -      if (files != null) files else Seq.empty
    +      if (files != null) {
    +        files.filter(!_.getName().endsWith(METADATA_FILE_SUFFIX))
    +      } else {
    +        Seq.empty
    +      }
    --- End diff --
    
    getAllFiles should be returning all files stored.
    For example, test suite's use getAllFiles to remove all files using this api.
    
    On other hand, getAllBlocks needs to ensure metadata file's are filtered out - this check should be moved there.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17295#discussion_r107327188
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---
    @@ -17,48 +17,67 @@
     
     package org.apache.spark.storage
     
    -import java.io.{FileOutputStream, IOException, RandomAccessFile}
    +import java.io._
     import java.nio.ByteBuffer
    +import java.nio.channels.{Channels, ReadableByteChannel, WritableByteChannel}
     import java.nio.channels.FileChannel.MapMode
    +import java.nio.charset.StandardCharsets.UTF_8
    +import java.util.concurrent.ConcurrentHashMap
     
    -import com.google.common.io.Closeables
    +import scala.collection.mutable.ListBuffer
     
    -import org.apache.spark.SparkConf
    +import com.google.common.io.{ByteStreams, Closeables, Files}
    +import io.netty.channel.FileRegion
    +import io.netty.util.AbstractReferenceCounted
    +
    +import org.apache.spark.{SecurityManager, SparkConf}
     import org.apache.spark.internal.Logging
    -import org.apache.spark.util.Utils
    +import org.apache.spark.network.buffer.ManagedBuffer
    +import org.apache.spark.network.util.JavaUtils
    +import org.apache.spark.security.CryptoStreamUtils
    +import org.apache.spark.util.{ByteBufferInputStream, Utils}
     import org.apache.spark.util.io.ChunkedByteBuffer
     
     /**
      * Stores BlockManager blocks on disk.
      */
    -private[spark] class DiskStore(conf: SparkConf, diskManager: DiskBlockManager) extends Logging {
    +private[spark] class DiskStore(
    +    conf: SparkConf,
    +    diskManager: DiskBlockManager,
    +    securityManager: SecurityManager) extends Logging {
     
       private val minMemoryMapBytes = conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m")
    +  private val blockSizes = new ConcurrentHashMap[String, Long]()
     
    -  def getSize(blockId: BlockId): Long = {
    -    diskManager.getFile(blockId.name).length
    -  }
    +  def getSize(blockId: BlockId): Long = blockSizes.get(blockId.name)
     
       /**
        * Invokes the provided callback function to write the specific block.
        *
        * @throws IllegalStateException if the block already exists in the disk store.
        */
    -  def put(blockId: BlockId)(writeFunc: FileOutputStream => Unit): Unit = {
    +  def put(blockId: BlockId)(writeFunc: WritableByteChannel => Unit): Unit = {
         if (contains(blockId)) {
           throw new IllegalStateException(s"Block $blockId is already present in the disk store")
         }
         logDebug(s"Attempting to put block $blockId")
         val startTime = System.currentTimeMillis
         val file = diskManager.getFile(blockId)
    -    val fileOutputStream = new FileOutputStream(file)
    +    val out = new CountingWritableChannel(openForWrite(file))
         var threwException: Boolean = true
         try {
    -      writeFunc(fileOutputStream)
    +      writeFunc(out)
    +      blockSizes.put(blockId.name, out.getCount)
           threwException = false
         } finally {
           try {
    -        Closeables.close(fileOutputStream, threwException)
    +        out.close()
    +      } catch {
    +        case ioe: IOException =>
    --- End diff --
    
    why this? `threwException` starts with `true`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17295#discussion_r106588563
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---
    @@ -73,55 +86,219 @@ private[spark] class DiskStore(conf: SparkConf, diskManager: DiskBlockManager) e
       }
     
       def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = {
    -    put(blockId) { fileOutputStream =>
    -      val channel = fileOutputStream.getChannel
    -      Utils.tryWithSafeFinally {
    -        bytes.writeFully(channel)
    -      } {
    -        channel.close()
    -      }
    +    put(blockId) { channel =>
    +      bytes.writeFully(channel)
         }
       }
     
    -  def getBytes(blockId: BlockId): ChunkedByteBuffer = {
    +  def getBytes(blockId: BlockId): BlockData = {
         val file = diskManager.getFile(blockId.name)
    -    val channel = new RandomAccessFile(file, "r").getChannel
    -    Utils.tryWithSafeFinally {
    -      // For small files, directly read rather than memory map
    -      if (file.length < minMemoryMapBytes) {
    -        val buf = ByteBuffer.allocate(file.length.toInt)
    -        channel.position(0)
    -        while (buf.remaining() != 0) {
    -          if (channel.read(buf) == -1) {
    -            throw new IOException("Reached EOF before filling buffer\n" +
    -              s"offset=0\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
    +    val blockSize = getSize(blockId)
    +
    +    securityManager.getIOEncryptionKey() match {
    +      case Some(key) =>
    +        // Encrypted blocks cannot be memory mapped; return a special object that does decryption
    +        // and provides InputStream / FileRegion implementations for reading the data.
    +        new EncryptedBlockData(file, blockSize, conf, key)
    +
    +      case _ =>
    +        val channel = new FileInputStream(file).getChannel()
    +        if (blockSize < minMemoryMapBytes) {
    +          // For small files, directly read rather than memory map.
    +          Utils.tryWithSafeFinally {
    +            val buf = ByteBuffer.allocate(blockSize.toInt)
    +            while (buf.remaining() > 0) {
    +              channel.read(buf)
    +            }
    +            buf.flip()
    +            new ByteBufferBlockData(new ChunkedByteBuffer(buf))
    +          } {
    +            channel.close()
    +          }
    +        } else {
    +          Utils.tryWithSafeFinally {
    +            new ByteBufferBlockData(
    +              new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, file.length)))
    +          } {
    +            channel.close()
               }
             }
    -        buf.flip()
    -        new ChunkedByteBuffer(buf)
    -      } else {
    -        new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, file.length))
    -      }
    -    } {
    -      channel.close()
         }
       }
     
       def remove(blockId: BlockId): Boolean = {
         val file = diskManager.getFile(blockId.name)
    -    if (file.exists()) {
    -      val ret = file.delete()
    -      if (!ret) {
    -        logWarning(s"Error deleting ${file.getPath()}")
    +    val meta = diskManager.getMetadataFile(blockId)
    +
    +    def delete(f: File): Boolean = {
    +      if (f.exists()) {
    +        val ret = f.delete()
    +        if (!ret) {
    +          logWarning(s"Error deleting ${file.getPath()}")
    +        }
    +
    +        ret
    +      } else {
    +        false
           }
    -      ret
    -    } else {
    -      false
         }
    +
    +    delete(file) & delete(meta)
       }
     
       def contains(blockId: BlockId): Boolean = {
         val file = diskManager.getFile(blockId.name)
         file.exists()
       }
    +
    +  private def openForWrite(file: File): WritableByteChannel = {
    +    val out = new FileOutputStream(file).getChannel()
    +    try {
    +      securityManager.getIOEncryptionKey().map { key =>
    +        CryptoStreamUtils.createWritableChannel(out, conf, key)
    +      }.getOrElse(out)
    +    } catch {
    +      case e: Exception =>
    +        out.close()
    +        throw e
    +    }
    +  }
    +
    +}
    +
    +private class EncryptedBlockData(
    +    file: File,
    +    blockSize: Long,
    +    conf: SparkConf,
    +    key: Array[Byte]) extends BlockData {
    +
    +  override def toInputStream(): InputStream = Channels.newInputStream(open())
    +
    +  override def toManagedBuffer(): ManagedBuffer = new EncryptedManagedBuffer()
    +
    +  override def toByteBuffer(allocator: Int => ByteBuffer): ChunkedByteBuffer = {
    +    val source = open()
    +    try {
    +      var remaining = blockSize
    +      val chunks = new ListBuffer[ByteBuffer]()
    +      while (remaining > 0) {
    +        val chunkSize = math.min(remaining, Int.MaxValue)
    +        val chunk = allocator(chunkSize.toInt)
    +        remaining -= chunkSize
    +
    +        while (chunk.remaining() > 0) {
    +          source.read(chunk)
    +        }
    +        chunk.flip()
    +        chunks += chunk
    +      }
    +
    +      new ChunkedByteBuffer(chunks.toArray)
    +    } finally {
    +      source.close()
    +    }
    +  }
    +
    +  override def size: Long = blockSize
    +
    +  override def dispose(): Unit = { }
    +
    +  private def open(): ReadableByteChannel = {
    +    val channel = new FileInputStream(file).getChannel()
    +    try {
    +      CryptoStreamUtils.createReadableChannel(channel, conf, key)
    +    } catch {
    +      case e: Exception =>
    +        Closeables.close(channel, true)
    +        throw e
    +    }
    +  }
    +
    +  private class EncryptedManagedBuffer extends ManagedBuffer {
    +
    +    override def size(): Long = blockSize
    +
    +    override def nioByteBuffer(): ByteBuffer = {
    +      // This is used by the block transfer service to replicate blocks. The upload code reads
    +      // all bytes into memory to send the block to the remote executor, so it's ok to do this
    +      // as long as the block fits in a Java array.
    +      assert(blockSize <= Int.MaxValue, "Block is too large to be wrapped in a byte buffer.")
    +      val is = toInputStream()
    +      try {
    +        ByteBuffer.wrap(ByteStreams.toByteArray(is))
    --- End diff --
    
    will we read all data out here?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17295#discussion_r107789203
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -56,6 +57,44 @@ private[spark] class BlockResult(
         val bytes: Long)
     
     /**
    + * Abstracts away how blocks are stored and provides different ways to read the underlying block
    + * data. Callers should call [[dispose()]] when they're done with the block.
    + */
    +private[spark] trait BlockData {
    +
    +  def toInputStream(): InputStream
    +
    +  def toNetty(): Object
    +
    +  def toChunkedByteBuffer(allocator: Int => ByteBuffer): ChunkedByteBuffer
    +
    +  def toByteBuffer(): ByteBuffer
    --- End diff --
    
    I added scaladoc for `toNetty()`, but the others seem self-explanatory to me.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17295#discussion_r106778914
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---
    @@ -17,48 +17,61 @@
     
     package org.apache.spark.storage
     
    -import java.io.{FileOutputStream, IOException, RandomAccessFile}
    +import java.io._
     import java.nio.ByteBuffer
    +import java.nio.channels.{Channels, ReadableByteChannel, WritableByteChannel}
     import java.nio.channels.FileChannel.MapMode
    +import java.nio.charset.StandardCharsets.UTF_8
     
    -import com.google.common.io.Closeables
    +import scala.collection.mutable.ListBuffer
     
    -import org.apache.spark.SparkConf
    +import com.google.common.io.{ByteStreams, Closeables, Files}
    +import io.netty.channel.FileRegion
    +import io.netty.util.AbstractReferenceCounted
    +
    +import org.apache.spark.{SecurityManager, SparkConf}
     import org.apache.spark.internal.Logging
    -import org.apache.spark.util.Utils
    +import org.apache.spark.network.buffer.ManagedBuffer
    +import org.apache.spark.security.CryptoStreamUtils
    +import org.apache.spark.util.{ByteBufferInputStream, Utils}
     import org.apache.spark.util.io.ChunkedByteBuffer
     
     /**
      * Stores BlockManager blocks on disk.
      */
    -private[spark] class DiskStore(conf: SparkConf, diskManager: DiskBlockManager) extends Logging {
    +private[spark] class DiskStore(
    +    conf: SparkConf,
    +    diskManager: DiskBlockManager,
    +    securityManager: SecurityManager) extends Logging {
     
       private val minMemoryMapBytes = conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m")
     
       def getSize(blockId: BlockId): Long = {
    -    diskManager.getFile(blockId.name).length
    +    val file = diskManager.getMetadataFile(blockId)
    +    Files.toString(file, UTF_8).toLong
       }
     
       /**
        * Invokes the provided callback function to write the specific block.
        *
        * @throws IllegalStateException if the block already exists in the disk store.
        */
    -  def put(blockId: BlockId)(writeFunc: FileOutputStream => Unit): Unit = {
    +  def put(blockId: BlockId)(writeFunc: WritableByteChannel => Unit): Unit = {
         if (contains(blockId)) {
           throw new IllegalStateException(s"Block $blockId is already present in the disk store")
         }
         logDebug(s"Attempting to put block $blockId")
         val startTime = System.currentTimeMillis
         val file = diskManager.getFile(blockId)
    -    val fileOutputStream = new FileOutputStream(file)
    +    val out = new CountingWritableChannel(openForWrite(file))
         var threwException: Boolean = true
         try {
    -      writeFunc(fileOutputStream)
    +      writeFunc(out)
    +      Files.write(out.getCount().toString(), diskManager.getMetadataFile(blockId), UTF_8)
    --- End diff --
    
    Is there any reason to keep this in string form ?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17295: [SPARK-19556][core] Do not encrypt block manager data in...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on the issue:

    https://github.com/apache/spark/pull/17295
  
    > Not really sure what you mean here. But transferring encrypted data without RPC encryption is not really secure, since the encryption key is transferred to executors using an RPC. There's even a warning message if RPC encryption is not on and you enable disk encryption.
    
    Good point, I overlooked that.
    So to summarize, after this change, RDD block's transferred will always be in plain text; with an implicit requirement that rpc encryption is strongly preferred to be enabled.
    Is there any case where it is transfered in encrypted form in supported cases ? (cases being: broadcast, rdd block transfer, replication, anything else ?)
    I wanted to ensure I understand what the final expected behavior/state would be, and how consistent we will become.
    
    I agree about shuffle being special case'd; I was looking at only non-shuffle blocks.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17295: [SPARK-19556][core] Do not encrypt block manager data in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17295
  
    Merged build finished. Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17295: [SPARK-19556][core] Do not encrypt block manager data in...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the issue:

    https://github.com/apache/spark/pull/17295
  
    > This can be solved by tagging the block data with a prefix byte
    
    Sure, it could be solved in different ways. I just happened to prefer the one in this patch, since I think it's less intrusive; if you look closely, the majority of changes are in a single class (`DiskStore`), and there's mostly minor adjustments in other places.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17295: [SPARK-19556][core] Do not encrypt block manager data in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17295
  
    **[Test build #75323 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/75323/testReport)** for PR 17295 at commit [`4a39cb2`](https://github.com/apache/spark/commit/4a39cb23dae86d4289bf529c27dae21680620cab).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17295: [SPARK-19556][core] Do not encrypt block manager data in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17295
  
    **[Test build #74906 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74906/testReport)** for PR 17295 at commit [`1b2a3e4`](https://github.com/apache/spark/commit/1b2a3e48e07118b03e9607bf0dc99cf53ae4d678).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17295#discussion_r106961480
  
    --- Diff: core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala ---
    @@ -102,4 +150,34 @@ private[spark] object CryptoStreamUtils extends Logging {
         }
         iv
       }
    +
    +  /**
    +   * This class is a workaround for CRYPTO-125, that forces all bytes to be written to the
    --- End diff --
    
    There's a pretty nasty workaround for it in the network library... (the non-blocking workaround is a lot worse than this.)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17295#discussion_r107323246
  
    --- Diff: core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala ---
    @@ -63,12 +84,27 @@ private[spark] object CryptoStreamUtils extends Logging {
           is: InputStream,
           sparkConf: SparkConf,
           key: Array[Byte]): InputStream = {
    -    val properties = toCryptoConf(sparkConf)
         val iv = new Array[Byte](IV_LENGTH_IN_BYTES)
    -    is.read(iv, 0, iv.length)
    -    val transformationStr = sparkConf.get(IO_CRYPTO_CIPHER_TRANSFORMATION)
    -    new CryptoInputStream(transformationStr, properties, is,
    -      new SecretKeySpec(key, "AES"), new IvParameterSpec(iv))
    +    ByteStreams.readFully(is, iv)
    +    val params = new CryptoParams(key, sparkConf)
    +    new CryptoInputStream(params.transformation, params.conf, is, params.keySpec,
    +      new IvParameterSpec(iv))
    +  }
    +
    +  /**
    +   * Wrap a `ReadableByteChannel` for decryption.
    +   */
    +  def createReadableChannel(
    +      channel: ReadableByteChannel,
    +      sparkConf: SparkConf,
    +      key: Array[Byte]): ReadableByteChannel = {
    +    val iv = new Array[Byte](IV_LENGTH_IN_BYTES)
    +    val buf = ByteBuffer.wrap(iv)
    +    JavaUtils.readFully(channel, buf)
    --- End diff --
    
    why not use `ByteStreams.readFully`? the `buf` is not used else where


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17295#discussion_r106964049
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---
    @@ -73,55 +86,219 @@ private[spark] class DiskStore(conf: SparkConf, diskManager: DiskBlockManager) e
       }
     
       def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = {
    -    put(blockId) { fileOutputStream =>
    -      val channel = fileOutputStream.getChannel
    -      Utils.tryWithSafeFinally {
    -        bytes.writeFully(channel)
    -      } {
    -        channel.close()
    -      }
    +    put(blockId) { channel =>
    +      bytes.writeFully(channel)
         }
       }
     
    -  def getBytes(blockId: BlockId): ChunkedByteBuffer = {
    +  def getBytes(blockId: BlockId): BlockData = {
         val file = diskManager.getFile(blockId.name)
    -    val channel = new RandomAccessFile(file, "r").getChannel
    -    Utils.tryWithSafeFinally {
    -      // For small files, directly read rather than memory map
    -      if (file.length < minMemoryMapBytes) {
    -        val buf = ByteBuffer.allocate(file.length.toInt)
    -        channel.position(0)
    -        while (buf.remaining() != 0) {
    -          if (channel.read(buf) == -1) {
    -            throw new IOException("Reached EOF before filling buffer\n" +
    -              s"offset=0\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
    +    val blockSize = getSize(blockId)
    +
    +    securityManager.getIOEncryptionKey() match {
    +      case Some(key) =>
    +        // Encrypted blocks cannot be memory mapped; return a special object that does decryption
    +        // and provides InputStream / FileRegion implementations for reading the data.
    +        new EncryptedBlockData(file, blockSize, conf, key)
    +
    +      case _ =>
    +        val channel = new FileInputStream(file).getChannel()
    +        if (blockSize < minMemoryMapBytes) {
    +          // For small files, directly read rather than memory map.
    +          Utils.tryWithSafeFinally {
    +            val buf = ByteBuffer.allocate(blockSize.toInt)
    +            while (buf.remaining() > 0) {
    +              channel.read(buf)
    +            }
    +            buf.flip()
    +            new ByteBufferBlockData(new ChunkedByteBuffer(buf))
    +          } {
    +            channel.close()
    +          }
    +        } else {
    +          Utils.tryWithSafeFinally {
    +            new ByteBufferBlockData(
    +              new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, file.length)))
    +          } {
    +            channel.close()
               }
             }
    -        buf.flip()
    -        new ChunkedByteBuffer(buf)
    -      } else {
    -        new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, file.length))
    -      }
    -    } {
    -      channel.close()
         }
       }
     
       def remove(blockId: BlockId): Boolean = {
         val file = diskManager.getFile(blockId.name)
    -    if (file.exists()) {
    -      val ret = file.delete()
    -      if (!ret) {
    -        logWarning(s"Error deleting ${file.getPath()}")
    +    val meta = diskManager.getMetadataFile(blockId)
    +
    +    def delete(f: File): Boolean = {
    +      if (f.exists()) {
    +        val ret = f.delete()
    +        if (!ret) {
    +          logWarning(s"Error deleting ${file.getPath()}")
    +        }
    +
    +        ret
    +      } else {
    +        false
           }
    -      ret
    -    } else {
    -      false
         }
    +
    +    delete(file) & delete(meta)
       }
     
       def contains(blockId: BlockId): Boolean = {
         val file = diskManager.getFile(blockId.name)
         file.exists()
       }
    +
    +  private def openForWrite(file: File): WritableByteChannel = {
    +    val out = new FileOutputStream(file).getChannel()
    +    try {
    +      securityManager.getIOEncryptionKey().map { key =>
    +        CryptoStreamUtils.createWritableChannel(out, conf, key)
    +      }.getOrElse(out)
    +    } catch {
    +      case e: Exception =>
    +        out.close()
    --- End diff --
    
    There might be exceptions specific for the commons-crypto library being thrown.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17295#discussion_r106269093
  
    --- Diff: core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala ---
    @@ -102,4 +150,34 @@ private[spark] object CryptoStreamUtils extends Logging {
         }
         iv
       }
    +
    +  /**
    +   * This class is a workaround for CRYPTO-125, that forces all bytes to be written to the
    --- End diff --
    
    This is a lousy bug ! Good thing that we dont seem to be hit by it (yet).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17295#discussion_r106264689
  
    --- Diff: core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala ---
    @@ -63,12 +83,40 @@ private[spark] object CryptoStreamUtils extends Logging {
           is: InputStream,
           sparkConf: SparkConf,
           key: Array[Byte]): InputStream = {
    -    val properties = toCryptoConf(sparkConf)
         val iv = new Array[Byte](IV_LENGTH_IN_BYTES)
    -    is.read(iv, 0, iv.length)
    -    val transformationStr = sparkConf.get(IO_CRYPTO_CIPHER_TRANSFORMATION)
    -    new CryptoInputStream(transformationStr, properties, is,
    -      new SecretKeySpec(key, "AES"), new IvParameterSpec(iv))
    +    var read = 0
    +    while (read < iv.length) {
    +      val _read = is.read(iv, 0, iv.length)
    +      if (_read < 0) {
    +        throw new EOFException("Failed to read IV from stream.")
    +      }
    +      read += _read
    +    }
    --- End diff --
    
    ByteStreams.readFully instead of the loop


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17295: [SPARK-19556][core] Do not encrypt block manager data in...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/17295
  
    makes sense. one more question, ideally, shall we also transfer shuffle blocks after decryption?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17295: [SPARK-19556][core] Do not encrypt block manager data in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17295
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/75267/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17295: [SPARK-19556][core] Do not encrypt block manager data in...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the issue:

    https://github.com/apache/spark/pull/17295
  
    I removed `StorageUtils.unmap()` in my last commit (see commit message for details). That makes the confusion go away.
    
    The replication tests fail from time to time but they seem to be flaky without this patch. See;
    https://spark-tests.appspot.com/test-details?suite_name=org.apache.spark.storage.BlockManagerProactiveReplicationSuite&test_name=proactive+block+replication+-+5+replicas+-+4+block+manager+deletions
    



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17295#discussion_r107323552
  
    --- Diff: core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala ---
    @@ -48,12 +51,30 @@ private[spark] object CryptoStreamUtils extends Logging {
           os: OutputStream,
           sparkConf: SparkConf,
           key: Array[Byte]): OutputStream = {
    -    val properties = toCryptoConf(sparkConf)
    -    val iv = createInitializationVector(properties)
    +    val params = new CryptoParams(key, sparkConf)
    +    val iv = createInitializationVector(params.conf)
         os.write(iv)
    -    val transformationStr = sparkConf.get(IO_CRYPTO_CIPHER_TRANSFORMATION)
    -    new CryptoOutputStream(transformationStr, properties, os,
    -      new SecretKeySpec(key, "AES"), new IvParameterSpec(iv))
    +    new CryptoOutputStream(params.transformation, params.conf, os, params.keySpec,
    +      new IvParameterSpec(iv))
    +  }
    +
    +  /**
    +   * Wrap a `WritableByteChannel` for encryption.
    +   */
    +  def createWritableChannel(
    +      channel: WritableByteChannel,
    +      sparkConf: SparkConf,
    +      key: Array[Byte]): WritableByteChannel = {
    +    val params = new CryptoParams(key, sparkConf)
    +    val iv = createInitializationVector(params.conf)
    +    val buf = ByteBuffer.wrap(iv)
    +    while (buf.hasRemaining()) {
    --- End diff --
    
    actually this logic is same with `CryptoHelperChannel`. Shall we create `CryptoHelperChannel` first and simply call `helper.write(buf)` here?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17295: [SPARK-19556][core] Do not encrypt block manager data in...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on the issue:

    https://github.com/apache/spark/pull/17295
  
    > First, keep in mind that there's no metadata that tells the receiver whether a block is encrypted or not. This means that methods like BlockManager.get, which can read block data from either local or remote sources, need to return data that is either always encrypted or always not encrypted for the same block ID.
    
    This can be solved by tagging the block data with a prefix byte - we do something similar for MapStatus (direct or broadcast).



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17295: [SPARK-19556][core] Do not encrypt block manager data in...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on the issue:

    https://github.com/apache/spark/pull/17295
  
    > Just to be clear, I would prefer if we consistently did things - either encrypt all blocks while transferring (irrespective of sasl being enabled or not); or depend only on sasl for channel encryption.
    
    Not really sure what you mean here. But transferring encrypted data without RPC encryption is not really secure, since the encryption key is transferred to executors using an RPC. There's even a warning message if RPC encryption is not on and you enable disk encryption.
    
    Shuffle is a different beast - I explain why the shuffle blocks are transferred in encrypted form in the PR description.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17295: [SPARK-19556][core] Do not encrypt block manager data in...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on the issue:

    https://github.com/apache/spark/pull/17295
  
    I have not looked at the implementation in detail, but can you comment on why the change w.r.t plain text block data to remote executor ? Isn't it not simpler to transmit block contents in encrypted format without decryption ?
    
    Remote fetch of RDD blocks is not uncommon (for any task other than PROCESS_LOCAL); and I wanted to better understand why this is required.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17295#discussion_r106587322
  
    --- Diff: core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala ---
    @@ -63,12 +83,40 @@ private[spark] object CryptoStreamUtils extends Logging {
           is: InputStream,
           sparkConf: SparkConf,
           key: Array[Byte]): InputStream = {
    -    val properties = toCryptoConf(sparkConf)
         val iv = new Array[Byte](IV_LENGTH_IN_BYTES)
    -    is.read(iv, 0, iv.length)
    -    val transformationStr = sparkConf.get(IO_CRYPTO_CIPHER_TRANSFORMATION)
    -    new CryptoInputStream(transformationStr, properties, is,
    -      new SecretKeySpec(key, "AES"), new IvParameterSpec(iv))
    +    var read = 0
    +    while (read < iv.length) {
    --- End diff --
    
    what does this while loop do?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17295#discussion_r107786888
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -1065,7 +1084,7 @@ private[spark] class BlockManager(
               try {
                 replicate(blockId, bytesToReplicate, level, remoteClassTag)
               } finally {
    -            bytesToReplicate.unmap()
    +            bytesToReplicate.dispose()
    --- End diff --
    
    Because there's no `BlockData.unmap()`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17295#discussion_r106965268
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---
    @@ -73,55 +86,219 @@ private[spark] class DiskStore(conf: SparkConf, diskManager: DiskBlockManager) e
       }
     
       def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = {
    -    put(blockId) { fileOutputStream =>
    -      val channel = fileOutputStream.getChannel
    -      Utils.tryWithSafeFinally {
    -        bytes.writeFully(channel)
    -      } {
    -        channel.close()
    -      }
    +    put(blockId) { channel =>
    +      bytes.writeFully(channel)
         }
       }
     
    -  def getBytes(blockId: BlockId): ChunkedByteBuffer = {
    +  def getBytes(blockId: BlockId): BlockData = {
         val file = diskManager.getFile(blockId.name)
    -    val channel = new RandomAccessFile(file, "r").getChannel
    -    Utils.tryWithSafeFinally {
    -      // For small files, directly read rather than memory map
    -      if (file.length < minMemoryMapBytes) {
    -        val buf = ByteBuffer.allocate(file.length.toInt)
    -        channel.position(0)
    -        while (buf.remaining() != 0) {
    -          if (channel.read(buf) == -1) {
    -            throw new IOException("Reached EOF before filling buffer\n" +
    -              s"offset=0\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
    +    val blockSize = getSize(blockId)
    +
    +    securityManager.getIOEncryptionKey() match {
    +      case Some(key) =>
    +        // Encrypted blocks cannot be memory mapped; return a special object that does decryption
    +        // and provides InputStream / FileRegion implementations for reading the data.
    +        new EncryptedBlockData(file, blockSize, conf, key)
    +
    +      case _ =>
    +        val channel = new FileInputStream(file).getChannel()
    +        if (blockSize < minMemoryMapBytes) {
    +          // For small files, directly read rather than memory map.
    +          Utils.tryWithSafeFinally {
    +            val buf = ByteBuffer.allocate(blockSize.toInt)
    +            while (buf.remaining() > 0) {
    +              channel.read(buf)
    +            }
    +            buf.flip()
    +            new ByteBufferBlockData(new ChunkedByteBuffer(buf))
    +          } {
    +            channel.close()
    +          }
    +        } else {
    +          Utils.tryWithSafeFinally {
    +            new ByteBufferBlockData(
    +              new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, file.length)))
    +          } {
    +            channel.close()
               }
             }
    -        buf.flip()
    -        new ChunkedByteBuffer(buf)
    -      } else {
    -        new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, file.length))
    -      }
    -    } {
    -      channel.close()
         }
       }
     
       def remove(blockId: BlockId): Boolean = {
         val file = diskManager.getFile(blockId.name)
    -    if (file.exists()) {
    -      val ret = file.delete()
    -      if (!ret) {
    -        logWarning(s"Error deleting ${file.getPath()}")
    +    val meta = diskManager.getMetadataFile(blockId)
    +
    +    def delete(f: File): Boolean = {
    +      if (f.exists()) {
    +        val ret = f.delete()
    +        if (!ret) {
    +          logWarning(s"Error deleting ${file.getPath()}")
    +        }
    +
    +        ret
    +      } else {
    +        false
           }
    -      ret
    -    } else {
    -      false
         }
    +
    +    delete(file) & delete(meta)
       }
     
       def contains(blockId: BlockId): Boolean = {
         val file = diskManager.getFile(blockId.name)
         file.exists()
       }
    +
    +  private def openForWrite(file: File): WritableByteChannel = {
    +    val out = new FileOutputStream(file).getChannel()
    +    try {
    +      securityManager.getIOEncryptionKey().map { key =>
    +        CryptoStreamUtils.createWritableChannel(out, conf, key)
    +      }.getOrElse(out)
    +    } catch {
    +      case e: Exception =>
    +        out.close()
    +        throw e
    +    }
    +  }
    +
    +}
    +
    +private class EncryptedBlockData(
    +    file: File,
    +    blockSize: Long,
    +    conf: SparkConf,
    +    key: Array[Byte]) extends BlockData {
    +
    +  override def toInputStream(): InputStream = Channels.newInputStream(open())
    +
    +  override def toManagedBuffer(): ManagedBuffer = new EncryptedManagedBuffer()
    +
    +  override def toByteBuffer(allocator: Int => ByteBuffer): ChunkedByteBuffer = {
    +    val source = open()
    +    try {
    +      var remaining = blockSize
    +      val chunks = new ListBuffer[ByteBuffer]()
    +      while (remaining > 0) {
    +        val chunkSize = math.min(remaining, Int.MaxValue)
    +        val chunk = allocator(chunkSize.toInt)
    +        remaining -= chunkSize
    +
    +        while (chunk.remaining() > 0) {
    +          source.read(chunk)
    +        }
    +        chunk.flip()
    +        chunks += chunk
    +      }
    +
    +      new ChunkedByteBuffer(chunks.toArray)
    +    } finally {
    +      source.close()
    +    }
    +  }
    +
    +  override def size: Long = blockSize
    +
    +  override def dispose(): Unit = { }
    +
    +  private def open(): ReadableByteChannel = {
    +    val channel = new FileInputStream(file).getChannel()
    +    try {
    +      CryptoStreamUtils.createReadableChannel(channel, conf, key)
    +    } catch {
    +      case e: Exception =>
    +        Closeables.close(channel, true)
    +        throw e
    +    }
    +  }
    +
    +  private class EncryptedManagedBuffer extends ManagedBuffer {
    +
    +    override def size(): Long = blockSize
    +
    +    override def nioByteBuffer(): ByteBuffer = {
    +      // This is used by the block transfer service to replicate blocks. The upload code reads
    +      // all bytes into memory to send the block to the remote executor, so it's ok to do this
    +      // as long as the block fits in a Java array.
    +      assert(blockSize <= Int.MaxValue, "Block is too large to be wrapped in a byte buffer.")
    +      val is = toInputStream()
    +      try {
    +        ByteBuffer.wrap(ByteStreams.toByteArray(is))
    +      } finally {
    +        Closeables.close(is, true)
    +      }
    +    }
    +
    +    override def createInputStream(): InputStream = toInputStream()
    +
    +    override def convertToNetty(): Object = new ReadableChannelFileRegion(open(), blockSize)
    +
    +    override def retain(): ManagedBuffer = this
    +
    +    override def release(): ManagedBuffer = this
    +
    +  }
    +
    +}
    +
    +private class ReadableChannelFileRegion(source: ReadableByteChannel, blockSize: Long)
    +  extends AbstractReferenceCounted with FileRegion {
    +
    +  private var _transferred = 0L
    +
    +  private val buffer = ByteBuffer.allocateDirect(64 * 1024)
    +  buffer.flip()
    +
    +  override def count(): Long = blockSize
    +
    +  override def position(): Long = 0
    +
    +  override def transfered(): Long = _transferred
    +
    +  override def transferTo(target: WritableByteChannel, pos: Long): Long = {
    +    assert(pos == transfered(), "Invalid position.")
    +
    +    var written = 0L
    +    var lastWrite = -1L
    +    while (lastWrite != 0) {
    +      if (buffer.remaining() == 0) {
    +        buffer.clear()
    +        source.read(buffer)
    +        buffer.flip()
    +      }
    +      if (buffer.remaining() > 0) {
    +        lastWrite = target.write(buffer)
    +        written += lastWrite
    +      } else {
    +        lastWrite = 0
    +      }
    +    }
    +
    +    _transferred += written
    +    written
    +  }
    +
    +  override def deallocate(): Unit = source.close()
    --- End diff --
    
    `StorageUtils.dispose` specifically checks for mapped buffers, which is not the case here. It could be changed, but in this case I wonder if it's necessary or if waiting for GC is good enough.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17295: [SPARK-19556][core] Do not encrypt block manager data in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17295
  
    **[Test build #74906 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/74906/testReport)** for PR 17295 at commit [`1b2a3e4`](https://github.com/apache/spark/commit/1b2a3e48e07118b03e9607bf0dc99cf53ae4d678).
     * This patch **fails Spark unit tests**.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17295: [SPARK-19556][core] Do not encrypt block manager data in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17295
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17295: [SPARK-19556][core] Do not encrypt block manager data in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17295
  
    **[Test build #75226 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/75226/testReport)** for PR 17295 at commit [`d4013f9`](https://github.com/apache/spark/commit/d4013f9453ca4d7c4a2462c420826da46f514fcc).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17295: [SPARK-19556][core] Do not encrypt block manager data in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17295
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17295#discussion_r107007132
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -56,6 +57,43 @@ private[spark] class BlockResult(
         val bytes: Long)
     
     /**
    + * Abstracts away how blocks are stored and provides different ways to read the underlying block
    + * data. The data for a BlockData instance can only be read once, since it may be backed by open
    + * file descriptors that change state as data is read.
    + */
    +private[spark] trait BlockData {
    +
    +  def toInputStream(): InputStream
    +
    +  def toManagedBuffer(): ManagedBuffer
    +
    +  def toByteBuffer(allocator: Int => ByteBuffer): ChunkedByteBuffer
    +
    +  def size: Long
    +
    +  def dispose(): Unit
    +
    +}
    +
    +private[spark] class ByteBufferBlockData(
    +    val buffer: ChunkedByteBuffer,
    +    autoDispose: Boolean = true) extends BlockData {
    +
    +  override def toInputStream(): InputStream = buffer.toInputStream(dispose = autoDispose)
    +
    +  override def toManagedBuffer(): ManagedBuffer = new NettyManagedBuffer(buffer.toNetty)
    +
    +  override def toByteBuffer(allocator: Int => ByteBuffer): ChunkedByteBuffer = {
    +    buffer.copy(allocator)
    +  }
    --- End diff --
    
    So I had traced through that stuff 2 or 3 times, and now I did it again and I think I finally understood all that's going on. Basically, the old code was really bad at explicitly disposing of the buffers, meaning a bunch of paths (like the ones that used managed buffers) didn't bother to do it and just left the work to the GC.
    
    I changed the code a bit to make the dispose more explicit and added comments in a few key places.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17295#discussion_r107832519
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -1065,7 +1084,7 @@ private[spark] class BlockManager(
               try {
                 replicate(blockId, bytesToReplicate, level, remoteClassTag)
               } finally {
    -            bytesToReplicate.unmap()
    +            bytesToReplicate.dispose()
    --- End diff --
    
    I'm afraid this may counteract the effort we made in https://github.com/apache/spark/pull/16499
    
    Ideally `unmap` and `dispose` do different things
    
    cc @mallman 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17295#discussion_r106778760
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---
    @@ -73,55 +86,219 @@ private[spark] class DiskStore(conf: SparkConf, diskManager: DiskBlockManager) e
       }
     
       def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = {
    -    put(blockId) { fileOutputStream =>
    -      val channel = fileOutputStream.getChannel
    -      Utils.tryWithSafeFinally {
    -        bytes.writeFully(channel)
    -      } {
    -        channel.close()
    -      }
    +    put(blockId) { channel =>
    +      bytes.writeFully(channel)
         }
       }
     
    -  def getBytes(blockId: BlockId): ChunkedByteBuffer = {
    +  def getBytes(blockId: BlockId): BlockData = {
         val file = diskManager.getFile(blockId.name)
    -    val channel = new RandomAccessFile(file, "r").getChannel
    -    Utils.tryWithSafeFinally {
    -      // For small files, directly read rather than memory map
    -      if (file.length < minMemoryMapBytes) {
    -        val buf = ByteBuffer.allocate(file.length.toInt)
    -        channel.position(0)
    -        while (buf.remaining() != 0) {
    -          if (channel.read(buf) == -1) {
    -            throw new IOException("Reached EOF before filling buffer\n" +
    -              s"offset=0\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
    +    val blockSize = getSize(blockId)
    +
    +    securityManager.getIOEncryptionKey() match {
    +      case Some(key) =>
    +        // Encrypted blocks cannot be memory mapped; return a special object that does decryption
    +        // and provides InputStream / FileRegion implementations for reading the data.
    +        new EncryptedBlockData(file, blockSize, conf, key)
    +
    +      case _ =>
    +        val channel = new FileInputStream(file).getChannel()
    +        if (blockSize < minMemoryMapBytes) {
    +          // For small files, directly read rather than memory map.
    +          Utils.tryWithSafeFinally {
    +            val buf = ByteBuffer.allocate(blockSize.toInt)
    +            while (buf.remaining() > 0) {
    +              channel.read(buf)
    +            }
    +            buf.flip()
    +            new ByteBufferBlockData(new ChunkedByteBuffer(buf))
    +          } {
    +            channel.close()
    +          }
    +        } else {
    +          Utils.tryWithSafeFinally {
    +            new ByteBufferBlockData(
    +              new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, file.length)))
    +          } {
    +            channel.close()
               }
             }
    -        buf.flip()
    -        new ChunkedByteBuffer(buf)
    -      } else {
    -        new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, file.length))
    -      }
    -    } {
    -      channel.close()
         }
       }
     
       def remove(blockId: BlockId): Boolean = {
         val file = diskManager.getFile(blockId.name)
    -    if (file.exists()) {
    -      val ret = file.delete()
    -      if (!ret) {
    -        logWarning(s"Error deleting ${file.getPath()}")
    +    val meta = diskManager.getMetadataFile(blockId)
    +
    +    def delete(f: File): Boolean = {
    +      if (f.exists()) {
    +        val ret = f.delete()
    +        if (!ret) {
    +          logWarning(s"Error deleting ${file.getPath()}")
    +        }
    +
    +        ret
    +      } else {
    +        false
           }
    -      ret
    -    } else {
    -      false
         }
    +
    +    delete(file) & delete(meta)
       }
     
       def contains(blockId: BlockId): Boolean = {
         val file = diskManager.getFile(blockId.name)
         file.exists()
       }
    +
    +  private def openForWrite(file: File): WritableByteChannel = {
    +    val out = new FileOutputStream(file).getChannel()
    +    try {
    +      securityManager.getIOEncryptionKey().map { key =>
    +        CryptoStreamUtils.createWritableChannel(out, conf, key)
    +      }.getOrElse(out)
    +    } catch {
    +      case e: Exception =>
    +        out.close()
    --- End diff --
    
    Other than IOException, what other exceptions are being thrown here ? Would be better to catch specific exceptions. At the least, change it to NonFatal if which exceptions thrown are unknown.
    
    Also, we would need to delete file here in addition to closing the channel.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17295: [SPARK-19556][core] Do not encrypt block manager data in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17295
  
    **[Test build #75226 has finished](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/75226/testReport)** for PR 17295 at commit [`d4013f9`](https://github.com/apache/spark/commit/d4013f9453ca4d7c4a2462c420826da46f514fcc).
     * This patch passes all tests.
     * This patch merges cleanly.
     * This patch adds no public classes.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17295#discussion_r108046686
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -56,6 +57,49 @@ private[spark] class BlockResult(
         val bytes: Long)
     
     /**
    + * Abstracts away how blocks are stored and provides different ways to read the underlying block
    + * data. Callers should call [[dispose()]] when they're done with the block.
    + */
    +private[spark] trait BlockData {
    +
    +  def toInputStream(): InputStream
    +
    +  /**
    +   * Returns a Netty-friendly wrapper for the block's data.
    +   *
    +   * @see [[ManagedBuffer#convertToNetty()]]
    +   */
    +  def toNetty(): Object
    +
    +  def toChunkedByteBuffer(allocator: Int => ByteBuffer): ChunkedByteBuffer
    +
    +  def toByteBuffer(): ByteBuffer
    +
    +  def size: Long
    +
    +  def dispose(): Unit
    +
    +}
    +
    +private[spark] class ByteBufferBlockData(val buffer: ChunkedByteBuffer) extends BlockData {
    +
    +  override def toInputStream(): InputStream = buffer.toInputStream(dispose = false)
    +
    +  override def toNetty(): Object = buffer.toNetty
    +
    +  override def toChunkedByteBuffer(allocator: Int => ByteBuffer): ChunkedByteBuffer = {
    +    buffer.copy(allocator)
    +  }
    +
    +  override def toByteBuffer(): ByteBuffer = buffer.toByteBuffer
    +
    +  override def size: Long = buffer.size
    +
    +  override def dispose(): Unit = buffer.unmap()
    --- End diff --
    
    I think `BlockData.dispose()` is pretty well defined. "Release any resources held by the object." What's confusing is that there's both `dispose()` and `unmap()` in `ChunkedByteBuffer`, when there used to be only `dispose()`. It's confusing to have two different methods for releasing resources, and that confusion is not being caused by this patch.
    
    `BlockData` is not just a wrapper around `ChunkedByteBuffer`; if it were there wouldn't be a need for it. Which is why calling the method `unmmap()` wouldn't make any sense here, since that's very specific to memory-mapped byte buffers.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17295#discussion_r106268712
  
    --- Diff: core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala ---
    @@ -63,12 +83,40 @@ private[spark] object CryptoStreamUtils extends Logging {
           is: InputStream,
           sparkConf: SparkConf,
           key: Array[Byte]): InputStream = {
    -    val properties = toCryptoConf(sparkConf)
         val iv = new Array[Byte](IV_LENGTH_IN_BYTES)
    -    is.read(iv, 0, iv.length)
    -    val transformationStr = sparkConf.get(IO_CRYPTO_CIPHER_TRANSFORMATION)
    -    new CryptoInputStream(transformationStr, properties, is,
    -      new SecretKeySpec(key, "AES"), new IvParameterSpec(iv))
    +    var read = 0
    +    while (read < iv.length) {
    +      val _read = is.read(iv, 0, iv.length)
    +      if (_read < 0) {
    +        throw new EOFException("Failed to read IV from stream.")
    +      }
    +      read += _read
    +    }
    +
    +    val params = new CryptoParams(key, sparkConf)
    +    new CryptoInputStream(params.transformation, params.conf, is, params.keySpec,
    +      new IvParameterSpec(iv))
    +  }
    +
    +  /**
    +   * Wrap a `ReadableByteChannel` for decryption.
    +   */
    +  def createReadableChannel(
    +      channel: ReadableByteChannel,
    +      sparkConf: SparkConf,
    +      key: Array[Byte]): ReadableByteChannel = {
    +    val iv = new Array[Byte](IV_LENGTH_IN_BYTES)
    +    val buf = ByteBuffer.wrap(iv)
    +    buf.clear()
    --- End diff --
    
    nit: The clear is not required.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17295#discussion_r107323085
  
    --- Diff: core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala ---
    @@ -48,12 +51,30 @@ private[spark] object CryptoStreamUtils extends Logging {
           os: OutputStream,
           sparkConf: SparkConf,
           key: Array[Byte]): OutputStream = {
    -    val properties = toCryptoConf(sparkConf)
    -    val iv = createInitializationVector(properties)
    +    val params = new CryptoParams(key, sparkConf)
    +    val iv = createInitializationVector(params.conf)
         os.write(iv)
    -    val transformationStr = sparkConf.get(IO_CRYPTO_CIPHER_TRANSFORMATION)
    -    new CryptoOutputStream(transformationStr, properties, os,
    -      new SecretKeySpec(key, "AES"), new IvParameterSpec(iv))
    +    new CryptoOutputStream(params.transformation, params.conf, os, params.keySpec,
    +      new IvParameterSpec(iv))
    +  }
    +
    +  /**
    +   * Wrap a `WritableByteChannel` for encryption.
    +   */
    +  def createWritableChannel(
    +      channel: WritableByteChannel,
    +      sparkConf: SparkConf,
    +      key: Array[Byte]): WritableByteChannel = {
    +    val params = new CryptoParams(key, sparkConf)
    +    val iv = createInitializationVector(params.conf)
    +    val buf = ByteBuffer.wrap(iv)
    +    while (buf.hasRemaining()) {
    --- End diff --
    
    is there any possibility this may be an infinite loop?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17295#discussion_r106779004
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/DiskBlockManager.scala ---
    @@ -34,6 +34,8 @@ import org.apache.spark.util.{ShutdownHookManager, Utils}
      */
     private[spark] class DiskBlockManager(conf: SparkConf, deleteFilesOnStop: Boolean) extends Logging {
     
    +  private val METADATA_FILE_SUFFIX = ".meta"
    --- End diff --
    
    Assuming I am not missing something, shuffle does not use (require) block length from meta file.
    If yes, for all others, why not simply keep the block size in memory ? On executor failure, the on disk block is lost anyway, and we already maintain block info for each block in executor.



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17295#discussion_r106778813
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---
    @@ -73,55 +86,219 @@ private[spark] class DiskStore(conf: SparkConf, diskManager: DiskBlockManager) e
       }
     
       def putBytes(blockId: BlockId, bytes: ChunkedByteBuffer): Unit = {
    -    put(blockId) { fileOutputStream =>
    -      val channel = fileOutputStream.getChannel
    -      Utils.tryWithSafeFinally {
    -        bytes.writeFully(channel)
    -      } {
    -        channel.close()
    -      }
    +    put(blockId) { channel =>
    +      bytes.writeFully(channel)
         }
       }
     
    -  def getBytes(blockId: BlockId): ChunkedByteBuffer = {
    +  def getBytes(blockId: BlockId): BlockData = {
         val file = diskManager.getFile(blockId.name)
    -    val channel = new RandomAccessFile(file, "r").getChannel
    -    Utils.tryWithSafeFinally {
    -      // For small files, directly read rather than memory map
    -      if (file.length < minMemoryMapBytes) {
    -        val buf = ByteBuffer.allocate(file.length.toInt)
    -        channel.position(0)
    -        while (buf.remaining() != 0) {
    -          if (channel.read(buf) == -1) {
    -            throw new IOException("Reached EOF before filling buffer\n" +
    -              s"offset=0\nfile=${file.getAbsolutePath}\nbuf.remaining=${buf.remaining}")
    +    val blockSize = getSize(blockId)
    +
    +    securityManager.getIOEncryptionKey() match {
    +      case Some(key) =>
    +        // Encrypted blocks cannot be memory mapped; return a special object that does decryption
    +        // and provides InputStream / FileRegion implementations for reading the data.
    +        new EncryptedBlockData(file, blockSize, conf, key)
    +
    +      case _ =>
    +        val channel = new FileInputStream(file).getChannel()
    +        if (blockSize < minMemoryMapBytes) {
    +          // For small files, directly read rather than memory map.
    +          Utils.tryWithSafeFinally {
    +            val buf = ByteBuffer.allocate(blockSize.toInt)
    +            while (buf.remaining() > 0) {
    +              channel.read(buf)
    +            }
    +            buf.flip()
    +            new ByteBufferBlockData(new ChunkedByteBuffer(buf))
    +          } {
    +            channel.close()
    +          }
    +        } else {
    +          Utils.tryWithSafeFinally {
    +            new ByteBufferBlockData(
    +              new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, file.length)))
    +          } {
    +            channel.close()
               }
             }
    -        buf.flip()
    -        new ChunkedByteBuffer(buf)
    -      } else {
    -        new ChunkedByteBuffer(channel.map(MapMode.READ_ONLY, 0, file.length))
    -      }
    -    } {
    -      channel.close()
         }
       }
     
       def remove(blockId: BlockId): Boolean = {
         val file = diskManager.getFile(blockId.name)
    -    if (file.exists()) {
    -      val ret = file.delete()
    -      if (!ret) {
    -        logWarning(s"Error deleting ${file.getPath()}")
    +    val meta = diskManager.getMetadataFile(blockId)
    +
    +    def delete(f: File): Boolean = {
    +      if (f.exists()) {
    +        val ret = f.delete()
    +        if (!ret) {
    +          logWarning(s"Error deleting ${file.getPath()}")
    +        }
    +
    +        ret
    +      } else {
    +        false
           }
    -      ret
    -    } else {
    -      false
         }
    +
    +    delete(file) & delete(meta)
       }
     
       def contains(blockId: BlockId): Boolean = {
         val file = diskManager.getFile(blockId.name)
         file.exists()
       }
    +
    +  private def openForWrite(file: File): WritableByteChannel = {
    +    val out = new FileOutputStream(file).getChannel()
    +    try {
    +      securityManager.getIOEncryptionKey().map { key =>
    +        CryptoStreamUtils.createWritableChannel(out, conf, key)
    +      }.getOrElse(out)
    +    } catch {
    +      case e: Exception =>
    +        out.close()
    +        throw e
    +    }
    +  }
    +
    +}
    +
    +private class EncryptedBlockData(
    +    file: File,
    +    blockSize: Long,
    +    conf: SparkConf,
    +    key: Array[Byte]) extends BlockData {
    +
    +  override def toInputStream(): InputStream = Channels.newInputStream(open())
    +
    +  override def toManagedBuffer(): ManagedBuffer = new EncryptedManagedBuffer()
    +
    +  override def toByteBuffer(allocator: Int => ByteBuffer): ChunkedByteBuffer = {
    +    val source = open()
    +    try {
    +      var remaining = blockSize
    +      val chunks = new ListBuffer[ByteBuffer]()
    +      while (remaining > 0) {
    +        val chunkSize = math.min(remaining, Int.MaxValue)
    +        val chunk = allocator(chunkSize.toInt)
    +        remaining -= chunkSize
    +
    +        while (chunk.remaining() > 0) {
    +          source.read(chunk)
    +        }
    +        chunk.flip()
    +        chunks += chunk
    +      }
    +
    +      new ChunkedByteBuffer(chunks.toArray)
    +    } finally {
    +      source.close()
    +    }
    +  }
    +
    +  override def size: Long = blockSize
    +
    +  override def dispose(): Unit = { }
    +
    +  private def open(): ReadableByteChannel = {
    +    val channel = new FileInputStream(file).getChannel()
    +    try {
    +      CryptoStreamUtils.createReadableChannel(channel, conf, key)
    +    } catch {
    +      case e: Exception =>
    +        Closeables.close(channel, true)
    +        throw e
    +    }
    +  }
    +
    +  private class EncryptedManagedBuffer extends ManagedBuffer {
    +
    +    override def size(): Long = blockSize
    +
    +    override def nioByteBuffer(): ByteBuffer = {
    +      // This is used by the block transfer service to replicate blocks. The upload code reads
    +      // all bytes into memory to send the block to the remote executor, so it's ok to do this
    +      // as long as the block fits in a Java array.
    +      assert(blockSize <= Int.MaxValue, "Block is too large to be wrapped in a byte buffer.")
    +      val is = toInputStream()
    +      try {
    +        ByteBuffer.wrap(ByteStreams.toByteArray(is))
    +      } finally {
    +        Closeables.close(is, true)
    +      }
    +    }
    +
    +    override def createInputStream(): InputStream = toInputStream()
    +
    +    override def convertToNetty(): Object = new ReadableChannelFileRegion(open(), blockSize)
    +
    +    override def retain(): ManagedBuffer = this
    +
    +    override def release(): ManagedBuffer = this
    +
    +  }
    +
    +}
    +
    +private class ReadableChannelFileRegion(source: ReadableByteChannel, blockSize: Long)
    +  extends AbstractReferenceCounted with FileRegion {
    +
    +  private var _transferred = 0L
    +
    +  private val buffer = ByteBuffer.allocateDirect(64 * 1024)
    +  buffer.flip()
    +
    +  override def count(): Long = blockSize
    +
    +  override def position(): Long = 0
    +
    +  override def transfered(): Long = _transferred
    +
    +  override def transferTo(target: WritableByteChannel, pos: Long): Long = {
    +    assert(pos == transfered(), "Invalid position.")
    +
    +    var written = 0L
    +    var lastWrite = -1L
    +    while (lastWrite != 0) {
    +      if (buffer.remaining() == 0) {
    +        buffer.clear()
    +        source.read(buffer)
    +        buffer.flip()
    +      }
    +      if (buffer.remaining() > 0) {
    +        lastWrite = target.write(buffer)
    +        written += lastWrite
    +      } else {
    +        lastWrite = 0
    +      }
    +    }
    +
    +    _transferred += written
    +    written
    +  }
    +
    +  override def deallocate(): Unit = source.close()
    +}
    +
    +private class CountingWritableChannel(sink: WritableByteChannel) extends WritableByteChannel {
    +
    +  private var count = 0L
    +
    +  def getCount(): Long = count
    --- End diff --
    
    `def getCount: Long` instead


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17295: [SPARK-19556][core] Do not encrypt block manager data in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17295
  
    Merged build finished. Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17295: [SPARK-19556][core] Do not encrypt block manager data in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17295
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/75323/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17295#discussion_r107787818
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---
    @@ -17,48 +17,67 @@
     
     package org.apache.spark.storage
     
    -import java.io.{FileOutputStream, IOException, RandomAccessFile}
    +import java.io._
     import java.nio.ByteBuffer
    +import java.nio.channels.{Channels, ReadableByteChannel, WritableByteChannel}
     import java.nio.channels.FileChannel.MapMode
    +import java.nio.charset.StandardCharsets.UTF_8
    +import java.util.concurrent.ConcurrentHashMap
     
    -import com.google.common.io.Closeables
    +import scala.collection.mutable.ListBuffer
     
    -import org.apache.spark.SparkConf
    +import com.google.common.io.{ByteStreams, Closeables, Files}
    +import io.netty.channel.FileRegion
    +import io.netty.util.AbstractReferenceCounted
    +
    +import org.apache.spark.{SecurityManager, SparkConf}
     import org.apache.spark.internal.Logging
    -import org.apache.spark.util.Utils
    +import org.apache.spark.network.buffer.ManagedBuffer
    +import org.apache.spark.network.util.JavaUtils
    +import org.apache.spark.security.CryptoStreamUtils
    +import org.apache.spark.util.{ByteBufferInputStream, Utils}
     import org.apache.spark.util.io.ChunkedByteBuffer
     
     /**
      * Stores BlockManager blocks on disk.
      */
    -private[spark] class DiskStore(conf: SparkConf, diskManager: DiskBlockManager) extends Logging {
    +private[spark] class DiskStore(
    +    conf: SparkConf,
    +    diskManager: DiskBlockManager,
    +    securityManager: SecurityManager) extends Logging {
     
       private val minMemoryMapBytes = conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m")
    +  private val blockSizes = new ConcurrentHashMap[String, Long]()
     
    -  def getSize(blockId: BlockId): Long = {
    -    diskManager.getFile(blockId.name).length
    -  }
    +  def getSize(blockId: BlockId): Long = blockSizes.get(blockId.name)
     
       /**
        * Invokes the provided callback function to write the specific block.
        *
        * @throws IllegalStateException if the block already exists in the disk store.
        */
    -  def put(blockId: BlockId)(writeFunc: FileOutputStream => Unit): Unit = {
    +  def put(blockId: BlockId)(writeFunc: WritableByteChannel => Unit): Unit = {
         if (contains(blockId)) {
           throw new IllegalStateException(s"Block $blockId is already present in the disk store")
         }
         logDebug(s"Attempting to put block $blockId")
         val startTime = System.currentTimeMillis
         val file = diskManager.getFile(blockId)
    -    val fileOutputStream = new FileOutputStream(file)
    +    val out = new CountingWritableChannel(openForWrite(file))
         var threwException: Boolean = true
         try {
    -      writeFunc(fileOutputStream)
    +      writeFunc(out)
    +      blockSizes.put(blockId.name, out.getCount)
           threwException = false
         } finally {
           try {
    -        Closeables.close(fileOutputStream, threwException)
    +        out.close()
    +      } catch {
    +        case ioe: IOException =>
    --- End diff --
    
    The code needs to catch any exception thrown by `out.close()` and also remove the block in that case. That wasn't done before.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17295#discussion_r106779213
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/DiskStore.scala ---
    @@ -17,48 +17,61 @@
     
     package org.apache.spark.storage
     
    -import java.io.{FileOutputStream, IOException, RandomAccessFile}
    +import java.io._
     import java.nio.ByteBuffer
    +import java.nio.channels.{Channels, ReadableByteChannel, WritableByteChannel}
     import java.nio.channels.FileChannel.MapMode
    +import java.nio.charset.StandardCharsets.UTF_8
     
    -import com.google.common.io.Closeables
    +import scala.collection.mutable.ListBuffer
     
    -import org.apache.spark.SparkConf
    +import com.google.common.io.{ByteStreams, Closeables, Files}
    +import io.netty.channel.FileRegion
    +import io.netty.util.AbstractReferenceCounted
    +
    +import org.apache.spark.{SecurityManager, SparkConf}
     import org.apache.spark.internal.Logging
    -import org.apache.spark.util.Utils
    +import org.apache.spark.network.buffer.ManagedBuffer
    +import org.apache.spark.security.CryptoStreamUtils
    +import org.apache.spark.util.{ByteBufferInputStream, Utils}
     import org.apache.spark.util.io.ChunkedByteBuffer
     
     /**
      * Stores BlockManager blocks on disk.
      */
    -private[spark] class DiskStore(conf: SparkConf, diskManager: DiskBlockManager) extends Logging {
    +private[spark] class DiskStore(
    +    conf: SparkConf,
    +    diskManager: DiskBlockManager,
    +    securityManager: SecurityManager) extends Logging {
     
       private val minMemoryMapBytes = conf.getSizeAsBytes("spark.storage.memoryMapThreshold", "2m")
     
       def getSize(blockId: BlockId): Long = {
    -    diskManager.getFile(blockId.name).length
    +    val file = diskManager.getMetadataFile(blockId)
    +    Files.toString(file, UTF_8).toLong
       }
     
       /**
        * Invokes the provided callback function to write the specific block.
        *
        * @throws IllegalStateException if the block already exists in the disk store.
        */
    -  def put(blockId: BlockId)(writeFunc: FileOutputStream => Unit): Unit = {
    +  def put(blockId: BlockId)(writeFunc: WritableByteChannel => Unit): Unit = {
         if (contains(blockId)) {
           throw new IllegalStateException(s"Block $blockId is already present in the disk store")
         }
         logDebug(s"Attempting to put block $blockId")
         val startTime = System.currentTimeMillis
         val file = diskManager.getFile(blockId)
    -    val fileOutputStream = new FileOutputStream(file)
    +    val out = new CountingWritableChannel(openForWrite(file))
         var threwException: Boolean = true
         try {
    -      writeFunc(fileOutputStream)
    +      writeFunc(out)
    +      Files.write(out.getCount().toString(), diskManager.getMetadataFile(blockId), UTF_8)
           threwException = false
         } finally {
           try {
    -        Closeables.close(fileOutputStream, threwException)
    +        Closeables.close(out, threwException)
    --- End diff --
    
    IOException can be thrown in close(), we will need to remove block (and meta) in that case as well.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17295: [SPARK-19556][core] Do not encrypt block manager data in...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/17295
  
    LGTM, cc @mallman to check the `unmap` part


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17295: [SPARK-19556][core] Do not encrypt block manager data in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17295
  
    Test FAILed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/74911/
    Test FAILed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17295#discussion_r106778005
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -56,6 +57,43 @@ private[spark] class BlockResult(
         val bytes: Long)
     
     /**
    + * Abstracts away how blocks are stored and provides different ways to read the underlying block
    + * data. The data for a BlockData instance can only be read once, since it may be backed by open
    + * file descriptors that change state as data is read.
    + */
    +private[spark] trait BlockData {
    +
    +  def toInputStream(): InputStream
    +
    +  def toManagedBuffer(): ManagedBuffer
    +
    +  def toByteBuffer(allocator: Int => ByteBuffer): ChunkedByteBuffer
    +
    +  def size: Long
    +
    +  def dispose(): Unit
    +
    +}
    +
    +private[spark] class ByteBufferBlockData(
    +    val buffer: ChunkedByteBuffer,
    +    autoDispose: Boolean = true) extends BlockData {
    +
    +  override def toInputStream(): InputStream = buffer.toInputStream(dispose = autoDispose)
    +
    +  override def toManagedBuffer(): ManagedBuffer = new NettyManagedBuffer(buffer.toNetty)
    +
    +  override def toByteBuffer(allocator: Int => ByteBuffer): ChunkedByteBuffer = {
    +    buffer.copy(allocator)
    +  }
    --- End diff --
    
    autoDispose is not honored for toManagedBuffer and toByteBuffer ?
    On first pass, it looks like it is not ...
    
    Also, is the expectation that invoker must manually invoke dispose when not using toInputStream ?
    Would be good to add a comment about this to BlockData trait detailing the expectation.
    



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17295: [SPARK-19556][core] Do not encrypt block manager data in...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the issue:

    https://github.com/apache/spark/pull/17295
  
    **[Test build #75120 has started](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/75120/testReport)** for PR 17295 at commit [`00b6d00`](https://github.com/apache/spark/commit/00b6d00b35fef1df4492288919ae54302afae8cb).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request #17295: [SPARK-19556][core] Do not encrypt block manager ...

Posted by vanzin <gi...@git.apache.org>.
Github user vanzin commented on a diff in the pull request:

    https://github.com/apache/spark/pull/17295#discussion_r106751157
  
    --- Diff: core/src/main/scala/org/apache/spark/security/CryptoStreamUtils.scala ---
    @@ -63,12 +83,40 @@ private[spark] object CryptoStreamUtils extends Logging {
           is: InputStream,
           sparkConf: SparkConf,
           key: Array[Byte]): InputStream = {
    -    val properties = toCryptoConf(sparkConf)
         val iv = new Array[Byte](IV_LENGTH_IN_BYTES)
    -    is.read(iv, 0, iv.length)
    -    val transformationStr = sparkConf.get(IO_CRYPTO_CIPHER_TRANSFORMATION)
    -    new CryptoInputStream(transformationStr, properties, is,
    -      new SecretKeySpec(key, "AES"), new IvParameterSpec(iv))
    +    var read = 0
    +    while (read < iv.length) {
    --- End diff --
    
    Ah, missed that one. +1 for shorter code.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17295: [SPARK-19556][core] Do not encrypt block manager data in...

Posted by cloud-fan <gi...@git.apache.org>.
Github user cloud-fan commented on the issue:

    https://github.com/apache/spark/pull/17295
  
    > The penalty comes when transferring that encrypted data from disk. If the
    data ends up in memory again, it is as efficient as before; but if the
    evicted block needs to be transferred directly to a remote executor, then
    there's now a performance penalty, since the code now uses a custom
    FileRegion implementation to decrypt the data before transferring.
    
    What's the actual difference? previously we transfer encrypted data?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark issue #17295: [SPARK-19556][core] Do not encrypt block manager data in...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the issue:

    https://github.com/apache/spark/pull/17295
  
    Test PASSed.
    Refer to this link for build results (access rights to CI server needed): 
    https://amplab.cs.berkeley.edu/jenkins//job/SparkPullRequestBuilder/75120/
    Test PASSed.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org