Posted to reviews@spark.apache.org by rxin <gi...@git.apache.org> on 2014/04/12 10:11:19 UTC

[GitHub] spark pull request: Added a FastByteArrayOutputStream that exposes...

GitHub user rxin opened a pull request:

    https://github.com/apache/spark/pull/397

    Added a FastByteArrayOutputStream that exposes the underlying array to avoid unnecessary mem copy.

    This should fix the extra memory copy introduced by #266.
    
    @mridulm @pwendell @mateiz

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/rxin/spark FastByteArrayOutputStream

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/397.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #397
    
----
commit 2fb9935e6c59ead89cdd425512ccf6e95872b3d8
Author: Reynold Xin <rx...@apache.org>
Date:   2014-04-12T08:09:42Z

    Added a FastByteArrayOutputStream that exposes the underlying array to avoid unnecessary mem copy.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: Added a FastByteArrayOutputStream that exposes...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/397#issuecomment-40299000
  
    All automated tests passed.
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14085/



[GitHub] spark pull request: Added a FastByteArrayOutputStream that exposes...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on the pull request:

    https://github.com/apache/spark/pull/397#issuecomment-40296425
  
    @rxin hm looks like this RAT exclude isn't working. Can take another crack at it later tonight.
    
    https://github.com/apache/spark/blob/master/.rat-excludes#L43



[GitHub] spark pull request: Added a FastByteArrayOutputStream that exposes...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/397#issuecomment-40298999
  
    Merged build finished. All automated tests passed.



[GitHub] spark pull request: Added a FastByteArrayOutputStream that exposes...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/397#issuecomment-40295421
  
     Merged build triggered. 



[GitHub] spark pull request: Added a FastByteArrayOutputStream that exposes...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/397#issuecomment-40320613
  
    Build started. 



[GitHub] spark pull request: Added a FastByteArrayOutputStream that exposes...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on the pull request:

    https://github.com/apache/spark/pull/397#issuecomment-40279192
  
    You could deprecate and override `toByteArray` to throw an exception, etc., to be extra-safe. They "work", the result just may not have much meaning independently. Your class still has methods like `close()` either way. Dunno, still seems simpler than the duplication.
    
    What's the compaction for? If you've got a series of ~2GB containers, I'd assume you'd fill them each pretty completely and transparently split a big write across the existing and next buffer. It saves a huge allocation, which could fail.
    
    (In the grow() method, you would have to check that the new doubled size hasn't overflowed!)
    
    I agree with use of `ByteBuffer`, but I suppose I'm pointing out that it has to get used in several other places in the code that use `byte[]` right now in order to get the benefit. I understand that wasn't the direct purpose of the code you're working on, but it is the purpose of this PR, I think. In which case, perhaps better to leverage your direction.
    
    A simpler step in your direction could be the basis for the change that this PR is trying for. That's why I wonder if this piece could have a simpler, stand-alone purpose.
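
The idea of filling fixed-size containers and splitting a large write across the current and the next buffer can be sketched roughly as follows. This is only an illustration under stated assumptions: `ChunkedOutputStream`, `chunkSize`, and `toByteBuffers` are hypothetical names that do not appear in the PR or in the WIP branch discussed here.

```scala
import java.io.OutputStream
import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer

// Hypothetical sketch: a stream backed by fixed-size chunks instead of one growing array.
class ChunkedOutputStream(chunkSize: Int) extends OutputStream {
  require(chunkSize > 0, "chunkSize must be positive")

  private val chunks = ArrayBuffer[Array[Byte]]()
  // Write position inside the last chunk; starting "full" forces an allocation on first write.
  private var posInLast = chunkSize

  // Allocates a fresh chunk when the current one is full. Existing chunks are never copied.
  private def ensureRoom(): Unit = {
    if (posInLast == chunkSize) {
      chunks += new Array[Byte](chunkSize)
      posInLast = 0
    }
  }

  override def write(b: Int): Unit = {
    ensureRoom()
    val current = chunks.last
    current(posInLast) = b.toByte
    posInLast += 1
  }

  override def write(b: Array[Byte], off: Int, len: Int): Unit = {
    if (off < 0 || len < 0 || len > b.length - off) throw new IndexOutOfBoundsException
    var written = 0
    while (written < len) {
      ensureRoom()
      // Copy as much as fits in the current chunk, then continue in the next one.
      val n = math.min(len - written, chunkSize - posInLast)
      System.arraycopy(b, off + written, chunks.last, posInLast, n)
      posInLast += n
      written += n
    }
  }

  /** One ByteBuffer per chunk, each wrapping only the filled part, without copying. */
  def toByteBuffers: Seq[ByteBuffer] =
    chunks.zipWithIndex.map { case (chunk, i) =>
      val filled = if (i == chunks.length - 1) posInLast else chunkSize
      ByteBuffer.wrap(chunk, 0, filled)
    }.toSeq
}
```

With this layout growth never reallocates or copies existing data, so no compaction step is needed; at most one partially filled chunk is wasted.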



[GitHub] spark pull request: Added a FastByteArrayOutputStream that exposes...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/397#issuecomment-40274836
  
     Merged build triggered. 



[GitHub] spark pull request: Added a FastByteArrayOutputStream that exposes...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on a diff in the pull request:

    https://github.com/apache/spark/pull/397#discussion_r11559113
  
    --- Diff: core/src/main/scala/org/apache/spark/util/io/FastByteArrayOutputStream.scala ---
    @@ -0,0 +1,104 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.util.io
    +
    +import java.io.OutputStream
    +
    +/**
    + * A simple, fast byte-array output stream that exposes the backing array,
    + * inspired by fastutil's FastByteArrayOutputStream.
    + *
    + * [[java.io.ByteArrayOutputStream]] is nice, but to get its content you
    + * must generate each time a new object. This doesn't happen here.
    + *
    + * This class will automatically enlarge the backing array, doubling its
    + * size whenever new space is needed.
    + */
    +private[spark] class FastByteArrayOutputStream(initialCapacity: Int = 16) extends OutputStream {
    +
    +  private[this] var _array = new Array[Byte](initialCapacity)
    +
    +  /** The current writing position. */
    +  private[this] var _position: Int = 0
    +
    +  /** The array backing the output stream. */
    +  def array: Array[Byte] = _array
    +
    +  /** The number of valid bytes in array. */
    +  def length: Int = _position
    +
    +  override def write(b: Int): Unit = {
    +    if (_position >= _array.length ) {
    +      _array = FastByteArrayOutputStream.growArray(_array, _position + 1, _position)
    +    }
    +    _array(_position) = b.toByte
    +    _position += 1
    +  }
    +
    +  override def write(b: Array[Byte], off: Int, len: Int) {
    +    if (off < 0) {
    +      throw new ArrayIndexOutOfBoundsException(s"Offset ($off) is negative" )
    +    }
    +    if (len < 0) {
    +      throw new IllegalArgumentException(s"Length ($len) is negative" )
    +    }
    +    if (off + len > b.length) {
    +      throw new ArrayIndexOutOfBoundsException(
    +        s"Last index (${off+len}) is greater than array length (${b.length})")
    +    }
    +    if ( _position + len > _array.length ) {
    +      _array = FastByteArrayOutputStream.growArray(_array, _position + len, _position)
    +    }
    +    System.arraycopy(b, off, _array, _position, len)
    +    _position += len
    +  }
    +
    +  /** Ensures that the length of the backing array is equal to [[length]]. */
    +  def trim(): this.type = {
    +    if (_position < _array.length) {
    +      val newArr = new Array[Byte](_position)
    --- End diff --
    
    This still entails a copy, right? I don't see that this improves the situation by itself.



[GitHub] spark pull request: Added a FastByteArrayOutputStream that exposes...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the pull request:

    https://github.com/apache/spark/pull/397#issuecomment-40320487
  
    Yes I knew about biased locking. That's why I said "how well the jvm can do away with the lock that is always running in a single threaded mode". However, it is hard to test how well it works with all the random cases (large streams, small streams, and even with biased locking it is not free) we have for this and quantify the impact. And given how small and simple this code is, I think it is fine as is.
    
    I removed the trim method.



[GitHub] spark pull request: Added a FastByteArrayOutputStream that exposes...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/397#issuecomment-40298525
  
    Merged build started. 



[GitHub] spark pull request: Added a FastByteArrayOutputStream that exposes...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on the pull request:

    https://github.com/apache/spark/pull/397#issuecomment-40276213
  
    I actually meant something like this:
    (This is from an internal WIP branch to tackle the ByteBuffer to Seq[ByteBuffer])
    Ideally I should submit this via a PR, but unfortunately ...
    
    ```
    package org.apache.spark.io
    
    import java.io.OutputStream
    import java.nio.ByteBuffer
    import java.util.{Arrays => JArrays}
    import org.apache.spark.storage.DiskStore
    
    /**
     * A custom implementation of ByteArrayOutputStream which tries to minimize array copies by
     * reusing the underlying array if within bounds.
     *
     * Note, this is unsafe for general use: it directly exposes the data array for use.
     *
     */
    private[io] class SparkByteArrayOutputStream(initialSize: Int) extends OutputStream {
    
      if (initialSize < 0) {
        throw new IllegalArgumentException("Negative initial size: " + initialSize)
      }
    
      /**
       * The buffer where data is stored.
       */
      private var buf: Array[Byte] = new Array[Byte](initialSize)
      /**
       * The number of valid bytes in the buffer.
       */
      private var count: Int = 0
    
      /**
       * Creates a new byte array output stream. The buffer capacity is
       * initially 32 bytes, though its size increases if necessary.
       */
      def this() = this(32)
    
      private def ensureCapacity(minCapacity: Int) {
        if (minCapacity < 0) throw new IllegalArgumentException("required capacity " + minCapacity + " is negative")
        if (minCapacity > buf.length) grow(minCapacity)
      }
    
      /**
       * Increases the capacity to ensure that it can hold at least the
       * number of elements specified by the minimum capacity argument.
       *
       * @param minCapacity the desired minimum capacity
       */
      private def grow(minCapacity: Int) {
        val oldCapacity: Int = buf.length
    
        // TODO: This might be too expensive as size grows : work out something better ?
        var newCapacity: Int = oldCapacity << 1
    
        if (newCapacity - minCapacity < 0) newCapacity = minCapacity
        // set max size to DiskStore.MAX_BLOCK_SIZE
        if (newCapacity > DiskStore.MAX_BLOCK_SIZE) newCapacity = DiskStore.MAX_BLOCK_SIZE
    
        if (newCapacity < 0) {
          throw new IllegalArgumentException("computed capacity = " + newCapacity +
            " is negative. minCapacity = " + minCapacity)
        }
    
        if (newCapacity <= buf.length || newCapacity < minCapacity) {
          throw new IllegalStateException("Can't grow the array anymore. Already at max size ?" +
            " newCapacity = " + newCapacity +
            ", minCapacity = " + minCapacity +
            ", blocksize = " + DiskStore.MAX_BLOCK_SIZE)
        }
    
        buf = JArrays.copyOf(buf, newCapacity)
      }
    
      /**
       * Writes the specified byte to this byte array output stream.
       *
       * @param   b   the byte to be written.
       */
      def write(b: Int) {
        ensureCapacity(count + 1)
        buf(count) = b.asInstanceOf[Byte]
        count += 1
      }
    
      /**
       * Writes <code>len</code> bytes from the specified byte array
       * starting at offset <code>off</code> to this byte array output stream.
       *
       * @param   b     the data.
       * @param   off   the start offset in the data.
       * @param   len   the number of bytes to write.
       */
      override def write(b: Array[Byte], off: Int, len: Int) {
        if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) - b.length > 0)) {
          throw new IndexOutOfBoundsException
        }
        ensureCapacity(count + len)
        System.arraycopy(b, off, buf, count, len)
        count += len
      }
    
      /**
       * Resets the <code>count</code> field of this byte array output
       * stream to zero, so that all currently accumulated output in the
       * output stream is discarded. The output stream can be used again,
       * reusing the already allocated buffer space.
       *
       * @see     java.io.ByteArrayInputStream#count
       */
      def reset() {
        count = 0
      }
    
      /**
       * Trim the underlying array : do this only if it makes sense - that is, if the space saving
       * is worth more than the cost of doing the allocation copy.
       *
       * Note, this is called after all data has been written to the stream to compact the array.
       */
      def compact() {
        if (SparkByteArrayOutputStream.needCompact(this)) {
          buf = JArrays.copyOf(buf, count)
        }
      }
    
      /**
       * Wraps the valid contents of the buffer in a ByteBuffer without copying.
       * The returned buffer shares its backing array with this stream. An empty
       * stream yields a newly allocated empty buffer instead.
       *
       * @return  the current contents of this output stream, as a ByteBuffer.
       * @see     java.io.ByteArrayOutputStream#size()
       */
      def toByteBuffer: ByteBuffer = {
        if (0 == count) return ByteBuffer.allocate(0)
        ByteBuffer.wrap(buf, 0, count)
      }
    
      /**
       * Returns the current size of the buffer.
       *
       * @return  the value of the <code>count</code> field, which is the number
       *          of valid bytes in this output stream.
       * @see     java.io.ByteArrayOutputStream#count
       */
      def size: Int = {
        count
      }
    
      /**
       * Closing a <tt>ByteArrayOutputStream</tt> has no effect. The methods in
       * this class can be called after the stream has been closed without
       * generating an <tt>IOException</tt>.
       * <p>
       *
       */
      override def close() {
        // set a flag and not allow any more writes ?
      }
    }
    
    
    object SparkByteArrayOutputStream {
      private val MINIMUM_BUFFER_SIZE = System.getProperty("spark.io.baos.trim.min_buffer_size", (1024 * 1024).toString).toInt
      // 0.1 % by default.
      private val WASTAGE_FRACTION = System.getProperty("spark.io.baos.trim.wastage", 0.001.toString).toDouble
    
      def needCompact(stream: SparkByteArrayOutputStream): Boolean = {
        val capacity = stream.buf.length
        val used = stream.size
        val wastage = capacity - used
    
        // If no wastage, nothing to compact
        if (wastage <= 0) return false
    
        // If capacity really low, always allow compaction : since cost of compaction will be low.
        if (capacity < MINIMUM_BUFFER_SIZE) return true
    
        // If wastage is small enough, then don't compact
        // Currently, set to X % of capacity.
        val allowedWastage = capacity * WASTAGE_FRACTION
    
        wastage >= allowedWastage
      }
    }
    ```



[GitHub] spark pull request: Added a FastByteArrayOutputStream that exposes...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/397#issuecomment-40298521
  
     Merged build triggered. 



[GitHub] spark pull request: Added a FastByteArrayOutputStream that exposes...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the pull request:

    https://github.com/apache/spark/pull/397#issuecomment-40316654
  
    Eh the other reason fastutil implements the FastByteArrayOutputStream without subclassing ByteArrayOutputStream was to get rid of the synchronized writes. To do this properly, we should probably benchmark every corner case to see how well the jvm can do away with the lock that is always running in a single threaded mode. However, we are not adding much code anyway so I think this is good as is. 



[GitHub] spark pull request: Added a FastByteArrayOutputStream that exposes...

Posted by aarondav <gi...@git.apache.org>.
Github user aarondav commented on a diff in the pull request:

    https://github.com/apache/spark/pull/397#discussion_r11561015
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockManager.scala ---
    @@ -1001,9 +1003,9 @@ private[spark] class BlockManager(
           blockId: BlockId,
           values: Iterator[Any],
           serializer: Serializer = defaultSerializer): ByteBuffer = {
    -    val byteStream = new ByteArrayOutputStream(4096)
    +    val byteStream = new FastByteArrayOutputStream(4096)
         dataSerializeStream(blockId, byteStream, values, serializer)
    -    ByteBuffer.wrap(byteStream.toByteArray)
    +    ByteBuffer.wrap(byteStream.array)
    --- End diff --
    
    should probably use ByteBuffer.wrap(bytes, offset, length) here too, to avoid wrapping the extra bytes?
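
A small sketch of why the three-argument `ByteBuffer.wrap` matters when the backing array is larger than the written content. The object name `WrapDemo` and the sizes are made up for illustration; only the `java.nio.ByteBuffer` calls are real API.

```scala
import java.nio.ByteBuffer

object WrapDemo {
  // Stand-in for the stream's backing array: 4096 bytes of capacity,
  // of which we pretend only the first 10 were actually written.
  val backing = new Array[Byte](4096)
  val filled = 10

  // Wraps the whole array, so the unwritten tail becomes part of the buffer's content.
  val whole: ByteBuffer = ByteBuffer.wrap(backing)

  // Wraps only the filled prefix: position 0, limit = filled. Still no copy is made.
  val prefix: ByteBuffer = ByteBuffer.wrap(backing, 0, filled)
}
```

Here `WrapDemo.whole.remaining()` is 4096 while `WrapDemo.prefix.remaining()` is 10. Both buffers share the same array, and `prefix.capacity()` is still 4096, so the bound is carried by the limit rather than by the array length.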



[GitHub] spark pull request: Added a FastByteArrayOutputStream that exposes...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on the pull request:

    https://github.com/apache/spark/pull/397#issuecomment-40304662
  
    Yeah this looks a lot like what I personally have in mind. I think you could simply subclass `java.io.ByteArrayOutputStream` and add the two new methods, to avoid writing new code.
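
A minimal sketch of the subclassing approach, assuming the accessors wanted are a `ByteBuffer` view and the raw array. `ExposedByteArrayOutputStream` is a hypothetical name; `buf` and `count` are the protected fields that `java.io.ByteArrayOutputStream` already provides to subclasses.

```scala
import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer

// Hypothetical subclass: reuses the JDK's write/grow logic and only adds zero-copy accessors.
class ExposedByteArrayOutputStream(initialCapacity: Int = 32)
    extends ByteArrayOutputStream(initialCapacity) {

  /** Wraps the filled prefix of the inherited buffer without copying. */
  def toByteBuffer: ByteBuffer = synchronized {
    ByteBuffer.wrap(buf, 0, count)
  }

  /** The inherited backing array; only the first `size()` bytes are valid. */
  def array: Array[Byte] = synchronized { buf }
}
```

The inherited `toByteArray` still works and still copies, which is the trade-off discussed in this thread: less new code, at the price of keeping the synchronized writes of the parent class.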



[GitHub] spark pull request: Added a FastByteArrayOutputStream that exposes...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/397#issuecomment-40321667
  
    All automated tests passed.
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14101/



[GitHub] spark pull request: Added a FastByteArrayOutputStream that exposes...

Posted by aarondav <gi...@git.apache.org>.
Github user aarondav commented on a diff in the pull request:

    https://github.com/apache/spark/pull/397#discussion_r11567459
  
    --- Diff: core/src/main/scala/org/apache/spark/util/io/FastByteArrayOutputStream.scala ---
    @@ -0,0 +1,113 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.util.io
    +
    +import java.io.OutputStream
    +import java.nio.ByteBuffer
    +
    +/**
    + * A simple, fast byte-array output stream that exposes the backing array,
    + * inspired by fastutil's FastByteArrayOutputStream.
    + *
    + * [[java.io.ByteArrayOutputStream]] is nice, but to get its content you
    + * must generate each time a new object. This doesn't happen here.
    + *
    + * This class will automatically enlarge the backing array, doubling its
    + * size whenever new space is needed.
    + */
    +private[spark] class FastByteArrayOutputStream(initialCapacity: Int = 16) extends OutputStream {
    +
    +  private[this] var _array = new Array[Byte](initialCapacity)
    +
    +  /** The current writing position. */
    +  private[this] var _position: Int = 0
    +
    +  /** The number of valid bytes in array. */
    +  def length: Int = _position
    +
    +  override def write(b: Int): Unit = {
    +    if (_position >= _array.length ) {
    +      _array = FastByteArrayOutputStream.growArray(_array, _position + 1, _position)
    +    }
    +    _array(_position) = b.toByte
    +    _position += 1
    +  }
    +
    +  override def write(b: Array[Byte], off: Int, len: Int) {
    +    if (off < 0) {
    +      throw new ArrayIndexOutOfBoundsException(s"Offset ($off) is negative" )
    +    }
    +    if (len < 0) {
    +      throw new IllegalArgumentException(s"Length ($len) is negative" )
    +    }
    +    if (off + len > b.length) {
    +      throw new ArrayIndexOutOfBoundsException(
    +        s"Last index (${off + len}) is greater than array length (${b.length})")
    +    }
    +    if ( _position + len > _array.length ) {
    +      _array = FastByteArrayOutputStream.growArray(_array, _position + len, _position)
    +    }
    +    System.arraycopy(b, off, _array, _position, len)
    +    _position += len
    +  }
    +
    +  /** Return a ByteBuffer wrapping around the filled content of the underlying array. */
    +  def toByteBuffer: ByteBuffer = {
    +    ByteBuffer.wrap(_array, 0, _position)
    +  }
    +
    +  /**
    +   * Return a tuple, where the first element is the underlying array, and the second element
    +   * is the length of the filled content.
    +   */
    +  def toArray: (Array[Byte], Int) = (_array, _position)
    +
    +  /** Ensures that the length of the backing array is equal to [[length]]. */
    +  def trim(): this.type = {
    --- End diff --
    
    We probably don't need this method, as it eliminates the whole purpose of this stream. You might as well use ByteArrayOutputStream if you need trim().



[GitHub] spark pull request: Added a FastByteArrayOutputStream that exposes...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/397#issuecomment-40295467
  
    Merged build finished. 



[GitHub] spark pull request: Added a FastByteArrayOutputStream that exposes...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the pull request:

    https://github.com/apache/spark/pull/397#issuecomment-40295319
  
    Ok pushed a new version that avoids the extra trim. 



[GitHub] spark pull request: Added a FastByteArrayOutputStream that exposes...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on the pull request:

    https://github.com/apache/spark/pull/397#issuecomment-40317255
  
    Yes, even if the lock is not removed (and it should be) its overhead is trivial compared to other operations here. Up to you.



[GitHub] spark pull request: Added a FastByteArrayOutputStream that exposes...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/397#issuecomment-40295427
  
    Merged build started. 



[GitHub] spark pull request: Added a FastByteArrayOutputStream that exposes...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on the pull request:

    https://github.com/apache/spark/pull/397#issuecomment-40285054
  
    Sure, I myself was not suggesting that we should make them throw exceptions. If one really wanted to prohibit their use, that would be a way to do so even when subclassing, but I don't suggest they must be prohibited.
    
    To me, the methods aren't broken or anything, and the resulting class can be reused for the purpose @rxin has in mind in this PR, if these methods are available.
    
    Agree that the max size of such a buffer could be configurable. However the limit can't be more than `Integer.MAX_VALUE` just because this is the largest an array can be. I was just pointing this out in regard to `var newCapacity: Int = oldCapacity << 1` which will fail if `oldCapacity` is more than half of `Integer.MAX_VALUE` already.
    
    (There's another tangent here about whether the buffer class should even enforce a limit -- what does it do, fail with `IOException`? -- because the caller has to manage it either way.)
    
    The gist of my strawman suggestion is: what if we had a simple subclass of `ByteArrayOutputStream` that exposes a `ByteBuffer`? I argue that is a basis for removing some long-standing array copies in various parts of the code, which is @rxin's purpose. And then I think it suits your purpose too, excepting the compaction logic, but I was wondering about whether it is needed. (Maybe I should take this to the JIRA issue you opened about your work?)
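
The overflow concern with `oldCapacity << 1` can be sketched as follows. This is a minimal illustration, not code from the pull request: `BufferGrowth`, `nextCapacity`, and the `maxCapacity` parameter are hypothetical names, and failing with `IOException` is only one possible policy for an exceeded limit.

```scala
import java.io.IOException

object BufferGrowth {
  // Hypothetical helper: pick the next capacity for a growable byte buffer.
  // `maxCapacity` is an assumed, configurable upper bound (at most Int.MaxValue).
  def nextCapacity(oldCapacity: Int, minCapacity: Int, maxCapacity: Int = Int.MaxValue): Int = {
    require(oldCapacity >= 0 && minCapacity >= 0, "capacities must be non-negative")
    if (minCapacity > maxCapacity) {
      // Assumed policy: refuse to grow past the configured limit.
      throw new IOException(s"Requested $minCapacity bytes, limit is $maxCapacity")
    }
    // Double in Long arithmetic so the shift cannot wrap around to a negative Int.
    val doubled = oldCapacity.toLong << 1
    val wanted = math.max(doubled, minCapacity.toLong)
    math.min(wanted, maxCapacity.toLong).toInt
  }
}
```

With plain `Int` arithmetic, doubling a capacity above half of `Int.MaxValue` yields a negative number; the sketch clamps to the limit instead.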



[GitHub] spark pull request: Added a FastByteArrayOutputStream that exposes...

Posted by aarondav <gi...@git.apache.org>.
Github user aarondav commented on the pull request:

    https://github.com/apache/spark/pull/397#issuecomment-40320307
  
    Apparently the JVM implements "biased locking" (http://www.oracle.com/technetwork/java/6-performance-137236.html#2.1.1) when an object's monitor is uncontended, which eliminates all synchronization overhead (at the cost of an extremely expensive "bias revocation" operation if any contention appears). So we would expect there to be no relevant performance loss of this synchronization in this case, except perhaps for very small streams which are completed before being biased.
    
    However, the API of `def toByteArray: Array[Byte]` and `def toArray: (Array[Byte], Int)` with different performance characteristics is pretty strange. I think the current solution is probably fine, as the implementation is pretty straightforward. I'll eat those words when the first JIRA comes back with a bug introduced from this patch, though.



[GitHub] spark pull request: Added a FastByteArrayOutputStream that exposes...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on the pull request:

    https://github.com/apache/spark/pull/397#issuecomment-40278958
  
    Your summary is fairly accurate, @srowen. To add, my initial approach was to subclass to minimize code :-)
    The reason I moved away from it was that I did not want to expose the toByteArray method, to prevent current or future accidental invocation of that (expensive) method.
    Not to mention a bunch of other methods which actually won't work (more below) in our context.
    
    Before going into details, please note that the main purpose of this stream is to get a ByteBuffer out of the data, which is subsequently used.
    
    What is not mentioned above is that this class is actually used within another class which multiplexes over these baos instances, so that we are not limited to the 2 GB limit: we have a sequence of these streams, which are read in order to get to the actual data (the wrapper OutputStream moves from one stream to the next as required, and returns a Seq[ByteBuffer] when we are done writing to it).
    That is why most of the methods won't work (reset, close, toByteArray, toString): the output stream does not start at a data boundary, but sits inside the context of a larger stream. We could leave the methods around, but it did not look right to leave potentially broken functionality around.
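
A rough sketch of the multiplexing wrapper described above, under stated assumptions: `ChunkedOutputStream`, `Chunk`, `chunkSize`, and `toByteBuffers` are hypothetical names, and the actual class may differ considerably (for example in how chunk sizes are chosen or compacted).

```scala
import java.io.{ByteArrayOutputStream, OutputStream}
import java.nio.ByteBuffer
import scala.collection.mutable.ArrayBuffer

// Hypothetical wrapper: spreads writes over several fixed-size chunks so the
// total can exceed what one array can hold. `chunkSize` is an assumed knob.
class ChunkedOutputStream(chunkSize: Int) extends OutputStream {
  require(chunkSize > 0, "chunkSize must be positive")

  // Subclass only to reach the protected `buf` and `count` fields.
  private class Chunk extends ByteArrayOutputStream(chunkSize) {
    def remaining: Int = chunkSize - count
    def asBuffer: ByteBuffer = ByteBuffer.wrap(buf, 0, count)
  }

  private val chunks = ArrayBuffer(new Chunk)

  override def write(b: Int): Unit = write(Array(b.toByte), 0, 1)

  override def write(bytes: Array[Byte], offset: Int, length: Int): Unit = {
    var pos = offset
    var left = length
    while (left > 0) {
      // Move on to a fresh chunk once the current one is full.
      if (chunks.last.remaining == 0) chunks += new Chunk
      val n = math.min(left, chunks.last.remaining)
      chunks.last.write(bytes, pos, n)
      pos += n
      left -= n
    }
  }

  // Views onto the chunk arrays, in write order; no bytes are copied.
  def toByteBuffers: Seq[ByteBuffer] = chunks.map(_.asBuffer).toSeq
}
```

Because a single chunk never starts or ends at a data boundary, per-chunk `reset` or `toByteArray` would be meaningless here, which is the point made above.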



[GitHub] spark pull request: Added a FastByteArrayOutputStream that exposes...

Posted by rxin <gi...@git.apache.org>.
Github user rxin closed the pull request at:

    https://github.com/apache/spark/pull/397



[GitHub] spark pull request: Added a FastByteArrayOutputStream that exposes...

Posted by mridulm <gi...@git.apache.org>.
Github user mridulm commented on the pull request:

    https://github.com/apache/spark/pull/397#issuecomment-40284529
  
    There are two issues here:
    a) If we are going to override and deprecate/throw an exception for every method which is not exposed by OutputStream, while overriding functionality for most others in ByteArrayOutputStream, then I don't see the value in extending BAOS.
    
    b) More importantly, we do not get to control how array growth happens, what the thresholds are, etc. For example, relying on Integer.MAX_VALUE always is not the best option - we would need to have that configurable, to control what the maximum block size in spark can be per ByteBuffer : this has perf implications in terms of mmap'ing files, reading/writing to other ByteBuffers/sockets, etc.



[GitHub] spark pull request: Added a FastByteArrayOutputStream that exposes...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on the pull request:

    https://github.com/apache/spark/pull/397#issuecomment-40289835
  
    @aarondav I personally like your second method. That alone is probably just what is needed. Callers who actually want a `ByteBuffer` can wrap easily with this info. In practice the offset will always (?) be 0 so one of the Ints could be omitted I bet.



[GitHub] spark pull request: Added a FastByteArrayOutputStream that exposes...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/397#issuecomment-40321666
  
    Build finished. All automated tests passed.



[GitHub] spark pull request: Added a FastByteArrayOutputStream that exposes...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/397#issuecomment-40274870
  
    
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14073/



[GitHub] spark pull request: Added a FastByteArrayOutputStream that exposes...

Posted by aarondav <gi...@git.apache.org>.
Github user aarondav commented on the pull request:

    https://github.com/apache/spark/pull/397#issuecomment-40321999
  
    Looking at our usages of ByteBuffer#array in Spark, most of them do not handle ByteBuffers where capacity != limit correctly. This change could turn these oversights into actual bugs, if the ByteBuffers returned here are used later as inputs. See TachyonStore for an example, where all the logging and accounting looks at limit(), but the actual bytes used will be capacity().
    
    This change also has the behavior of actually storing up to 2x the amount of bytes as actual data, since before we could throw away the backing array and just keep the data. After all, the original solution using fastutil just called trim(), which means it wasn't really faster than ByteArrayOutputStream, but it did preserve the safety and memory characteristics that this implementation does not have.
    
    This change makes the most sense for RawTextSender, where we immediately turn around and write the bytes to a stream, so we don't need to store them and hope no one misuses it later. It makes less sense when we actually want ByteBuffers later -- I think a copy is really the only reasonable solution there. 
    
    The ideal solution in terms of API might actually just be @srowen's suggestion:
    ```
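    import java.io.ByteArrayOutputStream
    class FastByteArrayOutputStream extends ByteArrayOutputStream {
      // Expose the protected backing array; only the first size() bytes are valid data.
      def getUnderlyingArray: Array[Byte] = buf
    }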
    ```
    
    The only downside is that this still has the synchronized behavior, which may or may not be impactful, but it saves 100 lines of code and would allow RawTextSender to go without a copy. As far as I can tell, neither Task nor Serializer really benefited from the fastutil FastByteArrayOutputStream, as both copied the data anyway.
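
The capacity versus limit pitfall mentioned above can be reproduced with plain `java.nio.ByteBuffer`. This is an illustrative sketch only; `BufferSizePitfall`, `unsafeBytes`, and `safeBytes` are made-up names and do not appear in Spark.

```scala
import java.nio.ByteBuffer

object BufferSizePitfall {
  // Backing array with room for 8 bytes, of which only the first 3 hold data.
  val backing: Array[Byte] = Array[Byte](1, 2, 3, 0, 0, 0, 0, 0)
  val buffer: ByteBuffer = ByteBuffer.wrap(backing, 0, 3)

  // Wrong when capacity != limit: hands back all 8 bytes, 5 of them not data.
  def unsafeBytes(b: ByteBuffer): Array[Byte] = b.array()

  // Copies only the readable region, leaving the caller's position untouched.
  def safeBytes(b: ByteBuffer): Array[Byte] = {
    val out = new Array[Byte](b.remaining())
    b.duplicate().get(out)
    out
  }
}
```

Code that logs or accounts by `limit()` but then stores `array()` would see 3 bytes in its bookkeeping and 8 bytes in storage for this buffer.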




[GitHub] spark pull request: Added a FastByteArrayOutputStream that exposes...

Posted by rxin <gi...@git.apache.org>.
Github user rxin commented on the pull request:

    https://github.com/apache/spark/pull/397#issuecomment-40668466
  
    Alright you guys convinced me. Let's close this one for now. When we clean up the downstream consumers of the byte buffers, we can revisit this (and maybe do crazier things like reusing byte arrays instead of constantly re-allocating new ones).



[GitHub] spark pull request: Added a FastByteArrayOutputStream that exposes...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/397#issuecomment-40320605
  
     Build triggered. 



[GitHub] spark pull request: Added a FastByteArrayOutputStream that exposes...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/397#issuecomment-40274840
  
    Merged build started. 



[GitHub] spark pull request: Added a FastByteArrayOutputStream that exposes...

Posted by aarondav <gi...@git.apache.org>.
Github user aarondav commented on the pull request:

    https://github.com/apache/spark/pull/397#issuecomment-40288679
  
    Having a toByteBuffer method definitely seems reasonable to me; the only issue is that ByteBuffer does not provide a good stream-compatible API. So callers would either still have to use write(bytes, offset, length), getting these parameters from the ByteBuffer (and being careful that .array() returns the entire underlying array), or use some API external to both ByteBuffer and our fast output stream.
    
    What if FastByteArrayOutputStream has two methods:
    ```
    def toByteBuffer: ByteBuffer
    def toByteArraySegment: (Array[Byte], Int, Int)
    ```
    where the latter returns the array together with the offset and length to be passed in to any stream APIs.
    
    In order to provide a lower level API, we often have to sacrifice some code niceties, but this at least provides pretty good safety for the user to not misuse it.
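
A possible shape for the second method, focusing on the stream-facing use. This is a sketch under assumptions: `SegmentByteArrayOutputStream` and `SegmentExample.drainTo` are hypothetical names, and the offset is always 0 in this version.

```scala
import java.io.{ByteArrayOutputStream, OutputStream}

// Hypothetical subclass; the method name follows the proposal above.
class SegmentByteArrayOutputStream(initialSize: Int = 32)
    extends ByteArrayOutputStream(initialSize) {
  // (backing array, offset, length): the array may be longer than `length`.
  def toByteArraySegment: (Array[Byte], Int, Int) = synchronized { (buf, 0, count) }
}

object SegmentExample {
  // Forwards the written bytes to `sink` without an intermediate trimmed copy.
  def drainTo(source: SegmentByteArrayOutputStream, sink: OutputStream): Unit = {
    val (bytes, offset, length) = source.toByteArraySegment
    sink.write(bytes, offset, length)
  }
}
```

Returning the triple rather than the bare array makes it harder for a caller to forget that only part of the array is valid.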



[GitHub] spark pull request: Added a FastByteArrayOutputStream that exposes...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/397#issuecomment-40274869
  
    Merged build finished. 



[GitHub] spark pull request: Added a FastByteArrayOutputStream that exposes...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on the pull request:

    https://github.com/apache/spark/pull/397#issuecomment-40298517
  
    Jenkins, retest this please.



[GitHub] spark pull request: Added a FastByteArrayOutputStream that exposes...

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on the pull request:

    https://github.com/apache/spark/pull/397#issuecomment-40277866
  
    So I think I agree with the overall direction here, but want to make a few comments to clarify why. Apologies if I'm stating the obvious.
    
    The management of the byte array here is not what's solving the immediate problem. Where this stuff is used in the code, the calling code wants to write N bytes in pieces, and then get a `byte[]` of exactly N bytes. Except in the lucky case that N exactly matched the buffer size, this entails an allocation and copy. Previously this was hidden in a `trim()` method. The `compact()` method would not work directly since it can result in an underlying array of more than N bytes.
    
    What's needed is an abstraction that can take a `byte[]` that's possibly too large, and a limit N, and broker access in a way that makes it act like it is only of length N, without a copy.
    
    Although this is not `ByteBuffer`'s primary role in life, it can kind of play that role. You can wrap bytes 0 to N of a `byte[]` without a copy. And it has methods to read through the elements of the buffer. Note that its `array()` method itself does not copy, but it also provides access to the whole maybe-too-large array underneath.
    
    For that reason, and because calling code in one case already wants a `ByteBuffer`, this feels like a good solution. However, other callers need to change to use `ByteBuffer` if they care about the allocation. So there's more to this than just a drop-in replacement.
    
    However I'd also say that this doesn't need reimplementing `ByteArrayOutputStream` so entirely. Just subclass it and expose a `toByteBuffer()` method that `wrap()`s the internal `byte[]` with an appropriate limit.
    
    I understand the idea of this code is to implement a compaction mechanism, but that's a separate issue really. (When does this help? I understand it can free up some heap, at the cost of a new second allocation and copy, and could be helpful if this object were sticking around a long time. But it's not, right?)
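
The subclassing suggestion above might look roughly like this. It is a minimal sketch, not the code under review: `ByteBufferOutputStream` is a hypothetical name, and it relies only on the protected `buf` and `count` fields that `java.io.ByteArrayOutputStream` documents.

```scala
import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer

// Hypothetical name; `buf` and `count` are protected fields of ByteArrayOutputStream.
class ByteBufferOutputStream(initialSize: Int = 32)
    extends ByteArrayOutputStream(initialSize) {
  // Wraps the first `count` bytes of the internal array without allocating a new array.
  // The view shares storage with the stream, so a later reset() followed by
  // more writes can change what it shows.
  def toByteBuffer: ByteBuffer = synchronized { ByteBuffer.wrap(buf, 0, count) }
}
```

The returned buffer has `remaining()` equal to the number of bytes written, while `array()` and `capacity()` still reflect the possibly larger internal array.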



[GitHub] spark pull request: Added a FastByteArrayOutputStream that exposes...

Posted by aarondav <gi...@git.apache.org>.
Github user aarondav commented on a diff in the pull request:

    https://github.com/apache/spark/pull/397#discussion_r11561006
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala ---
    @@ -43,15 +45,15 @@ object RawTextSender extends Logging {
     
         // Repeat the input data multiple times to fill in a buffer
         val lines = Source.fromFile(file).getLines().toArray
    -    val bufferStream = new ByteArrayOutputStream(blockSize + 1000)
    +    val bufferStream = new FastByteArrayOutputStream(blockSize + 1000)
         val ser = new KryoSerializer(new SparkConf()).newInstance()
         val serStream = ser.serializeStream(bufferStream)
         var i = 0
    -    while (bufferStream.size < blockSize) {
    +    while (bufferStream.length < blockSize) {
           serStream.writeObject(lines(i))
           i = (i + 1) % lines.length
         }
    -    val array = bufferStream.toByteArray
    +    val array = bufferStream.trim().array
    --- End diff --
    
    We can avoid using trim (and thus an array copy) here by using RateLimitedOutputStream#write(bytes, offset, length).



[GitHub] spark pull request: Added a FastByteArrayOutputStream that exposes...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/397#issuecomment-40295468
  
    
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14082/

