You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to commits@drill.apache.org by am...@apache.org on 2018/01/24 18:35:20 UTC

[06/11] drill git commit: DRILL-6002: Avoid memory copy from direct buffer to heap while spilling to local disk

DRILL-6002: Avoid memory copy from direct buffer to heap while spilling to local disk

close apache/drill#1058


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/d803f0c2
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/d803f0c2
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/d803f0c2

Branch: refs/heads/master
Commit: d803f0c2188c679de3dacf10741005b217425a33
Parents: 2420b35
Author: Vlad Rozov <vr...@apache.org>
Authored: Wed Nov 22 14:06:13 2017 -0800
Committer: Aman Sinha <as...@maprtech.com>
Committed: Tue Jan 23 17:45:50 2018 -0800

----------------------------------------------------------------------
 .../drill/exec/cache/VectorSerializer.java      | 107 +++++++++++++------
 .../impl/aggregate/HashAggTemplate.java         |  38 +++----
 .../exec/physical/impl/spill/SpillSet.java      |  99 +++++++++++++----
 .../physical/impl/xsort/managed/BatchGroup.java |  28 ++---
 .../impl/xsort/managed/SpilledRuns.java         |   2 +-
 .../exec/cache/TestBatchSerialization.java      |  34 ++++--
 6 files changed, 210 insertions(+), 98 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/d803f0c2/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorSerializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorSerializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorSerializer.java
index eeef9e5..03ea11e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorSerializer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorSerializer.java
@@ -17,16 +17,30 @@
  */
 package org.apache.drill.exec.cache;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.metrics.DrillMetrics;
+import org.apache.drill.exec.proto.UserBitShared;
 import org.apache.drill.exec.record.VectorAccessible;
 import org.apache.drill.exec.record.VectorContainer;
 import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
+
+import io.netty.buffer.DrillBuf;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
 /**
  * Serializes vector containers to an output stream or from
  * an input stream.
@@ -39,51 +53,84 @@ public class VectorSerializer {
    * objects to an output stream.
    */
 
