Posted to dev@drill.apache.org by vrozov <gi...@git.apache.org> on 2017/11/29 23:25:47 UTC

[GitHub] drill pull request #1058: DRILL-6002: Avoid memory copy from direct buffer t...

GitHub user vrozov opened a pull request:

    https://github.com/apache/drill/pull/1058

    DRILL-6002: Avoid memory copy from direct buffer to heap while spilling to local disk

    @paul-rogers Please review

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/vrozov/drill DRILL-6002

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/drill/pull/1058.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1058
    
----
commit 8e9124de681d3a8cd70bf0bb243460cb78dcb295
Author: Vlad Rozov <vr...@apache.org>
Date:   2017-11-22T22:06:13Z

    DRILL-6002: Avoid memory copy from direct buffer to heap while spilling to local disk

----


---

[GitHub] drill pull request #1058: DRILL-6002: Avoid memory copy from direct buffer t...

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1058#discussion_r154478289
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BatchGroup.java ---
    @@ -249,7 +247,7 @@ public void close() throws IOException {
             ex = e;
           }
           try {
    -        closeOutputStream();
    +        closeWriter();
    --- End diff --
    
    Just as a matter of record, this abstraction is rather awkward. It tries to be three things: 1) a writer for a spill file, 2) a reader for a spill file and 3) the tombstone for the spill file.
    
    It would be much better to have separate reader and writer classes that come and go, with this class just representing the spill file over its lifetime. There was, however, no easy way to make that change and preserve existing code. Since I'd already changed more code than I really wanted, I left well enough alone. (See the "unmanaged" version for the original code, which was even more murky.)
    
    Now that you've modified the Writer to be the batch writer, I wonder if we should revisit the issue rather than preserving the old, muddy semantics of this class.
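
    As an illustration, a minimal sketch of that separation using only JDK types (all names here are hypothetical, not existing Drill classes):

    ```
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;
    import java.nio.file.Files;
    import java.nio.file.Path;

    /** Hypothetical: one object per spill file, living for the file's lifetime. */
    public class SpillFile {
      private final Path path;

      public SpillFile(Path path) { this.path = path; }

      /** Short-lived writer, created for a spill pass and closed when done. */
      public OutputStream openWriter() throws IOException {
        return Files.newOutputStream(path);
      }

      /** Short-lived reader, created for a merge pass and closed when done. */
      public InputStream openReader() throws IOException {
        return Files.newInputStream(path);
      }

      /** The "tombstone" role: remove the backing file at end of life. */
      public void delete() throws IOException {
        Files.deleteIfExists(path);
      }
    }
    ```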


---

[GitHub] drill pull request #1058: DRILL-6002: Avoid memory copy from direct buffer t...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1058#discussion_r154499915
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorSerializer.java ---
    @@ -40,20 +52,18 @@
        */
     
       public static class Writer {
    +    static final MetricRegistry metrics = DrillMetrics.getRegistry();
    +    static final String WRITER_TIMER = MetricRegistry.name(VectorAccessibleSerializable.class, "writerTime");
     
    -    private final OutputStream stream;
    -    private final BufferAllocator allocator;
    -    private boolean retain;
    +    private final SpillSet spillSet;
    +    private final WritableByteChannel channel;
    +    private final OutputStream output;
         private long timeNs;
     
    -    public Writer(BufferAllocator allocator, OutputStream stream) {
    -      this.allocator = allocator;
    -      this.stream = stream;
    -    }
    -
    -    public Writer retain() {
    -      retain = true;
    -      return this;
    +    private Writer(SpillSet spillSet, String path) throws IOException {
    --- End diff --
    
    The `Writer` in `VectorSerializer` is used only for spilling. I'd suggest moving `VectorSerializer` to the `spill` package to make it more explicit and keep the dependency on the `SpillSet`. I am not sure there is a need for a generic `Writer` that can handle multiple use cases. I think that `Writer` needs to be optimized to handle the spill use case.
    
    `SpillSet` is also needed to get the `WritableByteChannel`, and it encapsulates what `Writer` uses from the operators.


---

[GitHub] drill pull request #1058: DRILL-6002: Avoid memory copy from direct buffer t...

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1058#discussion_r154475785
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorSerializer.java ---
    @@ -62,27 +72,65 @@ public Writer write(VectorAccessible va) throws IOException {
     
         @SuppressWarnings("resource")
         public Writer write(VectorAccessible va, SelectionVector2 sv2) throws IOException {
    +      checkNotNull(va);
           WritableBatch batch = WritableBatch.getBatchNoHVWrap(
               va.getRecordCount(), va, sv2 != null);
           return write(batch, sv2);
         }
     
         public Writer write(WritableBatch batch, SelectionVector2 sv2) throws IOException {
    -      VectorAccessibleSerializable vas;
    -      if (sv2 == null) {
    -        vas = new VectorAccessibleSerializable(batch, allocator);
    -      } else {
    -        vas = new VectorAccessibleSerializable(batch, sv2, allocator);
    -      }
    -      if (retain) {
    -        vas.writeToStreamAndRetain(stream);
    -      } else {
    -        vas.writeToStream(stream);
    +      return write(batch, sv2, false);
    +    }
    +
    +    public Writer write(WritableBatch batch, SelectionVector2 sv2, boolean retain) throws IOException {
    +      checkNotNull(batch);
    +      checkNotNull(channel);
    +      final Timer.Context timerContext = metrics.timer(WRITER_TIMER).time();
    +
    +      final DrillBuf[] incomingBuffers = batch.getBuffers();
    +      final UserBitShared.RecordBatchDef batchDef = batch.getDef();
    +
    +      try {
    +        /* Write the metadata to the file */
    +        batchDef.writeDelimitedTo(output);
    +
    +
    +        /* If we have a selection vector, dump it to file first */
    --- End diff --
    
    In the master branch, the `VectorSerializer` classes were slapped together as a simpler API on top of the existing `VectorAccessibleSerializable` classes. (That's why the code is so clunky.)
    
    Here, we turn the `VectorSerializer` into the means to do the writing, which seems like a good improvement.
    
    Does this mean we now have two implementations? The one here and the old one in `VectorAccessibleSerializable`? Do we need to clean up that split somehow? Else, should we leave comments to point people between the two copies so they know to make changes in both places?
    
    Looking at the other changes, it seems we leave the old "unmanaged" external sort to use the old implementation.


---

[GitHub] drill pull request #1058: DRILL-6002: Avoid memory copy from direct buffer t...

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1058#discussion_r154478770
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorSerializer.java ---
    @@ -62,27 +72,65 @@ public Writer write(VectorAccessible va) throws IOException {
     
         @SuppressWarnings("resource")
         public Writer write(VectorAccessible va, SelectionVector2 sv2) throws IOException {
    +      checkNotNull(va);
           WritableBatch batch = WritableBatch.getBatchNoHVWrap(
               va.getRecordCount(), va, sv2 != null);
           return write(batch, sv2);
         }
     
         public Writer write(WritableBatch batch, SelectionVector2 sv2) throws IOException {
    -      VectorAccessibleSerializable vas;
    -      if (sv2 == null) {
    -        vas = new VectorAccessibleSerializable(batch, allocator);
    -      } else {
    -        vas = new VectorAccessibleSerializable(batch, sv2, allocator);
    -      }
    -      if (retain) {
    -        vas.writeToStreamAndRetain(stream);
    -      } else {
    -        vas.writeToStream(stream);
    +      return write(batch, sv2, false);
    +    }
    +
    +    public Writer write(WritableBatch batch, SelectionVector2 sv2, boolean retain) throws IOException {
    +      checkNotNull(batch);
    +      checkNotNull(channel);
    +      final Timer.Context timerContext = metrics.timer(WRITER_TIMER).time();
    +
    +      final DrillBuf[] incomingBuffers = batch.getBuffers();
    +      final UserBitShared.RecordBatchDef batchDef = batch.getDef();
    +
    +      try {
    +        /* Write the metadata to the file */
    +        batchDef.writeDelimitedTo(output);
    +
    +
    +        /* If we have a selection vector, dump it to file first */
    +        if (sv2 != null) {
    +          final int dataLength = sv2.getCount() * SelectionVector2.RECORD_SIZE;
    +          channel.write(sv2.getBuffer(false).nioBuffer(0, dataLength));
    +        }
    +
    +        /* Dump the array of ByteBuf's associated with the value vectors */
    +        for (DrillBuf buf : incomingBuffers) {
    +          /* dump the buffer into the OutputStream */
    +          channel.write(buf.nioBuffer());
    +        }
    +
    +        timeNs += timerContext.stop();
    +      } catch (IOException e) {
    +        throw new RuntimeException(e);
    +      } finally {
    +        if (!retain) {
    --- End diff --
    
    Frankly, the caller should decide what to do with the SV2 and the batch. Much simpler to just do:
    
    ```
    writer.write(batch, sv2);
    batch.clear();
    sv2.clear();
    ```
    
    Or, provide a `writeAndClear()` method if we think it is worth pulling those two lines into the write method.
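
    A hedged sketch of that convenience method, built on the `write()` and `clear()` calls already shown above (the method itself is hypothetical):

    ```
    // Hypothetical: pulls the two clear() calls into the write path so the
    // caller can opt into "write and release" in one step.
    public Writer writeAndClear(WritableBatch batch, SelectionVector2 sv2)
        throws IOException {
      try {
        return write(batch, sv2);
      } finally {
        batch.clear();
        if (sv2 != null) {
          sv2.clear();
        }
      }
    }
    ```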


---

[GitHub] drill pull request #1058: DRILL-6002: Avoid memory copy from direct buffer t...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1058#discussion_r158416355
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorSerializer.java ---
    @@ -62,27 +72,65 @@ public Writer write(VectorAccessible va) throws IOException {
     
         @SuppressWarnings("resource")
         public Writer write(VectorAccessible va, SelectionVector2 sv2) throws IOException {
    +      checkNotNull(va);
           WritableBatch batch = WritableBatch.getBatchNoHVWrap(
               va.getRecordCount(), va, sv2 != null);
           return write(batch, sv2);
         }
     
         public Writer write(WritableBatch batch, SelectionVector2 sv2) throws IOException {
    -      VectorAccessibleSerializable vas;
    -      if (sv2 == null) {
    -        vas = new VectorAccessibleSerializable(batch, allocator);
    -      } else {
    -        vas = new VectorAccessibleSerializable(batch, sv2, allocator);
    -      }
    -      if (retain) {
    -        vas.writeToStreamAndRetain(stream);
    -      } else {
    -        vas.writeToStream(stream);
    +      return write(batch, sv2, false);
    +    }
    +
    +    public Writer write(WritableBatch batch, SelectionVector2 sv2, boolean retain) throws IOException {
    +      checkNotNull(batch);
    +      checkNotNull(channel);
    +      final Timer.Context timerContext = metrics.timer(WRITER_TIMER).time();
    +
    +      final DrillBuf[] incomingBuffers = batch.getBuffers();
    +      final UserBitShared.RecordBatchDef batchDef = batch.getDef();
    +
    +      try {
    +        /* Write the metadata to the file */
    +        batchDef.writeDelimitedTo(output);
    +
    +
    +        /* If we have a selection vector, dump it to file first */
    +        if (sv2 != null) {
    +          final int dataLength = sv2.getCount() * SelectionVector2.RECORD_SIZE;
    +          channel.write(sv2.getBuffer(false).nioBuffer(0, dataLength));
    +        }
    +
    +        /* Dump the array of ByteBuf's associated with the value vectors */
    +        for (DrillBuf buf : incomingBuffers) {
    +          /* dump the buffer into the OutputStream */
    +          channel.write(buf.nioBuffer());
    --- End diff --
    
    I believe we're good, as `buf.nioBuffer()` correctly sets the ByteBuffer offset and length using `readerIndex()` and `readableBytes()`.
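
    For illustration, the Netty contract this relies on, shown with a self-contained snippet (plain `io.netty.buffer`, independent of Drill):

    ```
    import io.netty.buffer.ByteBuf;
    import io.netty.buffer.Unpooled;
    import java.nio.ByteBuffer;

    public class NioBufferDemo {
      public static void main(String[] args) {
        ByteBuf buf = Unpooled.buffer(1024); // capacity 1024, nothing written yet
        buf.writeBytes(new byte[100]);       // writerIndex = 100
        buf.readByte();                      // readerIndex = 1

        // nioBuffer() exposes only [readerIndex, writerIndex): 99 readable
        // bytes, not the full 1024-byte allocation.
        ByteBuffer nio = buf.nioBuffer();
        System.out.println(nio.remaining()); // prints 99
        buf.release();
      }
    }
    ```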


---

[GitHub] drill pull request #1058: DRILL-6002: Avoid memory copy from direct buffer t...

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1058#discussion_r158180527
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorSerializer.java ---
    @@ -62,27 +72,65 @@ public Writer write(VectorAccessible va) throws IOException {
     
         @SuppressWarnings("resource")
         public Writer write(VectorAccessible va, SelectionVector2 sv2) throws IOException {
    +      checkNotNull(va);
           WritableBatch batch = WritableBatch.getBatchNoHVWrap(
               va.getRecordCount(), va, sv2 != null);
           return write(batch, sv2);
         }
     
         public Writer write(WritableBatch batch, SelectionVector2 sv2) throws IOException {
    -      VectorAccessibleSerializable vas;
    -      if (sv2 == null) {
    -        vas = new VectorAccessibleSerializable(batch, allocator);
    -      } else {
    -        vas = new VectorAccessibleSerializable(batch, sv2, allocator);
    -      }
    -      if (retain) {
    -        vas.writeToStreamAndRetain(stream);
    -      } else {
    -        vas.writeToStream(stream);
    +      return write(batch, sv2, false);
    +    }
    +
    +    public Writer write(WritableBatch batch, SelectionVector2 sv2, boolean retain) throws IOException {
    +      checkNotNull(batch);
    +      checkNotNull(channel);
    +      final Timer.Context timerContext = metrics.timer(WRITER_TIMER).time();
    +
    +      final DrillBuf[] incomingBuffers = batch.getBuffers();
    +      final UserBitShared.RecordBatchDef batchDef = batch.getDef();
    +
    +      try {
    +        /* Write the metadata to the file */
    +        batchDef.writeDelimitedTo(output);
    +
    +
    +        /* If we have a selection vector, dump it to file first */
    +        if (sv2 != null) {
    +          final int dataLength = sv2.getCount() * SelectionVector2.RECORD_SIZE;
    +          channel.write(sv2.getBuffer(false).nioBuffer(0, dataLength));
    +        }
    +
    +        /* Dump the array of ByteBuf's associated with the value vectors */
    +        for (DrillBuf buf : incomingBuffers) {
    +          /* dump the buffer into the OutputStream */
    +          channel.write(buf.nioBuffer());
    --- End diff --
    
    I believe Drill uses the `writerIndex` of `AbstractByteBuf` to indicate the end of the written portion. So, we want to write that number of bytes. If that's what this code does, we're good.


---

[GitHub] drill pull request #1058: DRILL-6002: Avoid memory copy from direct buffer t...

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1058#discussion_r158180993
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorSerializer.java ---
    @@ -40,20 +52,18 @@
        */
     
       public static class Writer {
    +    static final MetricRegistry metrics = DrillMetrics.getRegistry();
    +    static final String WRITER_TIMER = MetricRegistry.name(VectorAccessibleSerializable.class, "writerTime");
     
    -    private final OutputStream stream;
    -    private final BufferAllocator allocator;
    -    private boolean retain;
    +    private final SpillSet spillSet;
    +    private final WritableByteChannel channel;
    +    private final OutputStream output;
         private long timeNs;
     
    -    public Writer(BufferAllocator allocator, OutputStream stream) {
    -      this.allocator = allocator;
    -      this.stream = stream;
    -    }
    -
    -    public Writer retain() {
    -      retain = true;
    -      return this;
    +    private Writer(SpillSet spillSet, String path) throws IOException {
    --- End diff --
    
    The `Writer` is a wrapper to make `VectorSerializer` easier to use. It is *presently* only used in spilling (that's the project that created it). But, the intention is for it to be general purpose. By binding it to `SpillSet`, it becomes the easy-to-use interface for spilling, but other code must use the clunkier prior interface.
    
    Not a big issue; just trying to pass along the design intent when the code was created...


---

[GitHub] drill pull request #1058: DRILL-6002: Avoid memory copy from direct buffer t...

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1058#discussion_r154477089
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorSerializer.java ---
    @@ -62,27 +72,65 @@ public Writer write(VectorAccessible va) throws IOException {
     
         @SuppressWarnings("resource")
         public Writer write(VectorAccessible va, SelectionVector2 sv2) throws IOException {
    +      checkNotNull(va);
           WritableBatch batch = WritableBatch.getBatchNoHVWrap(
               va.getRecordCount(), va, sv2 != null);
           return write(batch, sv2);
         }
     
         public Writer write(WritableBatch batch, SelectionVector2 sv2) throws IOException {
    -      VectorAccessibleSerializable vas;
    -      if (sv2 == null) {
    -        vas = new VectorAccessibleSerializable(batch, allocator);
    -      } else {
    -        vas = new VectorAccessibleSerializable(batch, sv2, allocator);
    -      }
    -      if (retain) {
    -        vas.writeToStreamAndRetain(stream);
    -      } else {
    -        vas.writeToStream(stream);
    +      return write(batch, sv2, false);
    +    }
    +
    +    public Writer write(WritableBatch batch, SelectionVector2 sv2, boolean retain) throws IOException {
    +      checkNotNull(batch);
    +      checkNotNull(channel);
    +      final Timer.Context timerContext = metrics.timer(WRITER_TIMER).time();
    +
    +      final DrillBuf[] incomingBuffers = batch.getBuffers();
    +      final UserBitShared.RecordBatchDef batchDef = batch.getDef();
    +
    +      try {
    +        /* Write the metadata to the file */
    +        batchDef.writeDelimitedTo(output);
    +
    +
    +        /* If we have a selection vector, dump it to file first */
    +        if (sv2 != null) {
    +          final int dataLength = sv2.getCount() * SelectionVector2.RECORD_SIZE;
    +          channel.write(sv2.getBuffer(false).nioBuffer(0, dataLength));
    +        }
    +
    +        /* Dump the array of ByteBuf's associated with the value vectors */
    +        for (DrillBuf buf : incomingBuffers) {
    +          /* dump the buffer into the OutputStream */
    +          channel.write(buf.nioBuffer());
    +        }
    +
    +        timeNs += timerContext.stop();
    +      } catch (IOException e) {
    +        throw new RuntimeException(e);
    +      } finally {
    +        if (!retain) {
    +          batch.clear();
    +          if (sv2 != null) {
    +            sv2.clear();
    +          }
    +        }
           }
    -      timeNs += vas.getTimeNs();
           return this;
         }
     
    +    public long close() throws IOException {
    +      if (!channel.isOpen()) {
    +        return 0;
    +      }
    +      long writeSize = spillSet.getPosition(channel);
    +      spillSet.tallyWriteBytes(writeSize);
    --- End diff --
    
    This seems awkward. Before, this class was a utility that wrapped an existing write mechanism. Here, this class basically becomes the writer itself -- a fancy file writer that manages its own channel. That seems like a good evolution.
    
    But, it seems that updating spill set metrics is not needed here. Instead, the caller of this close method can update the spill set metrics based on the total write bytes for this stream. That is, this writer can do the writing, manage the channel, and accumulate its own write bytes. The caller decides what to do with that info (add it to the spill set's metrics).
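
    In code, the decoupled version would amount to something like this at the call site (both calls already appear in this PR's diff; only their placement changes):

    ```
    // Writer reports its own byte count; the caller owns the metrics update.
    long writeSize = writer.close();
    spillSet.tallyWriteBytes(writeSize);
    ```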


---

[GitHub] drill pull request #1058: DRILL-6002: Avoid memory copy from direct buffer t...

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1058#discussion_r154477815
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java ---
    @@ -419,6 +462,13 @@ public SpillSet(DrillConfig config, FragmentHandle handle, PhysicalOperator popC
             operName, handle.getMajorFragmentId(), popConfig.getOperatorId(), handle.getMinorFragmentId());
       }
     
    +  @VisibleForTesting
    +  public SpillSet(String baseDir) {
    --- End diff --
    
    We're trying to use the Hadoop `Path` class as our preferred way to pass file and directory names around.


---

[GitHub] drill pull request #1058: DRILL-6002: Avoid memory copy from direct buffer t...

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1058#discussion_r154478390
  
    --- Diff: exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestBatchSerialization.java ---
    @@ -117,11 +117,12 @@ public void testNullableType(MinorType type) throws IOException {
       private void verifySerialize(SingleRowSet rowSet, SingleRowSet expected) throws IOException {
     
         File dir = OperatorFixture.getTempDir("serial");
    -    File outFile = new File(dir, "serialze.dat");
    -    try (OutputStream out = new BufferedOutputStream(new FileOutputStream(outFile))) {
    -      VectorSerializer.writer(fixture.allocator(), out)
    -        .write(rowSet.container(), rowSet.getSv2());
    -    }
    +    VectorSerializer.writer(new SpillSet(dir.getAbsolutePath()),  "serialize.dat")
    --- End diff --
    
    The `File dir` line should change to use Tim's new directory watcher feature. 


---

[GitHub] drill pull request #1058: DRILL-6002: Avoid memory copy from direct buffer t...

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1058#discussion_r154476694
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorSerializer.java ---
    @@ -62,27 +72,65 @@ public Writer write(VectorAccessible va) throws IOException {
     
         @SuppressWarnings("resource")
         public Writer write(VectorAccessible va, SelectionVector2 sv2) throws IOException {
    +      checkNotNull(va);
           WritableBatch batch = WritableBatch.getBatchNoHVWrap(
               va.getRecordCount(), va, sv2 != null);
           return write(batch, sv2);
         }
     
         public Writer write(WritableBatch batch, SelectionVector2 sv2) throws IOException {
    -      VectorAccessibleSerializable vas;
    -      if (sv2 == null) {
    -        vas = new VectorAccessibleSerializable(batch, allocator);
    -      } else {
    -        vas = new VectorAccessibleSerializable(batch, sv2, allocator);
    -      }
    -      if (retain) {
    -        vas.writeToStreamAndRetain(stream);
    -      } else {
    -        vas.writeToStream(stream);
    +      return write(batch, sv2, false);
    +    }
    +
    +    public Writer write(WritableBatch batch, SelectionVector2 sv2, boolean retain) throws IOException {
    --- End diff --
    
    Retain is never actually used. That's why, in the earlier version, it was a flag that was not passed into the write method. Further, retain is not likely to be decided batch-by-batch; rather, it is a general policy set at the writer level. So, if we don't like the `retain()` method, perhaps pass this option into the constructor rather than into each `write()` call.


---

[GitHub] drill pull request #1058: DRILL-6002: Avoid memory copy from direct buffer t...

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1058#discussion_r154477573
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java ---
    @@ -331,6 +337,43 @@ public long getReadBytes(InputStream inputStream) {
         }
       }
     
    +  private static class WritableByteChannelImpl implements WritableByteChannel
    +  {
    +    private static final int TRANSFER_SIZE = 32 * 1024;
    +
    +    private OutputStream out;
    +    private final byte buffer[] = new byte[TRANSFER_SIZE];
    --- End diff --
    
    Unfortunately, this is one of the problems that the current design was intended to solve. As we can see in the hash agg code, multiple of these objects are active in memory at any one time, but no more than one is ever writing. By putting the buffer here, we create one buffer per retained channel (potentially thousands). But, by putting the buffer on the allocator, we hold one buffer per operator.
    
    Being the guy who wrote the current allocator code, I realize it is clunky. But, it is clunky because we favored memory frugality over elegance. (If there is a cleaner way to have one buffer per thread, I'm all for it!)
    
    Isn't the purpose of this change to avoid extra memory copies and buffers? It seems, for the above reasons, we're moving backwards...
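
    For what it's worth, a "one buffer per thread" arrangement could be sketched with a plain `ThreadLocal` (an assumption about a possible design, not how `SpillSet` works today):

    ```
    import java.io.IOException;
    import java.io.OutputStream;
    import java.nio.ByteBuffer;
    import java.nio.channels.WritableByteChannel;

    /** Sketch: share one transfer buffer per thread across all open channels. */
    final class SharedBufferChannel implements WritableByteChannel {
      private static final int TRANSFER_SIZE = 32 * 1024;

      // One 32K buffer per thread, no matter how many channels are retained.
      private static final ThreadLocal<byte[]> TRANSFER_BUFFER =
          ThreadLocal.withInitial(() -> new byte[TRANSFER_SIZE]);

      private final OutputStream out;
      private boolean open = true;

      SharedBufferChannel(OutputStream out) { this.out = out; }

      @Override
      public int write(ByteBuffer src) throws IOException {
        byte[] buffer = TRANSFER_BUFFER.get();
        int total = src.remaining();
        while (src.hasRemaining()) {
          int chunk = Math.min(src.remaining(), buffer.length);
          src.get(buffer, 0, chunk); // copy out of the (direct) buffer in chunks
          out.write(buffer, 0, chunk);
        }
        return total;
      }

      @Override
      public boolean isOpen() { return open; }

      @Override
      public void close() throws IOException { open = false; out.close(); }
    }
    ```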


---

[GitHub] drill pull request #1058: DRILL-6002: Avoid memory copy from direct buffer t...

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1058#discussion_r154477971
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BatchGroup.java ---
    @@ -280,17 +278,14 @@ private void closeInputStream() throws IOException {
           logger.trace("Summary: Read {} bytes from {}", readLength, path);
         }
     
    -    public long closeOutputStream() throws IOException {
    -      if (outputStream == null) {
    -        return 0;
    +    public void closeWriter() throws IOException {
    +      if (writer == null) {
    +        return;
           }
    -      long writeSize = spillSet.getPosition(outputStream);
    -      spillSet.tallyWriteBytes(writeSize);
    -      outputStream.close();
    -      outputStream = null;
    +      long writeSize = writer.close();
    +      long timeNs = writer.timeNs();
    --- End diff --
    
    See comment above. Updating the spill set here decouples the writer from the spill set. The `BatchGroup` already knows about the spill set.


---

[GitHub] drill pull request #1058: DRILL-6002: Avoid memory copy from direct buffer t...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1058#discussion_r154499154
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorSerializer.java ---
    @@ -62,27 +72,65 @@ public Writer write(VectorAccessible va) throws IOException {
     
         @SuppressWarnings("resource")
         public Writer write(VectorAccessible va, SelectionVector2 sv2) throws IOException {
    +      checkNotNull(va);
           WritableBatch batch = WritableBatch.getBatchNoHVWrap(
               va.getRecordCount(), va, sv2 != null);
           return write(batch, sv2);
         }
     
         public Writer write(WritableBatch batch, SelectionVector2 sv2) throws IOException {
    -      VectorAccessibleSerializable vas;
    -      if (sv2 == null) {
    -        vas = new VectorAccessibleSerializable(batch, allocator);
    -      } else {
    -        vas = new VectorAccessibleSerializable(batch, sv2, allocator);
    -      }
    -      if (retain) {
    -        vas.writeToStreamAndRetain(stream);
    -      } else {
    -        vas.writeToStream(stream);
    +      return write(batch, sv2, false);
    +    }
    +
    +    public Writer write(WritableBatch batch, SelectionVector2 sv2, boolean retain) throws IOException {
    +      checkNotNull(batch);
    +      checkNotNull(channel);
    +      final Timer.Context timerContext = metrics.timer(WRITER_TIMER).time();
    +
    +      final DrillBuf[] incomingBuffers = batch.getBuffers();
    +      final UserBitShared.RecordBatchDef batchDef = batch.getDef();
    +
    +      try {
    +        /* Write the metadata to the file */
    +        batchDef.writeDelimitedTo(output);
    +
    +
    +        /* If we have a selection vector, dump it to file first */
    +        if (sv2 != null) {
    +          final int dataLength = sv2.getCount() * SelectionVector2.RECORD_SIZE;
    +          channel.write(sv2.getBuffer(false).nioBuffer(0, dataLength));
    +        }
    +
    +        /* Dump the array of ByteBuf's associated with the value vectors */
    +        for (DrillBuf buf : incomingBuffers) {
    +          /* dump the buffer into the OutputStream */
    +          channel.write(buf.nioBuffer());
    --- End diff --
    
    I agree that we should only write the necessary payload and avoid spilling unused buffers. Note that `channel.write()` writes only the bytes available in the `ByteBuffer` (`ByteBuffer.remaining()`), not the whole allocated buffer. Do you mean that there are bytes written to the buffer that should not be spilled? In that case, I'd suggest limiting the scope of this PR to using `WritableByteChannel` to avoid the memory copy from off-heap during spill to local files, and handling the extra bytes in a separate JIRA/PR.


---

[GitHub] drill issue #1058: DRILL-6002: Avoid memory copy from direct buffer to heap ...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on the issue:

    https://github.com/apache/drill/pull/1058
  
    @paul-rogers Please review


---

[GitHub] drill pull request #1058: DRILL-6002: Avoid memory copy from direct buffer t...

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1058#discussion_r154476440
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorSerializer.java ---
    @@ -40,20 +52,18 @@
        */
     
       public static class Writer {
    +    static final MetricRegistry metrics = DrillMetrics.getRegistry();
    +    static final String WRITER_TIMER = MetricRegistry.name(VectorAccessibleSerializable.class, "writerTime");
     
    -    private final OutputStream stream;
    -    private final BufferAllocator allocator;
    -    private boolean retain;
    +    private final SpillSet spillSet;
    +    private final WritableByteChannel channel;
    +    private final OutputStream output;
         private long timeNs;
     
    -    public Writer(BufferAllocator allocator, OutputStream stream) {
    -      this.allocator = allocator;
    -      this.stream = stream;
    -    }
    -
    -    public Writer retain() {
    -      retain = true;
    -      return this;
    +    private Writer(SpillSet spillSet, String path) throws IOException {
    --- End diff --
    
    Before, this class was independent of the `SpillSet`. Now, it can be used only with that class. Not sure if this is an improvement.
    
    The only thing it seems we need from `SpillSet` is to update the write byte count. Perhaps define an interface here with just the needed methods. Then, let `SpillSet` implement that interface. That gets what you need without introducing extra dependencies.
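
    Something like this minimal interface would capture the idea (interface and method names are hypothetical; only `tallyWriteBytes` exists on `SpillSet` in this PR):

    ```
    import java.io.IOException;
    import java.nio.channels.WritableByteChannel;

    /** Hypothetical: only what Writer actually needs from SpillSet. */
    public interface SpillTarget {
      /** Open a channel for the given spill path. */
      WritableByteChannel openForOutput(String path) throws IOException;

      /** Record bytes written, as SpillSet.tallyWriteBytes() does. */
      void tallyWriteBytes(long bytes);
    }
    ```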


---

[GitHub] drill pull request #1058: DRILL-6002: Avoid memory copy from direct buffer t...

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1058#discussion_r160841030
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java ---
    @@ -107,7 +107,7 @@
          * nodes provide insufficient local disk space)
          */
     
    -    private static final int TRANSFER_SIZE = 32 * 1024;
    +    private static final int TRANSFER_SIZE = 1024 * 1024;
    --- End diff --
    
    Is a 1MB buffer excessive? The point of a buffer is to ensure we write in units of a disk block. For the local file system, experience showed no gain after 32K. In the MapR FS, each write is in units of 1 MB. Does Hadoop have a preferred size?
    
    Given this variation, if we need large buffers, should we choose a buffer size based on the underlying file system? For example, is there a preferred size for S3?
    
    32K didn't seem large enough to worry about, even if we had 1000 fragments busily spilling. But 1MB? 1000 * 1 MB = 1GB, which starts becoming significant, especially in light of our efforts to reduce heap usage. Should we worry?


---

[GitHub] drill pull request #1058: DRILL-6002: Avoid memory copy from direct buffer t...

Posted by vrozov <gi...@git.apache.org>.
Github user vrozov commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1058#discussion_r161842736
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java ---
    @@ -104,9 +107,17 @@
          * nodes provide insufficient local disk space)
          */
     
    +    // The buffer size is calculated as LCM of the Hadoop internal checksum buffer (9 * checksum length), where
    --- End diff --
    
    @paul-rogers Changed `TRANSFER_SIZE` to 72K. It is calculated as the LCM of the Hadoop internal checksum buffer size and the MapR FS page size, so that writes are aligned on internal buffer boundaries.
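
    For reference, the arithmetic behind 72K, assuming Hadoop's default 512-byte checksum chunk and an 8K MapR FS page size (both values are assumptions here):

    ```
    import java.math.BigInteger;

    public class TransferSizeCalc {
      public static void main(String[] args) {
        int checksumChunk = 512;              // assumed Hadoop bytes-per-checksum
        int hadoopBuffer = 9 * checksumChunk; // 4608: internal checksum buffer
        int maprPageSize = 8 * 1024;          // assumed MapR FS page size

        int gcd = BigInteger.valueOf(hadoopBuffer)
            .gcd(BigInteger.valueOf(maprPageSize)).intValue();
        int lcm = hadoopBuffer / gcd * maprPageSize;
        System.out.println(lcm + " bytes = " + lcm / 1024 + "K"); // 73728 = 72K
      }
    }
    ```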


---

[GitHub] drill pull request #1058: DRILL-6002: Avoid memory copy from direct buffer t...

Posted by paul-rogers <gi...@git.apache.org>.
Github user paul-rogers commented on a diff in the pull request:

    https://github.com/apache/drill/pull/1058#discussion_r154475350
  
    --- Diff: exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorSerializer.java ---
    @@ -62,27 +72,65 @@ public Writer write(VectorAccessible va) throws IOException {
     
         @SuppressWarnings("resource")
         public Writer write(VectorAccessible va, SelectionVector2 sv2) throws IOException {
    +      checkNotNull(va);
           WritableBatch batch = WritableBatch.getBatchNoHVWrap(
               va.getRecordCount(), va, sv2 != null);
           return write(batch, sv2);
         }
     
         public Writer write(WritableBatch batch, SelectionVector2 sv2) throws IOException {
    -      VectorAccessibleSerializable vas;
    -      if (sv2 == null) {
    -        vas = new VectorAccessibleSerializable(batch, allocator);
    -      } else {
    -        vas = new VectorAccessibleSerializable(batch, sv2, allocator);
    -      }
    -      if (retain) {
    -        vas.writeToStreamAndRetain(stream);
    -      } else {
    -        vas.writeToStream(stream);
    +      return write(batch, sv2, false);
    +    }
    +
    +    public Writer write(WritableBatch batch, SelectionVector2 sv2, boolean retain) throws IOException {
    +      checkNotNull(batch);
    +      checkNotNull(channel);
    +      final Timer.Context timerContext = metrics.timer(WRITER_TIMER).time();
    +
    +      final DrillBuf[] incomingBuffers = batch.getBuffers();
    +      final UserBitShared.RecordBatchDef batchDef = batch.getDef();
    +
    +      try {
    +        /* Write the metadata to the file */
    +        batchDef.writeDelimitedTo(output);
    +
    +
    +        /* If we have a selection vector, dump it to file first */
    +        if (sv2 != null) {
    +          final int dataLength = sv2.getCount() * SelectionVector2.RECORD_SIZE;
    +          channel.write(sv2.getBuffer(false).nioBuffer(0, dataLength));
    +        }
    +
    +        /* Dump the array of ByteBuf's associated with the value vectors */
    +        for (DrillBuf buf : incomingBuffers) {
    +          /* dump the buffer into the OutputStream */
    +          channel.write(buf.nioBuffer());
    --- End diff --
    
    While this technically works, it is a bit inefficient. Above, we write only the portion of the SV2 buffer that is actually used. But, here we grab the buffers generically. Unless the write positions were set correctly for each buffer, we don't know how much of the buffer contains data. We may be writing the whole buffer, including unused internally-fragmented space.
    
    If the write position is used to limit the write size, then the buffer write position should work just as well for the SV2, so we would not need the manual computation of used buffer size.
    
    Perhaps the old code worked this way. But, I wonder if, since we are looking for greater performance, we should handle the issue here. (At present, Parquet produces batches with, say, 5% data and 95% unused. Do we want to spill all that space?)
    
    Plus, since we spill the extra data and do power-of-two rounding on read, this probably explains why the sort must allow 2x the amount of memory for a deserialized spill batch compared to what seems necessary from the sum of the data blocks.


---

[GitHub] drill pull request #1058: DRILL-6002: Avoid memory copy from direct buffer t...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/drill/pull/1058


---