You are viewing a plain-text version of this content. The canonical link to it (originally a hyperlink labelled "here") was not preserved in this plain-text copy.
Posted to commits@drill.apache.org by am...@apache.org on 2018/01/24 18:35:20 UTC
[06/11] drill git commit: DRILL-6002: Avoid memory copy from direct buffer to heap while spilling to local disk
DRILL-6002: Avoid memory copy from direct buffer to heap while spilling to local disk
close apache/drill#1058
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/d803f0c2
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/d803f0c2
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/d803f0c2
Branch: refs/heads/master
Commit: d803f0c2188c679de3dacf10741005b217425a33
Parents: 2420b35
Author: Vlad Rozov <vr...@apache.org>
Authored: Wed Nov 22 14:06:13 2017 -0800
Committer: Aman Sinha <as...@maprtech.com>
Committed: Tue Jan 23 17:45:50 2018 -0800
----------------------------------------------------------------------
.../drill/exec/cache/VectorSerializer.java | 107 +++++++++++++------
.../impl/aggregate/HashAggTemplate.java | 38 +++----
.../exec/physical/impl/spill/SpillSet.java | 99 +++++++++++++----
.../physical/impl/xsort/managed/BatchGroup.java | 28 ++---
.../impl/xsort/managed/SpilledRuns.java | 2 +-
.../exec/cache/TestBatchSerialization.java | 34 ++++--
6 files changed, 210 insertions(+), 98 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/d803f0c2/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorSerializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorSerializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorSerializer.java
index eeef9e5..03ea11e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorSerializer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/cache/VectorSerializer.java
@@ -17,16 +17,30 @@
*/
package org.apache.drill.exec.cache;
+import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.util.concurrent.TimeUnit;
import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.metrics.DrillMetrics;
+import org.apache.drill.exec.proto.UserBitShared;
import org.apache.drill.exec.record.VectorAccessible;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.WritableBatch;
import org.apache.drill.exec.record.selection.SelectionVector2;
+import com.codahale.metrics.MetricRegistry;
+import com.codahale.metrics.Timer;
+
+import io.netty.buffer.DrillBuf;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
/**
* Serializes vector containers to an output stream or from
* an input stream.
@@ -39,51 +53,84 @@ public class VectorSerializer {
* objects to an output stream.
*/
- public static class Writer {
+ public static class Writer implements Closeable
+ {
+ static final MetricRegistry metrics = DrillMetrics.getRegistry();
+ static final String WRITER_TIMER = MetricRegistry.name(VectorAccessibleSerializable.class, "writerTime");
- private final OutputStream stream;
- private final BufferAllocator allocator;
- private boolean retain;
+ private final WritableByteChannel channel;
+ private final OutputStream output;
private long timeNs;
+ private int bytesWritten;
- public Writer(BufferAllocator allocator, OutputStream stream) {
- this.allocator = allocator;
- this.stream = stream;
- }
-
- public Writer retain() {
- retain = true;
- return this;
+ private Writer(WritableByteChannel channel) {
+ this.channel = channel;
+ output = Channels.newOutputStream(channel);
}
- public Writer write(VectorAccessible va) throws IOException {
+ public int write(VectorAccessible va) throws IOException {
return write(va, null);
}
@SuppressWarnings("resource")
- public Writer write(VectorAccessible va, SelectionVector2 sv2) throws IOException {
+ public int write(VectorAccessible va, SelectionVector2 sv2) throws IOException {
+ checkNotNull(va);
WritableBatch batch = WritableBatch.getBatchNoHVWrap(
va.getRecordCount(), va, sv2 != null);
- return write(batch, sv2);
+ try {
+ return write(batch, sv2);
+ } finally {
+ batch.clear();
+ }
}
- public Writer write(WritableBatch batch, SelectionVector2 sv2) throws IOException {
- VectorAccessibleSerializable vas;
- if (sv2 == null) {
- vas = new VectorAccessibleSerializable(batch, allocator);
- } else {
- vas = new VectorAccessibleSerializable(batch, sv2, allocator);
+ public int write(WritableBatch batch, SelectionVector2 sv2) throws IOException {
+ checkNotNull(batch);
+ checkNotNull(channel);
+ final Timer.Context timerContext = metrics.timer(WRITER_TIMER).time();
+
+ final DrillBuf[] incomingBuffers = batch.getBuffers();
+ final UserBitShared.RecordBatchDef batchDef = batch.getDef();
+ bytesWritten = batchDef.getSerializedSize();
+
+ /* Write the metadata to the file */
+ batchDef.writeDelimitedTo(output);
+
+ /* If we have a selection vector, dump it to file first */
+ if (sv2 != null) {
+ final int dataLength = sv2.getCount() * SelectionVector2.RECORD_SIZE;
+ ByteBuffer buffer = sv2.getBuffer(false).nioBuffer(0, dataLength);
+ while (buffer.remaining() > 0) {
+ bytesWritten += channel.write(buffer);
+ }
}
- if (retain) {
- vas.writeToStreamAndRetain(stream);
- } else {
- vas.writeToStream(stream);
+
+ /* Dump the array of ByteBuf's associated with the value vectors */
+ for (DrillBuf buf : incomingBuffers) {
+ /* dump the buffer into the OutputStream */
+ ByteBuffer buffer = buf.nioBuffer();
+ while (buffer.remaining() > 0) {
+ bytesWritten += channel.write(buffer);
+ }
}
- timeNs += vas.getTimeNs();
- return this;
+
+ timeNs += timerContext.stop();
+ return bytesWritten;
}
- public long timeNs() { return timeNs; }
+ @Override
+ public void close() throws IOException {
+ if (!channel.isOpen()) {
+ return;
+ }
+ channel.close();
+ }
+
+ public long time(TimeUnit unit) {
+ return unit.convert(timeNs, TimeUnit.NANOSECONDS);
+ }
+
+ public int getBytesWritten() { return bytesWritten; }
}
/**
@@ -111,8 +158,8 @@ public class VectorSerializer {
public long timeNs() { return timeNs; }
}
- public static Writer writer(BufferAllocator allocator, OutputStream stream) {
- return new Writer(allocator, stream);
+ public static Writer writer(WritableByteChannel channel) throws IOException {
+ return new Writer(channel);
}
public static Reader reader(BufferAllocator allocator, InputStream stream) {
http://git-wip-us.apache.org/repos/asf/drill/blob/d803f0c2/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
index 2f181fe..89ba59b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java
@@ -18,7 +18,6 @@
package org.apache.drill.exec.physical.impl.aggregate;
import java.io.IOException;
-import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -26,7 +25,6 @@ import java.util.concurrent.TimeUnit;
import javax.inject.Named;
-import com.google.common.base.Stopwatch;
import org.apache.drill.common.exceptions.RetryAfterSpillException;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.ExpressionPosition;
@@ -34,7 +32,7 @@ import org.apache.drill.common.expression.FieldReference;
import org.apache.drill.common.expression.LogicalExpression;
import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.cache.VectorAccessibleSerializable;
+import org.apache.drill.exec.cache.VectorSerializer.Writer;
import org.apache.drill.exec.compile.sig.RuntimeOverridden;
import org.apache.drill.exec.exception.ClassTransformationException;
import org.apache.drill.exec.exception.OutOfMemoryException;
@@ -149,7 +147,7 @@ public abstract class HashAggTemplate implements HashAggregator {
// For handling spilling
private SpillSet spillSet;
SpilledRecordbatch newIncoming; // when reading a spilled file - work like an "incoming"
- private OutputStream outputStream[]; // an output stream for each spilled partition
+ private Writer writers[]; // a vector writer for each spilled partition
private int spilledBatchesCount[]; // count number of batches spilled, in each partition
private String spillFiles[];
private int cycleNum = 0; // primary, secondary, tertiary, etc.
@@ -454,7 +452,7 @@ public abstract class HashAggTemplate implements HashAggregator {
htables = new HashTable[numPartitions] ;
batchHolders = (ArrayList<BatchHolder>[]) new ArrayList<?>[numPartitions] ;
outBatchIndex = new int[numPartitions] ;
- outputStream = new OutputStream[numPartitions];
+ writers = new Writer[numPartitions];
spilledBatchesCount = new int[numPartitions];
spillFiles = new String[numPartitions];
spilledPartitionsList = new ArrayList<SpilledPartition>();
@@ -504,7 +502,7 @@ public abstract class HashAggTemplate implements HashAggregator {
batchHolders[i] = new ArrayList<BatchHolder>();
}
outBatchIndex[i] = 0;
- outputStream[i] = null;
+ writers[i] = null;
spilledBatchesCount[i] = 0;
spillFiles[i] = null;
}
@@ -792,14 +790,14 @@ public abstract class HashAggTemplate implements HashAggregator {
}
// delete any (still active) output spill file
- if ( outputStream[i] != null && spillFiles[i] != null) {
+ if ( writers[i] != null && spillFiles[i] != null) {
try {
- outputStream[i].close();
- outputStream[i] = null;
+ spillSet.close(writers[i]);
+ writers[i] = null;
spillSet.delete(spillFiles[i]);
spillFiles[i] = null;
} catch(IOException e) {
- logger.warn("Cleanup: Failed to delete spill file {}",spillFiles[i]);
+ logger.warn("Cleanup: Failed to delete spill file {}", spillFiles[i], e);
}
}
}
@@ -854,7 +852,7 @@ public abstract class HashAggTemplate implements HashAggregator {
}
private boolean isSpilled(int part) {
- return outputStream[part] != null;
+ return writers[part] != null;
}
/**
* Which partition to choose for flushing out (i.e. spill or return) ?
@@ -932,7 +930,7 @@ public abstract class HashAggTemplate implements HashAggregator {
spillFiles[part] = spillSet.getNextSpillFile(cycleNum > 0 ? Integer.toString(cycleNum) : null);
try {
- outputStream[part] = spillSet.openForOutput(spillFiles[part]);
+ writers[part] = spillSet.writer(spillFiles[part]);
} catch (IOException ioe) {
throw UserException.resourceError(ioe)
.message("Hash Aggregation failed to open spill file: " + spillFiles[part])
@@ -975,17 +973,17 @@ public abstract class HashAggTemplate implements HashAggregator {
outContainer.setRecordCount(numPendingOutput);
WritableBatch batch = WritableBatch.getBatchNoHVWrap(numPendingOutput, outContainer, false);
- VectorAccessibleSerializable outputBatch = new VectorAccessibleSerializable(batch, allocator);
- Stopwatch watch = Stopwatch.createStarted();
try {
- outputBatch.writeToStream(outputStream[part]);
+ writers[part].write(batch, null);
} catch (IOException ioe) {
throw UserException.dataWriteError(ioe)
- .message("Hash Aggregation failed to write to output stream: " + outputStream[part].toString())
+ .message("Hash Aggregation failed to write to output file: " + spillFiles[part])
.build(logger);
+ } finally {
+ batch.clear();
}
outContainer.zeroVectors();
- logger.trace("HASH AGG: Took {} us to spill {} records", watch.elapsed(TimeUnit.MICROSECONDS), numPendingOutput);
+ logger.trace("HASH AGG: Took {} us to spill {} records", writers[part].time(TimeUnit.MICROSECONDS), numPendingOutput);
}
spilledBatchesCount[part] += currPartition.size(); // update count of spilled batches
@@ -1048,16 +1046,14 @@ public abstract class HashAggTemplate implements HashAggregator {
spilledPartitionsList.add(sp);
reinitPartition(nextPartitionToReturn); // free the memory
- long posn = spillSet.getPosition(outputStream[nextPartitionToReturn]);
- spillSet.tallyWriteBytes(posn); // for the IO stats
try {
- outputStream[nextPartitionToReturn].close();
+ spillSet.close(writers[nextPartitionToReturn]);
} catch (IOException ioe) {
throw UserException.resourceError(ioe)
.message("IO Error while closing output stream")
.build(logger);
}
- outputStream[nextPartitionToReturn] = null;
+ writers[nextPartitionToReturn] = null;
}
else {
currPartition = batchHolders[nextPartitionToReturn];
http://git-wip-us.apache.org/repos/asf/drill/blob/d803f0c2/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java
index 9a6420a..2f9ab14 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/spill/SpillSet.java
@@ -18,13 +18,15 @@
package org.apache.drill.exec.physical.impl.spill;
import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
-import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.nio.channels.WritableByteChannel;
+import java.nio.file.StandardOpenOption;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
@@ -32,6 +34,7 @@ import java.util.Set;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.cache.VectorSerializer;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.config.HashAggregate;
@@ -65,7 +68,7 @@ public class SpillSet {
void deleteOnExit(String fragmentSpillDir) throws IOException;
- OutputStream createForWrite(String fileName) throws IOException;
+ WritableByteChannel createForWrite(String fileName) throws IOException;
InputStream openForInput(String fileName) throws IOException;
@@ -77,10 +80,10 @@ public class SpillSet {
* Given a manager-specific output stream, return the current write position.
* Used to report total write bytes.
*
- * @param outputStream output stream created by the file manager
+ * @param channel created by the file manager
* @return
*/
- long getWriteBytes(OutputStream outputStream);
+ long getWriteBytes(WritableByteChannel channel);
/**
* Given a manager-specific input stream, return the current read position.
@@ -104,9 +107,17 @@ public class SpillSet {
* nodes provide insufficient local disk space)
*/
+ // The buffer size is calculated as LCM of the Hadoop internal checksum buffer (9 * checksum length), where
+ // checksum length is 512 by default, and MapRFS page size that equals to 8 * 1024. The length of the transfer
+ // buffer does not affect performance of the write to hdfs or maprfs significantly once buffer length is more
+ // than 32 bytes.
+ private static final int TRANSFER_SIZE = 9 * 8 * 1024;
+
+ private final byte buffer[];
private FileSystem fs;
protected HadoopFileManager(String fsName) {
+ buffer = new byte[TRANSFER_SIZE];
Configuration conf = new Configuration();
conf.set(FileSystem.FS_DEFAULT_NAME_KEY, fsName);
try {
@@ -124,8 +135,8 @@ public class SpillSet {
}
@Override
- public OutputStream createForWrite(String fileName) throws IOException {
- return fs.create(new Path(fileName));
+ public WritableByteChannel createForWrite(String fileName) throws IOException {
+ return new WritableByteChannelImpl(buffer, fs.create(new Path(fileName)));
}
@Override
@@ -152,10 +163,10 @@ public class SpillSet {
}
@Override
- public long getWriteBytes(OutputStream outputStream) {
+ public long getWriteBytes(WritableByteChannel channel) {
try {
- return ((FSDataOutputStream) outputStream).getPos();
- } catch (IOException e) {
+ return ((FSDataOutputStream)((WritableByteChannelImpl)channel).out).getPos();
+ } catch (Exception e) {
// Just used for logging, not worth dealing with the exception.
return 0;
}
@@ -295,10 +306,8 @@ public class SpillSet {
@SuppressWarnings("resource")
@Override
- public OutputStream createForWrite(String fileName) throws IOException {
- return new CountingOutputStream(
- new BufferedOutputStream(
- new FileOutputStream(new File(baseDir, fileName))));
+ public WritableByteChannel createForWrite(String fileName) throws IOException {
+ return FileChannel.open(new File(baseDir, fileName).toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
}
@SuppressWarnings("resource")
@@ -321,8 +330,13 @@ public class SpillSet {
}
@Override
- public long getWriteBytes(OutputStream outputStream) {
- return ((CountingOutputStream) outputStream).getCount();
+ public long getWriteBytes(WritableByteChannel channel)
+ {
+ try {
+ return ((FileChannel)channel).position();
+ } catch (Exception e) {
+ return 0;
+ }
}
@Override
@@ -331,6 +345,44 @@ public class SpillSet {
}
}
+ private static class WritableByteChannelImpl implements WritableByteChannel
+ {
+ private final byte buffer[];
+ private OutputStream out;
+
+ WritableByteChannelImpl(byte[] buffer, OutputStream out) {
+ this.buffer = buffer;
+ this.out = out;
+ }
+
+ @Override
+ public int write(ByteBuffer src) throws IOException {
+ int remaining = src.remaining();
+ int totalWritten = 0;
+ synchronized (buffer) {
+ for (int posn = 0; posn < remaining; posn += buffer.length) {
+ int len = Math.min(buffer.length, remaining - posn);
+ src.get(buffer, 0, len);
+ out.write(buffer, 0, len);
+ totalWritten += len;
+ }
+ }
+ return totalWritten;
+ }
+
+ @Override
+ public boolean isOpen()
+ {
+ return out != null;
+ }
+
+ @Override
+ public void close() throws IOException {
+ out.close();
+ out = null;
+ }
+ }
+
private final Iterator<String> dirs;
/**
@@ -457,7 +509,7 @@ public class SpillSet {
return fileManager.openForInput(fileName);
}
- public OutputStream openForOutput(String fileName) throws IOException {
+ public WritableByteChannel openForOutput(String fileName) throws IOException {
return fileManager.createForWrite(fileName);
}
@@ -484,8 +536,8 @@ public class SpillSet {
return fileManager.getReadBytes(inputStream);
}
- public long getPosition(OutputStream outputStream) {
- return fileManager.getWriteBytes(outputStream);
+ public long getPosition(WritableByteChannel channel) {
+ return fileManager.getWriteBytes(channel);
}
public void tallyReadBytes(long readLength) {
@@ -495,4 +547,13 @@ public class SpillSet {
public void tallyWriteBytes(long writeLength) {
writeBytes += writeLength;
}
+
+ public VectorSerializer.Writer writer(String fileName) throws IOException {
+ return VectorSerializer.writer(openForOutput(fileName));
+ }
+
+ public void close(VectorSerializer.Writer writer) throws IOException {
+ tallyWriteBytes(writer.getBytesWritten());
+ writer.close();
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/d803f0c2/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BatchGroup.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BatchGroup.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BatchGroup.java
index d902e0d..bd2e368 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BatchGroup.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/BatchGroup.java
@@ -19,7 +19,6 @@ package org.apache.drill.exec.physical.impl.xsort.managed;
import java.io.IOException;
import java.io.InputStream;
-import java.io.OutputStream;
import java.util.Collection;
import java.util.Iterator;
import java.util.concurrent.TimeUnit;
@@ -27,6 +26,7 @@ import java.util.concurrent.TimeUnit;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.cache.VectorSerializer;
+import org.apache.drill.exec.cache.VectorSerializer.Writer;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.physical.impl.spill.SpillSet;
import org.apache.drill.exec.record.BatchSchema;
@@ -139,13 +139,12 @@ public abstract class BatchGroup implements VectorAccessible, AutoCloseable {
public static class SpilledRun extends BatchGroup {
private InputStream inputStream;
- private OutputStream outputStream;
private String path;
private SpillSet spillSet;
private BufferAllocator allocator;
private int spilledBatches;
private long batchSize;
- private VectorSerializer.Writer writer;
+ private Writer writer;
private VectorSerializer.Reader reader;
public SpilledRun(SpillSet spillSet, String path, BufferAllocator allocator) throws IOException {
@@ -153,15 +152,13 @@ public abstract class BatchGroup implements VectorAccessible, AutoCloseable {
this.spillSet = spillSet;
this.path = path;
this.allocator = allocator;
- outputStream = spillSet.openForOutput(path);
- writer = VectorSerializer.writer(allocator, outputStream);
+ writer = spillSet.writer(path);
}
public void addBatch(VectorContainer newContainer) throws IOException {
- Stopwatch watch = Stopwatch.createStarted();
writer.write(newContainer);
newContainer.zeroVectors();
- logger.trace("Wrote {} records in {} us", newContainer.getRecordCount(), watch.elapsed(TimeUnit.MICROSECONDS));
+ logger.trace("Wrote {} records in {} us", newContainer.getRecordCount(), writer.time(TimeUnit.MICROSECONDS));
spilledBatches++;
// Hold onto the husk of the last added container so that we have a
@@ -249,7 +246,7 @@ public abstract class BatchGroup implements VectorAccessible, AutoCloseable {
ex = e;
}
try {
- closeOutputStream();
+ closeWriter();
} catch (IOException e) {
ex = ex == null ? e : ex;
}
@@ -280,17 +277,12 @@ public abstract class BatchGroup implements VectorAccessible, AutoCloseable {
logger.trace("Summary: Read {} bytes from {}", readLength, path);
}
- public long closeOutputStream() throws IOException {
- if (outputStream == null) {
- return 0;
+ public void closeWriter() throws IOException {
+ if (writer != null) {
+ spillSet.close(writer);
+ logger.trace("Summary: Wrote {} bytes in {} us to {}", writer.getBytesWritten(), writer.time(TimeUnit.MICROSECONDS), path);
+ writer = null;
}
- long writeSize = spillSet.getPosition(outputStream);
- spillSet.tallyWriteBytes(writeSize);
- outputStream.close();
- outputStream = null;
- writer = null;
- logger.trace("Summary: Wrote {} bytes to {}", writeSize, path);
- return writeSize;
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/d803f0c2/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SpilledRuns.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SpilledRuns.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SpilledRuns.java
index 3d7e63a..bbf4457 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SpilledRuns.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/managed/SpilledRuns.java
@@ -178,7 +178,7 @@ public class SpilledRuns {
newGroup.addBatch(dest);
}
context.injectChecked(ExternalSortBatch.INTERRUPTION_WHILE_SPILLING, IOException.class);
- newGroup.closeOutputStream();
+ newGroup.closeWriter();
logger.trace("Spilled {} output batches, each of {} bytes, {} records, to {}",
merger.getBatchCount(), merger.getEstBatchSize(),
spillBatchRowCount, outputFile);
http://git-wip-us.apache.org/repos/asf/drill/blob/d803f0c2/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestBatchSerialization.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestBatchSerialization.java b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestBatchSerialization.java
index a283924..bcf0618 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestBatchSerialization.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/cache/TestBatchSerialization.java
@@ -18,17 +18,19 @@
package org.apache.drill.exec.cache;
import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
-import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
-import java.io.OutputStream;
+import java.nio.channels.FileChannel;
+import java.nio.file.StandardOpenOption;
import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.exec.cache.VectorSerializer.Reader;
import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.cache.VectorSerializer.Reader;
+import org.apache.drill.exec.record.VectorContainer;
+import org.apache.drill.exec.record.selection.SelectionVector2;
+import org.apache.drill.test.DirTestWatcher;
import org.apache.drill.test.DrillTest;
import org.apache.drill.test.OperatorFixture;
import org.apache.drill.test.rowSet.RowSet;
@@ -40,10 +42,15 @@ import org.apache.drill.test.rowSet.RowSetWriter;
import org.apache.drill.test.rowSet.SchemaBuilder;
import org.junit.AfterClass;
import org.junit.BeforeClass;
+import org.junit.ClassRule;
import org.junit.Test;
+import static org.junit.Assert.assertTrue;
+
public class TestBatchSerialization extends DrillTest {
+ @ClassRule
+ public static final DirTestWatcher dirTestWatcher = new DirTestWatcher();
public static OperatorFixture fixture;
@BeforeClass
@@ -117,12 +124,21 @@ public class TestBatchSerialization extends DrillTest {
*/
private void verifySerialize(SingleRowSet rowSet, SingleRowSet expected) throws IOException {
- File dir = OperatorFixture.getTempDir("serial");
- File outFile = new File(dir, "serialze.dat");
- try (OutputStream out = new BufferedOutputStream(new FileOutputStream(outFile))) {
- VectorSerializer.writer(fixture.allocator(), out)
- .write(rowSet.container(), rowSet.getSv2());
+ File dir = DirTestWatcher.createTempDir(dirTestWatcher.getDir());
+ FileChannel channel = FileChannel.open(new File(dir, "serialize.dat").toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE);
+ VectorSerializer.Writer writer = VectorSerializer.writer(channel);
+ VectorContainer container = rowSet.container();
+ SelectionVector2 sv2 = rowSet.getSv2();
+ writer.write(container, sv2);
+ container.clear();
+ if (sv2 != null) {
+ sv2.clear();
}
+ writer.close();
+
+ File outFile = new File(dir, "serialize.dat");
+ assertTrue(outFile.exists());
+ assertTrue(outFile.isFile());
RowSet result;
try (InputStream in = new BufferedInputStream(new FileInputStream(outFile))) {