-  public static class Writer {
+  public static class Writer implements Closeable
+  {
+    static final MetricRegistry metrics = DrillMetrics.getRegistry();
+    static final String WRITER_TIMER = MetricRegistry.name(VectorAccessibleSerializable.class, "writerTime");

-    private final OutputStream stream;
-    private final BufferAllocator allocator;
-    private boolean retain;
+    private final WritableByteChannel channel;
+    private final OutputStream output;
     private long timeNs;
+    private long bytesWritten; // running total over all batches; long because a spill file can exceed 2 GB

-    public Writer(BufferAllocator allocator, OutputStream stream) {
-      this.allocator = allocator;
-      this.stream = stream;
-    }
-
-    public Writer retain() {
-      retain = true;
-      return this;
+    private Writer(WritableByteChannel channel) {
+      this.channel = channel;
+      output = Channels.newOutputStream(channel);
     }

-    public Writer write(VectorAccessible va) throws IOException {
+    public int write(VectorAccessible va) throws IOException {
       return write(va, null);
     }

     @SuppressWarnings("resource")
-    public Writer write(VectorAccessible va, SelectionVector2 sv2) throws IOException {
+    public int write(VectorAccessible va, SelectionVector2 sv2) throws IOException {
+      checkNotNull(va);
       WritableBatch batch = WritableBatch.getBatchNoHVWrap(
           va.getRecordCount(), va, sv2 != null);
-      return write(batch, sv2);
+      try {
+        return write(batch, sv2);
+      } finally {
+        batch.clear();
+      }
     }

-    public Writer write(WritableBatch batch, SelectionVector2 sv2) throws IOException {
-      VectorAccessibleSerializable vas;
-      if (sv2 == null) {
-        vas = new VectorAccessibleSerializable(batch, allocator);
-      } else {
-        vas = new VectorAccessibleSerializable(batch, sv2, allocator);
+    public int write(WritableBatch batch, SelectionVector2 sv2) throws IOException {
+      checkNotNull(batch);
+      checkNotNull(channel);
+      final Timer.Context timerContext = metrics.timer(WRITER_TIMER).time();
+
+      final DrillBuf[] incomingBuffers = batch.getBuffers();
+      final UserBitShared.RecordBatchDef batchDef = batch.getDef();
+      int batchBytes = batchDef.getSerializedSize(); // the varint size prefix added by writeDelimitedTo is not counted
+
+      /* Write the metadata to the file */
+      batchDef.writeDelimitedTo(output);
+
+      /* If we have a selection vector, dump it to file first */
+      if (sv2 != null) {
+        final int dataLength = sv2.getCount() * SelectionVector2.RECORD_SIZE;
+        ByteBuffer buffer = sv2.getBuffer(false).nioBuffer(0, dataLength);
+        while (buffer.remaining() > 0) {
+          batchBytes += channel.write(buffer);
+        }
       }
-      if (retain) {
-        vas.writeToStreamAndRetain(stream);
-      } else {
-        vas.writeToStream(stream);
+
+      /* Dump the array of ByteBuf's associated with the value vectors */
+      for (DrillBuf buf : incomingBuffers) {
+        /* dump the buffer into the channel */
+        ByteBuffer buffer = buf.nioBuffer();
+        while (buffer.remaining() > 0) {
+          batchBytes += channel.write(buffer);
+        }
       }
-      timeNs += vas.getTimeNs();
-      return this;
+      bytesWritten += batchBytes; // accumulate: one writer is reused for every batch of a spill file
+      timeNs += timerContext.stop();
+      return batchBytes; // bytes for this batch only; getBytesWritten() holds the running total
     }

-    public long timeNs() { return timeNs; }
+    @Override
+    public void close() throws IOException {
+      if (!channel.isOpen()) {
+        return;
+      }
+      channel.close();
+    }
+    /** Returns the cumulative time spent in write() calls, converted to the given unit. */
+    public long time(TimeUnit unit) {
+      return unit.convert(timeNs, TimeUnit.NANOSECONDS);
+    }
+    /** Returns the total number of bytes written by all write() calls on this writer so far. */
+    public long getBytesWritten() { return bytesWritten; }
   }
 
   /**
@@ -111,8 +158,8 @@ public class VectorSerializer {
     public long timeNs() { return timeNs; }
   }
 
-  public static Writer writer(BufferAllocator allocator, OutputStream stream) {
-    return new Writer(allocator, stream);
+  public static Writer writer(WritableByteChannel channel) throws IOException {
+    return new Writer(channel);
   }
 
   public static Reader reader(BufferAllocator allocator, InputStream stream) {

http://git-wip-us.apache.org/repos/asf/drill/blob/d803f0c2/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index 2f181fe..89ba59b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -18,7 +18,6 @@
 package org.apache.drill.exec.physical.impl.aggregate;
 
 import java.io.IOException;
-import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
@@ -26,7 +25,6 @@ import java.util.concurrent.TimeUnit;
 
 import javax.inject.Named;
 
-import com.google.common.base.Stopwatch;
 import org.apache.drill.common.exceptions.RetryAfterSpillException;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.ExpressionPosition;
@@ -34,7 +32,7 @@ import org.apache.drill.common.expression.FieldReference;
 import org.apache.drill.common.expression.LogicalExpression;
 
 import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.cache.VectorAccessibleSerializable;
+import org.apache.drill.exec.cache.VectorSerializer.Writer;
 import org.apache.drill.exec.compile.sig.RuntimeOverridden;
 import org.apache.drill.exec.exception.ClassTransformationException;
 import org.apache.drill.exec.exception.OutOfMemoryException;
@@ -149,7 +147,7 @@ public abstract class HashAggTemplate implements HashAggregator {
   // For handling spilling
   private SpillSet spillSet;
   SpilledRecordbatch newIncoming; // when reading a spilled file - work like an "incoming"
-  private OutputStream outputStream[]; // an output stream for each spilled partition
+  private Writer writers[]; // a vector writer for each spilled partition
   private int spilledBatchesCount[]; // count number of batches spilled, in each partition
   private String spillFiles[];
   private int cycleNum = 0; // primary, secondary, tertiary, etc.
@@ -454,7 +452,7 @@ public abstract class HashAggTemplate implements HashAggregator {
     htables = new HashTable[numPartitions] ;
     batchHolders = (ArrayList<BatchHolder>[]) new ArrayList<?>[numPartitions] ;
     outBatchIndex = new int[numPartitions] ;
-    outputStream = new OutputStream[numPartitions];
+    writers = new Writer[numPartitions];
     spilledBatchesCount = new int[numPartitions];
     spillFiles = new String[numPartitions];
     spilledPartitionsList = new ArrayList<SpilledPartition>();
@@ -504,7 +502,7 @@ public abstract class HashAggTemplate implements HashAggregator {
         batchHolders[i] = new ArrayList<BatchHolder>();
       }
       outBatchIndex[i] = 0;
-      outputStream[i] = null;
+      writers[i] = null;
       spilledBatchesCount[i] = 0;
       spillFiles[i] = null;
     }
@@ -792,14 +790,14 @@ public abstract class HashAggTemplate implements HashAggregator {
           }
 
           // delete any (still active) output spill file
-          if ( outputStream[i] != null && spillFiles[i] != null) {
+          if ( writers[i] != null && spillFiles[i] != null) {
             try {
-              outputStream[i].close();
-              outputStream[i] = null;
+              spillSet.close(writers[i]);
+              writers[i] = null;
               spillSet.delete(spillFiles[i]);
               spillFiles[i] = null;
             } catch(IOException e) {
-              logger.warn("Cleanup: Failed to delete spill file {}",spillFiles[i]);
+              logger.warn("Cleanup: Failed to delete spill file {}", spillFiles[i], e);
             }
           }
     }
@@ -854,7 +852,7 @@ public abstract class HashAggTemplate implements HashAggregator {
   }
 
   private boolean isSpilled(int part) {
-    return outputStream[part] != null;
+    return writers[part] != null;
   }
   /**
    * Which partition to choose for flushing out (i.e. spill or return) ?
@@ -932,7 +930,7 @@ public abstract class HashAggTemplate implements HashAggregator {
       spillFiles[part] = spillSet.getNextSpillFile(cycleNum > 0 ? Integer.toString(cycleNum) : null);
 
       try {
-        outputStream[part] = spillSet.openForOutput(spillFiles[part]);
+        writers[part] = spillSet.writer(spillFiles[part]);
       } catch (IOException ioe) {
         throw UserException.resourceError(ioe)
             .message("Hash Aggregation failed to open spill file: " + spillFiles[part])
@@ -975,17 +973,20 @@ public abstract class HashAggTemplate implements HashAggregator {

       outContainer.setRecordCount(numPendingOutput);
       WritableBatch batch = WritableBatch.getBatchNoHVWrap(numPendingOutput, outContainer, false);
-      VectorAccessibleSerializable outputBatch = new VectorAccessibleSerializable(batch, allocator);
-      Stopwatch watch = Stopwatch.createStarted();
+      // The writer's time is cumulative over the partition; snapshot it to log this batch only.
+      final long spillStartNs = writers[part].time(TimeUnit.NANOSECONDS);
       try {
-        outputBatch.writeToStream(outputStream[part]);
+        writers[part].write(batch, null);
       } catch (IOException ioe) {
         throw UserException.dataWriteError(ioe)
-            .message("Hash Aggregation failed to write to output stream: " + outputStream[part].toString())
+            .message("Hash Aggregation failed to write to output file: " + spillFiles[part])
             .build(logger);
+      } finally {
+        batch.clear();
       }
       outContainer.zeroVectors();
-      logger.trace("HASH AGG: Took {} us to spill {} records", watch.elapsed(TimeUnit.MICROSECONDS), numPendingOutput);
+      logger.trace("HASH AGG: Took {} us to spill {} records",
+          TimeUnit.NANOSECONDS.toMicros(writers[part].time(TimeUnit.NANOSECONDS) - spillStartNs), numPendingOutput);
     }

     spilledBatchesCount[part] += currPartition.size(); // update count of spilled batches
@@ -1048,16 +1049,14 @@ public abstract class HashAggTemplate implements HashAggregator {
           spilledPartitionsList.add(sp);
 
           reinitPartition(nextPartitionToReturn); // free the memory
-          long posn = spillSet.getPosition(outputStream[nextPartitionToReturn]);
-          spillSet.tallyWriteBytes(posn); // for the IO stats
           try {
-            outputStream[nextPartitionToReturn].close();
+            spillSet.close(writers[nextPartitionToReturn]);
           } catch (IOException ioe) {
             throw UserException.resourceError(ioe)
                 .message("IO Error while closing output stream")
                 .build(logger);
           }
-          outputStream[nextPartitionToReturn] = null;
+          writers[nextPartitionToReturn] = null;
         }
         else {
           currPartition = batchHolders[nextPartitionToReturn];

http://git-wip-us.apache.org/repos/asf/drill/blob/d803f0c2/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java
index 9a6420a..2f9ab14 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java
@@ -18,13 +18,15 @@
 package org.apache.drill.exec.physical.impl.spill;
 
 import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.StandardOpenOption;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
@@ -32,6 +34,7 @@ import java.util.Set;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.cache.VectorSerializer;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.physical.config.HashAggregate;
@@ -65,7 +68,7 @@ public class SpillSet {
 
     void deleteOnExit(String fragmentSpillDir) throws IOException;
 
-    OutputStream createForWrite(String fileName) throws IOException;
+    WritableByteChannel createForWrite(String fileName) throws IOException;
 
     InputStream openForInput(String fileName) throws IOException;
 
@@ -77,10 +80,10 @@ public class SpillSet {
      * Given a manager-specific output stream, return the current write position.
      * Used to report total write bytes.
      *
-     * @param outputStream output stream created by the file manager
+     * @param channel created by the file manager
      * @return
      */
-    long getWriteBytes(OutputStream outputStream);
+    long getWriteBytes(WritableByteChannel channel);
 
     /**
      * Given a manager-specific input stream, return the current read position.
@@ -104,9 +107,17 @@ public class SpillSet {
      * nodes provide insufficient local disk space)
      */
 
+    // The buffer size is calculated as LCM of the Hadoop internal checksum buffer (9 * checksum length), where
+    // checksum length is 512 by default, and MapRFS page size that equals to 8 * 1024. The length of the transfer
+    // buffer does not affect performance of the write to hdfs or maprfs significantly once buffer length is more
+    // than 32 bytes.
+    private static final int TRANSFER_SIZE = 9 * 8 * 1024;
+
+    private final byte buffer[];
     private FileSystem fs;
 
     protected HadoopFileManager(String fsName) {
+      buffer = new byte[TRANSFER_SIZE];
       Configuration conf = new Configuration();
       conf.set(FileSystem.FS_DEFAULT_NAME_KEY, fsName);
       try {
@@ -124,8 +135,8 @@ public class SpillSet {
     }
 
     @Override
-    public OutputStream createForWrite(String fileName) throws IOException {
-      return fs.create(new Path(fileName));
+    public WritableByteChannel createForWrite(String fileName) throws IOException {
+      return new WritableByteChannelImpl(buffer, fs.create(new Path(fileName)));
     }
 
     @Override
@@ -152,10 +163,10 @@ public class SpillSet {
     }
 
     @Override
-    public long getWriteBytes(OutputStream outputStream) {
+    public long getWriteBytes(WritableByteChannel channel) {
       try {
-        return ((FSDataOutputStream) outputStream).getPos();
-      } catch (IOException e) {
+        return ((FSDataOutputStream)((WritableByteChannelImpl)channel).out).getPos();
+      } catch (Exception e) {
         // Just used for logging, not worth dealing with the exception.
         return 0;
       }
@@ -295,10 +306,8 @@ public class SpillSet {
 
     @SuppressWarnings("resource")
     @Override
-    public OutputStream createForWrite(String fileName) throws IOException {
-      return new CountingOutputStream(
-                new BufferedOutputStream(
-                    new FileOutputStream(new File(baseDir, fileName))));
+    public WritableByteChannel createForWrite(String fileName) throws IOException { // truncates an existing file, as FileOutputStream did
+      return FileChannel.open(new File(baseDir, fileName).toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.TRUNCATE_EXISTING);
     }
 
     @SuppressWarnings("resource")
@@ -321,8 +330,13 @@ public class SpillSet {
     }
 
     @Override
-    public long getWriteBytes(OutputStream outputStream) {
-      return ((CountingOutputStream) outputStream).getCount();
+    public long getWriteBytes(WritableByteChannel channel)
+    {
+      try {
+        return ((FileChannel)channel).position();
+      } catch (Exception e) {
+        return 0;
+      }
     }
 
     @Override
@@ -331,6 +345,44 @@ public class SpillSet {
     }
   }
 
+  // Adapts an OutputStream to a WritableByteChannel by copying through a heap transfer buffer.
+  private static class WritableByteChannelImpl implements WritableByteChannel {
+    private final byte buffer[]; // transfer buffer shared per file manager; guarded by synchronized (buffer)
+    private OutputStream out;    // null once the channel has been closed
+
+    WritableByteChannelImpl(byte[] buffer, OutputStream out) {
+      this.buffer = buffer;
+      this.out = out;
+    }
+
+    @Override
+    public int write(ByteBuffer src) throws IOException {
+      int remaining = src.remaining();
+      int totalWritten = 0;
+      synchronized (buffer) {
+        for (int posn = 0; posn < remaining; posn += buffer.length) {
+          int len = Math.min(buffer.length, remaining - posn);
+          src.get(buffer, 0, len);
+          out.write(buffer, 0, len);
+          totalWritten += len;
+        }
+      }
+      return totalWritten;
+    }
+
+    @Override
+    public boolean isOpen() {
+      return out != null;
+    }
+
+    @Override
+    public void close() throws IOException {
+      if (out != null) { // close() on an already closed channel must be a no-op
+        try { out.close(); } finally { out = null; }
+      }
+    }
+  }
+
   private final Iterator<String> dirs;
 
   /**
@@ -457,7 +509,7 @@ public class SpillSet {
     return fileManager.openForInput(fileName);
   }
 
-  public OutputStream openForOutput(String fileName) throws IOException {
+  public WritableByteChannel openForOutput(String fileName) throws IOException {
     return fileManager.createForWrite(fileName);
   }
 
@@ -484,8 +536,8 @@ public class SpillSet {
     return fileManager.getReadBytes(inputStream);
   }
 
-  public long getPosition(OutputStream outputStream) {
-    return fileManager.getWriteBytes(outputStream);
+  public long getPosition(WritableByteChannel channel) {
+    return fileManager.getWriteBytes(channel);
   }
 
   public void tallyReadBytes(long readLength) {
@@ -495,4 +547,13 @@ public class SpillSet {
   public void tallyWriteBytes(long writeLength) {
     writeBytes += writeLength;
   }
+
+  public VectorSerializer.Writer writer(String fileName) throws IOException {
+    return VectorSerializer.writer(openForOutput(fileName));
+  }
+
+  public void close(VectorSerializer.Writer writer) throws IOException {
+    tallyWriteBytes(writer.getBytesWritten());
+    writer.close();
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/d803f0c2/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BatchGroup.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BatchGroup.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BatchGroup.java
index d902e0d..bd2e368 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BatchGroup.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BatchGroup.java
@@ -19,7 +19,6 @@ package org.apache.drill.exec.physical.impl.xsort.managed;
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.OutputStream;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.concurrent.TimeUnit;
@@ -27,6 +26,7 @@ import java.util.concurrent.TimeUnit;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.cache.VectorSerializer;
+import org.apache.drill.exec.cache.VectorSerializer.Writer;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.physical.impl.spill.SpillSet;
 import org.apache.drill.exec.record.BatchSchema;
@@ -139,13 +139,12 @@ public abstract class BatchGroup implements VectorAccessible, AutoCloseable {
 
   public static class SpilledRun extends BatchGroup {
     private InputStream inputStream;
-    private OutputStream outputStream;
     private String path;
     private SpillSet spillSet;
     private BufferAllocator allocator;
     private int spilledBatches;
     private long batchSize;
-    private VectorSerializer.Writer writer;
+    private Writer writer;
     private VectorSerializer.Reader reader;
 
     public SpilledRun(SpillSet spillSet, String path, BufferAllocator allocator) throws IOException {
@@ -153,15 +152,16 @@ public abstract class BatchGroup implements VectorAccessible, AutoCloseable {
       this.spillSet = spillSet;
       this.path = path;
       this.allocator = allocator;
-      outputStream = spillSet.openForOutput(path);
-      writer = VectorSerializer.writer(allocator, outputStream);
+      writer = spillSet.writer(path);
     }

     public void addBatch(VectorContainer newContainer) throws IOException {
-      Stopwatch watch = Stopwatch.createStarted();
+      // The writer's time is cumulative over the whole run; snapshot it to log this batch only.
+      final long writeStartNs = writer.time(TimeUnit.NANOSECONDS);
       writer.write(newContainer);
       newContainer.zeroVectors();
-      logger.trace("Wrote {} records in {} us", newContainer.getRecordCount(), watch.elapsed(TimeUnit.MICROSECONDS));
+      logger.trace("Wrote {} records in {} us", newContainer.getRecordCount(),
+          TimeUnit.NANOSECONDS.toMicros(writer.time(TimeUnit.NANOSECONDS) - writeStartNs));
       spilledBatches++;

       // Hold onto the husk of the last added container so that we have a
@@ -249,7 +249,7 @@ public abstract class BatchGroup implements VectorAccessible, AutoCloseable {
         ex = e;
       }
       try {
-        closeOutputStream();
+        closeWriter();
       } catch (IOException e) {
         ex = ex == null ? e : ex;
       }
@@ -280,17 +280,12 @@ public abstract class BatchGroup implements VectorAccessible, AutoCloseable {
       logger.trace("Summary: Read {} bytes from {}", readLength, path);
     }
 
-    public long closeOutputStream() throws IOException {
-      if (outputStream == null) {
-        return 0;
+    public void closeWriter() throws IOException {
+      if (writer != null) {
+        spillSet.close(writer);
+        logger.trace("Summary: Wrote {} bytes in {} us to {}", writer.getBytesWritten(), writer.time(TimeUnit.MICROSECONDS), path);
+        writer = null;
       }
-      long writeSize = spillSet.getPosition(outputStream);
-      spillSet.tallyWriteBytes(writeSize);
-      outputStream.close();
-      outputStream = null;
-      writer = null;
-      logger.trace("Summary: Wrote {} bytes to {}", writeSize, path);
-      return writeSize;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/drill/blob/d803f0c2/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SpilledRuns.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SpilledRuns.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SpilledRuns.java
index 3d7e63a..bbf4457 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SpilledRuns.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SpilledRuns.java
@@ -178,7 +178,7 @@ public class SpilledRuns {
         newGroup.addBatch(dest);
       }
       context.injectChecked(ExternalSortBatch.INTERRUPTION_WHILE_SPILLING, IOException.class);
-      newGroup.closeOutputStream();
+      newGroup.closeWriter();
       logger.trace("Spilled {} output batches, each of {} bytes, {} records, to {}",
                    merger.getBatchCount(), merger.getEstBatchSize(),
                    spillBatchRowCount, outputFile);

http://git-wip-us.apache.org/repos/asf/drill/blob/d803f0c2/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestBatchSerialization.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestBatchSerialization.java b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestBatchSerialization.java
index a283924..bcf0618 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestBatchSerialization.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestBatchSerialization.java
@@ -18,17 +18,19 @@
 package org.apache.drill.exec.cache;
 
 import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
-import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.OutputStream;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
 
 import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.exec.cache.VectorSerializer.Reader;
 import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.cache.VectorSerializer.Reader;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.test.DirTestWatcher;
 import org.apache.drill.test.DrillTest;
 import org.apache.drill.test.OperatorFixture;
 import org.apache.drill.test.rowSet.RowSet;
@@ -40,10 +42,15 @@ import org.apache.drill.test.rowSet.RowSetWriter;
 import org.apache.drill.test.rowSet.SchemaBuilder;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.ClassRule;
 import org.junit.Test;
 
+import static org.junit.Assert.assertTrue;
+
 public class TestBatchSerialization extends DrillTest {
 
+  @ClassRule
+  public static final DirTestWatcher dirTestWatcher = new DirTestWatcher();
   public static OperatorFixture fixture;
 
   @BeforeClass
@@ -117,12 +124,23 @@ public class TestBatchSerialization extends DrillTest {
    */
   private void verifySerialize(SingleRowSet rowSet, SingleRowSet expected) throws IOException {

-    File dir = OperatorFixture.getTempDir("serial");
-    File outFile = new File(dir, "serialze.dat");
-    try (OutputStream out = new BufferedOutputStream(new FileOutputStream(outFile))) {
-      VectorSerializer.writer(fixture.allocator(), out)
-        .write(rowSet.container(), rowSet.getSv2());
+    File dir = DirTestWatcher.createTempDir(dirTestWatcher.getDir());
+    File outFile = new File(dir, "serialize.dat");
+    VectorContainer container = rowSet.container();
+    SelectionVector2 sv2 = rowSet.getSv2();
+    // try-with-resources closes the writer and its channel even if the write fails.
+    try (FileChannel channel = FileChannel.open(outFile.toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
+         VectorSerializer.Writer writer = VectorSerializer.writer(channel)) {
+      writer.write(container, sv2);
+    } finally {
+      container.clear();
+      if (sv2 != null) {
+        sv2.clear();
+      }
     }
+
+    assertTrue(outFile.exists());
+    assertTrue(outFile.isFile());

     RowSet result;
     try (InputStream in = new BufferedInputStream(new FileInputStream(outFile))) {