Posted to commits@drill.apache.org by sm...@apache.org on 2015/05/14 01:46:54 UTC
drill git commit: DRILL-2936: Use SpoolingRawBatchBuffer for HashToMergeExchange in order to avoid deadlocks
Repository: drill
Updated Branches:
refs/heads/master c04a8f9fe -> 814f553f2
DRILL-2936: Use SpoolingRawBatchBuffer for HashToMergeExchange in order to avoid deadlocks
Refactored common code in UnlimitedRawBatchBuffer and SpoolingRawBatchBuffer
into BaseRawBatchBuffer
Removed reflection-based construction of RawBatchBuffer. The implementation is now chosen based on the plan
Updated SpoolingRawBatchBuffer to use a separate thread for spooling
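
For orientation, the pieces below fit together as follows (a sketch assembled from the diff, not additional code in this commit): each Exchange now passes a boolean "spooling" flag into its Receiver, and AbstractDataCollector uses that flag, instead of the old reflection-based config key, to pick the buffer implementation.

    // Sketch of the new selection path (names as they appear in the diff below).
    final boolean spooling = receiver.isSpooling();
    for (int i = 0; i < numBuffers; i++) {
      buffers[i] = spooling
          ? new SpoolingRawBatchBuffer(context, bufferCapacity, receiver.getOppositeMajorFragmentId(), i)
          : new UnlimitedRawBatchBuffer(context, bufferCapacity, receiver.getOppositeMajorFragmentId());
    }
    // HashToMergeExchange constructs its MergingReceiverPOP with spooling=true, so a
    // slow merge receiver spills incoming batches to disk instead of blocking senders.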
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/814f553f
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/814f553f
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/814f553f
Branch: refs/heads/master
Commit: 814f553f2fe58c6d2d2095fbf71158fcda424658
Parents: c04a8f9
Author: Steven Phillips <sm...@apache.org>
Authored: Wed May 6 20:01:35 2015 -0700
Committer: Steven Phillips <sm...@apache.org>
Committed: Wed May 13 15:15:42 2015 -0700
----------------------------------------------------------------------
.../src/main/java/io/netty/buffer/DrillBuf.java | 2 +-
.../exec/physical/base/AbstractReceiver.java | 9 +-
.../drill/exec/physical/base/Receiver.java | 3 +
.../exec/physical/config/BroadcastExchange.java | 2 +-
.../physical/config/HashToMergeExchange.java | 2 +-
.../physical/config/HashToRandomExchange.java | 8 +-
.../physical/config/MergingReceiverPOP.java | 5 +-
.../config/OrderedPartitionExchange.java | 2 +-
.../physical/config/SingleMergeExchange.java | 2 +-
.../exec/physical/config/UnionExchange.java | 2 +-
.../physical/config/UnorderedDeMuxExchange.java | 2 +-
.../physical/config/UnorderedMuxExchange.java | 2 +-
.../exec/physical/config/UnorderedReceiver.java | 5 +-
.../drill/exec/record/RawFragmentBatch.java | 20 +-
.../exec/store/LocalSyncableFileSystem.java | 20 +-
.../exec/work/batch/AbstractDataCollector.java | 17 +-
.../exec/work/batch/BaseRawBatchBuffer.java | 249 ++++++++++
.../drill/exec/work/batch/RawBatchBuffer.java | 5 -
.../exec/work/batch/SpoolingRawBatchBuffer.java | 450 +++++++++++++------
.../work/batch/UnlimitedRawBatchBuffer.java | 200 ++-------
.../src/main/resources/drill-module.conf | 3 +-
.../exec/work/batch/TestSpoolingBuffer.java | 4 +-
.../work/batch/TestUnlimitedBatchBuffer.java | 166 -------
23 files changed, 672 insertions(+), 508 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/814f553f/exec/java-exec/src/main/java/io/netty/buffer/DrillBuf.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/io/netty/buffer/DrillBuf.java b/exec/java-exec/src/main/java/io/netty/buffer/DrillBuf.java
index 7f80f7a..6f61f30 100644
--- a/exec/java-exec/src/main/java/io/netty/buffer/DrillBuf.java
+++ b/exec/java-exec/src/main/java/io/netty/buffer/DrillBuf.java
@@ -755,7 +755,7 @@ public final class DrillBuf extends AbstractByteBuf implements AutoCloseable {
}
@Override
- public void close() throws Exception {
+ public void close() {
release();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/814f553f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractReceiver.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractReceiver.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractReceiver.java
index f01d025..6bb0760 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractReceiver.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/AbstractReceiver.java
@@ -33,15 +33,17 @@ public abstract class AbstractReceiver extends AbstractBase implements Receiver{
private final int oppositeMajorFragmentId;
private final List<MinorFragmentEndpoint> senders;
+ private final boolean spooling;
/**
* @param oppositeMajorFragmentId MajorFragmentId of fragments that are sending data to this receiver.
* @param senders List of sender MinorFragmentEndpoints each containing sender MinorFragmentId and Drillbit endpoint
* where it is running.
*/
- public AbstractReceiver(int oppositeMajorFragmentId, List<MinorFragmentEndpoint> senders){
+ public AbstractReceiver(int oppositeMajorFragmentId, List<MinorFragmentEndpoint> senders, boolean spooling){
this.oppositeMajorFragmentId = oppositeMajorFragmentId;
this.senders = ImmutableList.copyOf(senders);
+ this.spooling = spooling;
}
@Override
@@ -75,5 +77,10 @@ public abstract class AbstractReceiver extends AbstractBase implements Receiver{
public int getNumSenders() {
return senders.size();
}
+
+ @Override
+ public boolean isSpooling() {
+ return spooling;
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/814f553f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Receiver.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Receiver.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Receiver.java
index 04d6d7e..4b34205 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Receiver.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/base/Receiver.java
@@ -49,4 +49,7 @@ public interface Receiver extends FragmentLeaf {
@JsonProperty("sender-major-fragment")
public int getOppositeMajorFragmentId();
+
+ @JsonProperty("spooling")
+ public boolean isSpooling();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/814f553f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java
index a37f638..89bc343 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/BroadcastExchange.java
@@ -49,6 +49,6 @@ public class BroadcastExchange extends AbstractExchange {
@Override
public Receiver getReceiver(int minorFragmentId) {
- return new UnorderedReceiver(senderMajorFragmentId, PhysicalOperatorUtil.getIndexOrderedEndpoints(senderLocations));
+ return new UnorderedReceiver(senderMajorFragmentId, PhysicalOperatorUtil.getIndexOrderedEndpoints(senderLocations), false);
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/814f553f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToMergeExchange.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToMergeExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToMergeExchange.java
index f45ace9..f004118 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToMergeExchange.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToMergeExchange.java
@@ -55,7 +55,7 @@ public class HashToMergeExchange extends AbstractExchange{
@Override
public Receiver getReceiver(int minorFragmentId) {
- return new MergingReceiverPOP(senderMajorFragmentId, PhysicalOperatorUtil.getIndexOrderedEndpoints(senderLocations), orderExprs);
+ return new MergingReceiverPOP(senderMajorFragmentId, PhysicalOperatorUtil.getIndexOrderedEndpoints(senderLocations), orderExprs, true);
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/814f553f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToRandomExchange.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToRandomExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToRandomExchange.java
index 52d79c2..fb2f9d8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToRandomExchange.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/HashToRandomExchange.java
@@ -34,6 +34,12 @@ import com.fasterxml.jackson.annotation.JsonTypeName;
public class HashToRandomExchange extends AbstractExchange{
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashToRandomExchange.class);
+ private static final boolean HASH_EXCHANGE_SPOOLING;
+
+ static {
+ HASH_EXCHANGE_SPOOLING = "true".equals(System.getProperty("drill.hash_exchange_spooling", "false"));
+ }
+
private final LogicalExpression expr;
@JsonCreator
@@ -50,7 +56,7 @@ public class HashToRandomExchange extends AbstractExchange{
@Override
public Receiver getReceiver(int minorFragmentId) {
- return new UnorderedReceiver(senderMajorFragmentId, PhysicalOperatorUtil.getIndexOrderedEndpoints(senderLocations));
+ return new UnorderedReceiver(senderMajorFragmentId, PhysicalOperatorUtil.getIndexOrderedEndpoints(senderLocations), HASH_EXCHANGE_SPOOLING);
}
@Override
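
Side note on the HashToRandomExchange hunk above: HASH_EXCHANGE_SPOOLING is read once in a static initializer, so the drill.hash_exchange_spooling system property has to be in place before the class is first loaded (for example via -Ddrill.hash_exchange_spooling=true on the Drillbit JVM). A minimal Java illustration, assuming it runs before the class is touched:

    // Must execute before HashToRandomExchange is loaded; the flag is captured
    // in a static block and never re-read.
    System.setProperty("drill.hash_exchange_spooling", "true");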
http://git-wip-us.apache.org/repos/asf/drill/blob/814f553f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java
index 9416814..a6bab64 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/MergingReceiverPOP.java
@@ -42,8 +42,9 @@ public class MergingReceiverPOP extends AbstractReceiver{
@JsonCreator
public MergingReceiverPOP(@JsonProperty("sender-major-fragment") int oppositeMajorFragmentId,
@JsonProperty("senders") List<MinorFragmentEndpoint> senders,
- @JsonProperty("orderings") List<Ordering> orderings) {
- super(oppositeMajorFragmentId, senders);
+ @JsonProperty("orderings") List<Ordering> orderings,
+ @JsonProperty("spooling") boolean spooling) {
+ super(oppositeMajorFragmentId, senders, spooling);
this.orderings = orderings;
}
http://git-wip-us.apache.org/repos/asf/drill/blob/814f553f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java
index c8dbc22..2463bc7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/OrderedPartitionExchange.java
@@ -73,7 +73,7 @@ public class OrderedPartitionExchange extends AbstractExchange {
@Override
public Receiver getReceiver(int minorFragmentId) {
- return new UnorderedReceiver(senderMajorFragmentId, PhysicalOperatorUtil.getIndexOrderedEndpoints(senderLocations));
+ return new UnorderedReceiver(senderMajorFragmentId, PhysicalOperatorUtil.getIndexOrderedEndpoints(senderLocations), false);
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/814f553f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java
index c812325..5da3900 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/SingleMergeExchange.java
@@ -72,7 +72,7 @@ public class SingleMergeExchange extends AbstractExchange {
@Override
public Receiver getReceiver(int minorFragmentId) {
- return new MergingReceiverPOP(senderMajorFragmentId, PhysicalOperatorUtil.getIndexOrderedEndpoints(senderLocations), orderExpr);
+ return new MergingReceiverPOP(senderMajorFragmentId, PhysicalOperatorUtil.getIndexOrderedEndpoints(senderLocations), orderExpr, false);
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/814f553f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java
index b7b7835..318e6b1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnionExchange.java
@@ -69,7 +69,7 @@ public class UnionExchange extends AbstractExchange{
@Override
public Receiver getReceiver(int minorFragmentId) {
- return new UnorderedReceiver(senderMajorFragmentId, PhysicalOperatorUtil.getIndexOrderedEndpoints(senderLocations));
+ return new UnorderedReceiver(senderMajorFragmentId, PhysicalOperatorUtil.getIndexOrderedEndpoints(senderLocations), false);
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/814f553f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedDeMuxExchange.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedDeMuxExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedDeMuxExchange.java
index 0bc6678..700a21b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedDeMuxExchange.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedDeMuxExchange.java
@@ -46,7 +46,7 @@ public class UnorderedDeMuxExchange extends AbstractDeMuxExchange {
throw new IllegalStateException(String.format("Failed to find sender for receiver [%d]", minorFragmentId));
}
- return new UnorderedReceiver(this.senderMajorFragmentId, Collections.singletonList(sender));
+ return new UnorderedReceiver(this.senderMajorFragmentId, Collections.singletonList(sender), false);
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/814f553f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedMuxExchange.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedMuxExchange.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedMuxExchange.java
index 3028ee3..46f1fd7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedMuxExchange.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedMuxExchange.java
@@ -45,7 +45,7 @@ public class UnorderedMuxExchange extends AbstractMuxExchange {
throw new IllegalStateException(String.format("Failed to find senders for receiver [%d]", minorFragmentId));
}
- return new UnorderedReceiver(senderMajorFragmentId, senders);
+ return new UnorderedReceiver(senderMajorFragmentId, senders, false);
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/814f553f/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java
index e741dd4..77d718e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/config/UnorderedReceiver.java
@@ -34,8 +34,9 @@ public class UnorderedReceiver extends AbstractReceiver{
@JsonCreator
public UnorderedReceiver(@JsonProperty("sender-major-fragment") int oppositeMajorFragmentId,
- @JsonProperty("senders") List<MinorFragmentEndpoint> senders) {
- super(oppositeMajorFragmentId, senders);
+ @JsonProperty("senders") List<MinorFragmentEndpoint> senders,
+ @JsonProperty("spooling") boolean spooling) {
+ super(oppositeMajorFragmentId, senders, spooling);
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/814f553f/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java
index edd79ac..f2f9450 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java
@@ -19,15 +19,19 @@ package org.apache.drill.exec.record;
import io.netty.buffer.DrillBuf;
+import java.util.concurrent.atomic.AtomicBoolean;
+
import org.apache.drill.exec.proto.BitData.FragmentRecordBatch;
import org.apache.drill.exec.rpc.data.AckSender;
public class RawFragmentBatch {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(RawFragmentBatch.class);
- final FragmentRecordBatch header;
- final DrillBuf body;
- final AckSender sender;
+ private final FragmentRecordBatch header;
+ private final DrillBuf body;
+ private final AckSender sender;
+
+ private AtomicBoolean ackSent = new AtomicBoolean(false);
public RawFragmentBatch(FragmentRecordBatch header, DrillBuf body, AckSender sender) {
super();
@@ -63,12 +67,18 @@ public class RawFragmentBatch {
return sender;
}
- public void sendOk() {
- sender.sendOk();
+ public synchronized void sendOk() {
+ if (sender != null && ackSent.compareAndSet(false, true)) {
+ sender.sendOk();
+ }
}
public long getByteCount() {
return body == null ? 0 : body.readableBytes();
}
+ public boolean isAckSent() {
+ return ackSent.get();
+ }
+
}
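
The RawFragmentBatch change above makes acknowledgements idempotent: sendOk() guards the real ack with an AtomicBoolean, so a batch that is acked once when it is spooled to disk and again on the normal return path only sends a single ack, and isAckSent() lets BaseRawBatchBuffer assert that every batch it hands out has been acknowledged. A small illustration (header, body and ackSender are assumed placeholders, not code from this commit):

    RawFragmentBatch batch = new RawFragmentBatch(header, body, ackSender);
    batch.sendOk();            // sends the ack, flips ackSent via compareAndSet
    batch.sendOk();            // no-op: the ack has already been sent
    assert batch.isAckSent();  // checked by BaseRawBatchBuffer.assertAckSent()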
http://git-wip-us.apache.org/repos/asf/drill/blob/814f553f/exec/java-exec/src/main/java/org/apache/drill/exec/store/LocalSyncableFileSystem.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/LocalSyncableFileSystem.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/LocalSyncableFileSystem.java
index b88cc28..58db550 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/LocalSyncableFileSystem.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/LocalSyncableFileSystem.java
@@ -26,9 +26,11 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.io.RandomAccessFile;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
+import java.util.RandomAccess;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.fs.ByteBufferReadable;
@@ -120,7 +122,8 @@ public class LocalSyncableFileSystem extends FileSystem {
@Override
public FileStatus getFileStatus(Path path) throws IOException {
- return null;
+ File file = new File(Path.getPathWithoutSchemeAndAuthority(path).toString());
+ return new FileStatus(file.length(), file.isDirectory(), 1, 0, file.lastModified(), path);
}
public class LocalSyncableOutputStream extends OutputStream implements Syncable {
@@ -166,9 +169,12 @@ public class LocalSyncableFileSystem extends FileSystem {
public class LocalInputStream extends InputStream implements Seekable, PositionedReadable, ByteBufferReadable {
private BufferedInputStream input;
+ private String path;
+ private long position = 0;
public LocalInputStream(Path path) throws IOException {
- input = new BufferedInputStream(new FileInputStream(path.toString()), 1024*1024);
+ this.path = path.toString();
+ input = new BufferedInputStream(new FileInputStream(new RandomAccessFile(this.path, "r").getFD()), 1024*1024);
}
@Override
@@ -188,13 +194,16 @@ public class LocalSyncableFileSystem extends FileSystem {
@Override
public void seek(long l) throws IOException {
- input.reset();
- input.skip(l);
+ input.close();
+ RandomAccessFile raf = new RandomAccessFile(path, "r");
+ raf.seek(l);
+ input = new BufferedInputStream(new FileInputStream(raf.getFD()), 1024*1024);
+ position = l;
}
@Override
public long getPos() throws IOException {
- throw new IOException("getPos not supported");
+ return position;
}
@Override
@@ -236,6 +245,7 @@ public class LocalSyncableFileSystem extends FileSystem {
public int read() throws IOException {
byte[] b = new byte[1];
input.read(b);
+ position++;
return (int) b[0] & 0xFF;
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/814f553f/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
index ed16314..6f16976 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/AbstractDataCollector.java
@@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicIntegerArray;
import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.physical.MinorFragmentEndpoint;
import org.apache.drill.exec.physical.base.Receiver;
@@ -73,15 +74,19 @@ public abstract class AbstractDataCollector implements DataCollector{
buffers = new RawBatchBuffer[numBuffers];
remainingRequired = new AtomicInteger(numBuffers);
+ final boolean spooling = receiver.isSpooling();
+
try {
- String bufferClassName = context.getConfig().getString(ExecConstants.INCOMING_BUFFER_IMPL);
- Constructor<?> bufferConstructor = Class.forName(bufferClassName).getConstructor(FragmentContext.class, int.class);
- for(int i=0; i<numBuffers; i++) {
- buffers[i] = (RawBatchBuffer) bufferConstructor.newInstance(context, bufferCapacity);
+ for (int i = 0; i < numBuffers; i++) {
+ if (spooling) {
+ buffers[i] = new SpoolingRawBatchBuffer(context, bufferCapacity, receiver.getOppositeMajorFragmentId(), i);
+ } else {
+ buffers[i] = new UnlimitedRawBatchBuffer(context, bufferCapacity, receiver.getOppositeMajorFragmentId());
+ }
}
- } catch (InstantiationException | IllegalAccessException | InvocationTargetException |
- NoSuchMethodException | ClassNotFoundException e) {
+ } catch (IOException | OutOfMemoryException e) {
+ logger.error("Exception", e);
context.fail(e);
}
}
http://git-wip-us.apache.org/repos/asf/drill/blob/814f553f/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
new file mode 100644
index 0000000..5192e46
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/BaseRawBatchBuffer.java
@@ -0,0 +1,249 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.work.batch;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.drill.common.exceptions.DrillRuntimeException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.record.RawFragmentBatch;
+
+public abstract class BaseRawBatchBuffer<T> implements RawBatchBuffer {
+ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BaseRawBatchBuffer.class);
+
+ private static enum BufferState {
+ INIT,
+ STREAMS_FINISHED,
+ KILLED
+ }
+
+ protected interface BufferQueue<T> {
+ public void addOomBatch(RawFragmentBatch batch);
+ public RawFragmentBatch poll() throws IOException;
+ public RawFragmentBatch take() throws IOException, InterruptedException;
+ public boolean checkForOutOfMemory();
+ public int size();
+ public boolean isEmpty();
+ public void add(T obj);
+ }
+
+ protected BufferQueue<T> bufferQueue;
+ private volatile BufferState state = BufferState.INIT;
+ protected final int bufferSizePerSocket;
+ protected final AtomicBoolean outOfMemory = new AtomicBoolean(false);
+ private int streamCounter;
+ private final int fragmentCount;
+ protected final FragmentContext context;
+
+ public BaseRawBatchBuffer(final FragmentContext context, final int fragmentCount) {
+ bufferSizePerSocket = context.getConfig().getInt(ExecConstants.INCOMING_BUFFER_SIZE);
+
+ this.fragmentCount = fragmentCount;
+ this.streamCounter = fragmentCount;
+ this.context = context;
+ }
+
+ @Override
+ public void enqueue(final RawFragmentBatch batch) throws IOException {
+
+ // if this fragment is already canceled or failed, we shouldn't need any or more stuff. We do the null check to
+ // ensure that tests run.
+ if (context != null && !context.shouldContinue()) {
+ this.kill(context);
+ }
+
+ if (isTerminated()) {
+ if (state == BufferState.KILLED) {
+ // do not even enqueue just release and send ack back
+ batch.release();
+ batch.sendOk();
+ return;
+ } else {
+ throw new IOException("Attempted to enqueue batch after finished");
+ }
+ }
+ if (batch.getHeader().getIsOutOfMemory()) {
+ handleOutOfMemory(batch);
+ return;
+ }
+ enqueueInner(batch);
+ }
+
+ /**
+ * handle the out of memory case
+ *
+ * @param batch
+ */
+ protected void handleOutOfMemory(final RawFragmentBatch batch) {
+ if (!bufferQueue.checkForOutOfMemory()) {
+ logger.debug("Adding OOM message to front of queue. Current queue size: {}", bufferQueue.size());
+ bufferQueue.addOomBatch(batch);
+ } else {
+ logger.debug("ignoring duplicate OOM message");
+ }
+ }
+
+ /**
+ * implementation specific method to enqueue batch
+ *
+ * @param batch
+ * @throws IOException
+ */
+ protected abstract void enqueueInner(final RawFragmentBatch batch) throws IOException;
+
+// ## Add assertion that all acks have been sent. TODO
+ @Override
+ public void cleanup() {
+ if (!isTerminated() && context.shouldContinue()) {
+ final String msg = String.format("Cleanup before finished. %d out of %d streams have finished", completedStreams(), fragmentCount);
+ final IllegalStateException e = new IllegalStateException(msg);
+ throw e;
+ }
+
+ if (!bufferQueue.isEmpty()) {
+ if (context.shouldContinue()) {
+ context.fail(new IllegalStateException("Batches still in queue during cleanup"));
+ logger.error("{} Batches in queue.", bufferQueue.size());
+ }
+ clearBufferWithBody();
+ }
+ }
+
+ @Override
+ public void kill(final FragmentContext context) {
+ state = BufferState.KILLED;
+ clearBufferWithBody();
+ }
+
+ /**
+ * Helper method to clear buffer with request bodies release also flushes ack queue - in case there are still
+ * responses pending
+ */
+ private void clearBufferWithBody() {
+ while (!bufferQueue.isEmpty()) {
+ final RawFragmentBatch batch;
+ try {
+ batch = bufferQueue.poll();
+ assertAckSent(batch);
+ } catch (IOException e) {
+ context.fail(e);
+ continue;
+ }
+ if (batch.getBody() != null) {
+ batch.getBody().release();
+ }
+ }
+ }
+
+ private void allStreamsFinished() {
+ if (state != BufferState.KILLED) {
+ state = BufferState.STREAMS_FINISHED;
+ }
+
+ if (!bufferQueue.isEmpty()) {
+ throw new IllegalStateException("buffer not empty when finished");
+ }
+ }
+
+ @Override
+ public RawFragmentBatch getNext() throws IOException {
+
+ if (outOfMemory.get()) {
+ if (bufferQueue.size() < 10) {
+ outOfMemory.set(false);
+ }
+ }
+
+ RawFragmentBatch b;
+ try {
+ b = bufferQueue.poll();
+
+ // if we didn't get a batch, block on waiting for queue.
+ if (b == null && (!isTerminated() || !bufferQueue.isEmpty())) {
+ b = bufferQueue.take();
+ }
+ } catch (final InterruptedException e) {
+
+ // We expect that the interrupt means the fragment is canceled or failed, so we should kill this buffer
+ if (!context.shouldContinue()) {
+ kill(context);
+ } else {
+ throw new DrillRuntimeException("Interrupted but context.shouldContinue() is true", e);
+ }
+ // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
+ // interruption and respond to it if it wants to.
+ Thread.currentThread().interrupt();
+
+ return null;
+ }
+
+ if (b != null) {
+ if (b.getHeader().getIsOutOfMemory()) {
+ outOfMemory.set(true);
+ return b;
+ }
+
+ upkeep(b);
+
+ if (b.getHeader().getIsLastBatch()) {
+ logger.debug("Got last batch from {}:{}", b.getHeader().getSendingMajorFragmentId(), b.getHeader()
+ .getSendingMinorFragmentId());
+ final int remainingStreams = decrementStreamCounter();
+ if (remainingStreams == 0) {
+ logger.debug("Streams finished");
+ allStreamsFinished();
+ }
+ }
+ } else {
+ if (!bufferQueue.isEmpty()) {
+ throw new IllegalStateException("Returning null when there are batches left in queue");
+ }
+ if (!isTerminated()) {
+ throw new IllegalStateException("Returning null when not finished");
+ }
+ }
+
+ assertAckSent(b);
+ return b;
+
+ }
+
+ private void assertAckSent(RawFragmentBatch batch) {
+ assert batch == null || batch.isAckSent() || batch.getHeader().getIsOutOfMemory() : "Ack not sent for batch";
+ }
+
+ private int decrementStreamCounter() {
+ streamCounter--;
+ return streamCounter;
+ }
+
+ private int completedStreams() {
+ return fragmentCount - streamCounter;
+ }
+
+ /**
+ * Handle miscellaneous tasks after batch retrieval
+ */
+ protected abstract void upkeep(RawFragmentBatch batch);
+
+ protected boolean isTerminated() {
+ return (state == BufferState.KILLED || state == BufferState.STREAMS_FINISHED);
+ }
+}
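
BaseRawBatchBuffer is the template both buffer types now share: enqueue() and getNext() handle termination, out-of-memory batches, acks and stream counting, while a subclass supplies a BufferQueue plus enqueueInner() and upkeep(). A hypothetical minimal subclass, only to show the extension points (the real implementations are UnlimitedRawBatchBuffer and SpoolingRawBatchBuffer below):

    import java.util.concurrent.LinkedBlockingDeque;

    import org.apache.drill.exec.ops.FragmentContext;
    import org.apache.drill.exec.record.RawFragmentBatch;

    // Hypothetical subclass for illustration; acks every batch on arrival and
    // keeps everything in memory.
    class PassThroughRawBatchBuffer extends BaseRawBatchBuffer<RawFragmentBatch> {

      PassThroughRawBatchBuffer(FragmentContext context, int fragmentCount) {
        super(context, fragmentCount);
        this.bufferQueue = new BufferQueue<RawFragmentBatch>() {
          private final LinkedBlockingDeque<RawFragmentBatch> q = new LinkedBlockingDeque<>();
          public void addOomBatch(RawFragmentBatch b) { q.addFirst(b); }
          public RawFragmentBatch poll() { return q.poll(); }
          public RawFragmentBatch take() throws InterruptedException { return q.take(); }
          public boolean checkForOutOfMemory() {
            RawFragmentBatch first = q.peekFirst();
            return first != null && first.getHeader().getIsOutOfMemory();
          }
          public int size() { return q.size(); }
          public boolean isEmpty() { return q.isEmpty(); }
          public void add(RawFragmentBatch b) { q.add(b); }
        };
      }

      @Override
      protected void enqueueInner(RawFragmentBatch batch) {
        batch.sendOk();          // ack immediately; no flow control in this sketch
        bufferQueue.add(batch);
      }

      @Override
      protected void upkeep(RawFragmentBatch batch) {
        // nothing to do after a batch is handed to the operator
      }
    }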
http://git-wip-us.apache.org/repos/asf/drill/blob/814f553f/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/RawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/RawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/RawBatchBuffer.java
index 8646a72..0441eca 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/RawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/RawBatchBuffer.java
@@ -38,9 +38,4 @@ public interface RawBatchBuffer extends RawFragmentBatchProvider {
* @returns Whether response should be returned.
*/
public void enqueue(RawFragmentBatch batch) throws IOException;
-
- /**
- * Inform the buffer that no more batches are expected.
- */
- public void finished();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/814f553f/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
index 07a3505..1634982 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/SpoolingRawBatchBuffer.java
@@ -20,31 +20,32 @@ package org.apache.drill.exec.work.batch;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.DrillBuf;
+import java.io.EOFException;
import java.io.IOException;
import java.util.List;
-import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.proto.BitData;
+import org.apache.drill.exec.proto.BitData.FragmentRecordBatch;
import org.apache.drill.exec.proto.ExecProtos;
import org.apache.drill.exec.proto.helper.QueryIdHelper;
import org.apache.drill.exec.record.RawFragmentBatch;
import org.apache.drill.exec.store.LocalSyncableFileSystem;
-import org.apache.drill.exec.work.fragment.FragmentManager;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.common.collect.Queues;
@@ -53,7 +54,7 @@ import com.google.common.collect.Queues;
* This implementation of RawBatchBuffer starts writing incoming batches to disk once the buffer size reaches a threshold.
* The order of the incoming buffers is maintained.
*/
-public class SpoolingRawBatchBuffer implements RawBatchBuffer {
+public class SpoolingRawBatchBuffer extends BaseRawBatchBuffer<SpoolingRawBatchBuffer.RawFragmentBatchWrapper> {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(SpoolingRawBatchBuffer.class);
private static String DRILL_LOCAL_IMPL_STRING = "fs.drill-local.impl";
@@ -61,183 +62,305 @@ public class SpoolingRawBatchBuffer implements RawBatchBuffer {
public static final long ALLOCATOR_INITIAL_RESERVATION = 1*1024*1024;
public static final long ALLOCATOR_MAX_RESERVATION = 20L*1000*1000*1000;
- private final LinkedBlockingDeque<RawFragmentBatchWrapper> buffer = Queues.newLinkedBlockingDeque();
- private volatile boolean finished = false;
- private volatile long queueSize = 0;
- private long threshold;
- private FragmentContext context;
- private BufferAllocator allocator;
- private volatile AtomicBoolean spooling = new AtomicBoolean(false);
+ private enum SpoolingState {
+ NOT_SPOOLING,
+ SPOOLING,
+ PAUSE_SPOOLING,
+ STOP_SPOOLING
+ }
+
+ private final BufferAllocator allocator;
+ private final long threshold;
+ private final int oppositeId;
+ private final int bufferIndex;
+
+ private volatile SpoolingState spoolingState;
+ private volatile long currentSizeInMemory = 0;
+ private volatile Spooler spooler;
+
private FileSystem fs;
private Path path;
private FSDataOutputStream outputStream;
- private FSDataInputStream inputStream;
- private boolean outOfMemory = false;
- private boolean closed = false;
- private FragmentManager fragmentManager;
- public SpoolingRawBatchBuffer(FragmentContext context, int fragmentCount) throws IOException, OutOfMemoryException {
- this.context = context;
+ public SpoolingRawBatchBuffer(FragmentContext context, int fragmentCount, int oppositeId, int bufferIndex) throws IOException, OutOfMemoryException {
+ super(context, fragmentCount);
this.allocator = context.getNewChildAllocator(ALLOCATOR_INITIAL_RESERVATION, ALLOCATOR_MAX_RESERVATION, true);
this.threshold = context.getConfig().getLong(ExecConstants.SPOOLING_BUFFER_MEMORY);
- Configuration conf = new Configuration();
- conf.set(FileSystem.FS_DEFAULT_NAME_KEY, context.getConfig().getString(ExecConstants.TEMP_FILESYSTEM));
- conf.set(DRILL_LOCAL_IMPL_STRING, LocalSyncableFileSystem.class.getName());
- this.fs = FileSystem.get(conf);
- this.path = new Path(getDir(), getFileName());
+ this.oppositeId = oppositeId;
+ this.bufferIndex = bufferIndex;
+ this.bufferQueue = new SpoolingBufferQueue();
}
- public static List<String> DIRS = DrillConfig.create().getStringList(ExecConstants.TEMP_DIRECTORIES);
+ private class SpoolingBufferQueue implements BufferQueue<RawFragmentBatchWrapper> {
- public static String getDir() {
- Random random = new Random();
- return DIRS.get(random.nextInt(DIRS.size()));
- }
+ private final LinkedBlockingDeque<RawFragmentBatchWrapper> buffer = Queues.newLinkedBlockingDeque();
- @Override
- public synchronized void enqueue(RawFragmentBatch batch) throws IOException {
- if (batch.getHeader().getIsOutOfMemory()) {
- if (fragmentManager == null) {
- throw new UnsupportedOperationException("Need to fix.");
-// fragmentManager = ((BitServerConnection) batch.getConnection()).getFragmentManager();
- }
-// fragmentManager.setAutoRead(false);
-// logger.debug("Setting autoRead false");
- if (!outOfMemory && !buffer.peekFirst().isOutOfMemory()) {
- logger.debug("Adding OOM message to front of queue. Current queue size: {}", buffer.size());
- buffer.addFirst(new RawFragmentBatchWrapper(batch, true));
- } else {
- logger.debug("ignoring duplicate OOM message");
- }
- batch.sendOk();
- return;
+ @Override
+ public void addOomBatch(RawFragmentBatch batch) {
+ RawFragmentBatchWrapper batchWrapper = new RawFragmentBatchWrapper(batch, true);
+ batchWrapper.setOutOfMemory(true);
+ buffer.addFirst(batchWrapper);
}
- RawFragmentBatchWrapper wrapper;
- boolean spool = spooling.get();
- wrapper = new RawFragmentBatchWrapper(batch, !spool);
- queueSize += wrapper.getBodySize();
- if (spool) {
- if (outputStream == null) {
- outputStream = fs.create(path);
+
+ @Override
+ public RawFragmentBatch poll() throws IOException {
+ RawFragmentBatchWrapper batchWrapper = buffer.poll();
+ if (batchWrapper != null) {
+ try {
+ return batchWrapper.get();
+ } catch (InterruptedException e) {
+ return null;
+ }
}
- wrapper.writeToStream(outputStream);
+ return null;
+ }
+
+ @Override
+ public RawFragmentBatch take() throws IOException, InterruptedException {
+ return buffer.take().get();
+ }
+
+ @Override
+ public boolean checkForOutOfMemory() {
+ return buffer.peek().isOutOfMemory();
+ }
+
+ @Override
+ public int size() {
+ return buffer.size();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return buffer.size() == 0;
}
- buffer.add(wrapper);
- if (!spool && queueSize > threshold) {
- logger.debug("Buffer size {} greater than threshold {}. Start spooling to disk", queueSize, threshold);
- spooling.set(true);
+
+ public void add(RawFragmentBatchWrapper batchWrapper) {
+ buffer.add(batchWrapper);
}
}
- @Override
- public void kill(FragmentContext context) {
- allocator.close();
+ private synchronized void setSpoolingState(SpoolingState newState) {
+ SpoolingState currentState = spoolingState;
+ if (newState == SpoolingState.NOT_SPOOLING ||
+ currentState == SpoolingState.STOP_SPOOLING) {
+ return;
+ }
+ spoolingState = newState;
}
+ private boolean isCurrentlySpooling() {
+ return spoolingState == SpoolingState.SPOOLING;
+ }
- @Override
- public void finished() {
- finished = true;
+ private void startSpooling() {
+ setSpoolingState(SpoolingState.SPOOLING);
}
- @Override
- public RawFragmentBatch getNext() throws IOException, InterruptedException {
- if (outOfMemory && buffer.size() < 10) {
- outOfMemory = false;
- fragmentManager.setAutoRead(true);
- logger.debug("Setting autoRead true");
+ private void pauseSpooling() {
+ setSpoolingState(SpoolingState.PAUSE_SPOOLING);
+ }
+
+ private boolean isSpoolingStopped() {
+ return spoolingState == SpoolingState.STOP_SPOOLING;
+ }
+
+ private void stopSpooling() {
+ setSpoolingState(SpoolingState.STOP_SPOOLING);
+ }
+
+ public String getDir() {
+ List<String> dirs = context.getConfig().getStringList(ExecConstants.TEMP_DIRECTORIES);
+ return dirs.get(ThreadLocalRandom.current().nextInt(dirs.size()));
+ }
+
+ private synchronized void initSpooler() throws IOException {
+ if (spooler != null) {
+ return;
}
- boolean spool = spooling.get();
- RawFragmentBatchWrapper w = buffer.poll();
- RawFragmentBatch batch;
- if(w == null && !finished){
- try {
- w = buffer.take();
- batch = w.get();
- if (batch.getHeader().getIsOutOfMemory()) {
- outOfMemory = true;
- return batch;
- }
- queueSize -= w.getBodySize();
- return batch;
- } catch (final InterruptedException e) {
- // Preserve evidence that the interruption occurred so that code higher up on the call stack can learn of the
- // interruption and respond to it if it wants to.
- Thread.currentThread().interrupt();
- return null;
+ Configuration conf = new Configuration();
+ conf.set(FileSystem.FS_DEFAULT_NAME_KEY, context.getConfig().getString(ExecConstants.TEMP_FILESYSTEM));
+ conf.set(DRILL_LOCAL_IMPL_STRING, LocalSyncableFileSystem.class.getName());
+ fs = FileSystem.get(conf);
+ path = getPath();
+ outputStream = fs.create(path);
+ final String spoolingThreadName = QueryIdHelper.getExecutorThreadName(context.getHandle()).concat(
+ ":Spooler-" + oppositeId + "-" + bufferIndex);
+ spooler = new Spooler(spoolingThreadName);
+ spooler.start();
+ }
+
+ @Override
+ protected void enqueueInner(RawFragmentBatch batch) throws IOException {
+ assert batch.getHeader().getSendingMajorFragmentId() == oppositeId;
+
+ logger.debug("Enqueue batch. Current buffer size: {}. Last batch: {}. Sending fragment: {}", bufferQueue.size(), batch.getHeader().getIsLastBatch(), batch.getHeader().getSendingMajorFragmentId());
+ RawFragmentBatchWrapper wrapper;
+
+ boolean spoolCurrentBatch = isCurrentlySpooling();
+ wrapper = new RawFragmentBatchWrapper(batch, !spoolCurrentBatch);
+ currentSizeInMemory += wrapper.getBodySize();
+ if (spoolCurrentBatch) {
+ if (spooler == null) {
+ initSpooler();
}
+ spooler.addBatchForSpooling(wrapper);
}
- if (w == null) {
- return null;
+ bufferQueue.add(wrapper);
+ if (!spoolCurrentBatch && currentSizeInMemory > threshold) {
+ logger.debug("Buffer size {} greater than threshold {}. Start spooling to disk", currentSizeInMemory, threshold);
+ startSpooling();
}
+ }
- batch = w.get();
- if (batch.getHeader().getIsOutOfMemory()) {
- outOfMemory = true;
- return batch;
+ @Override
+ public void kill(FragmentContext context) {
+ allocator.close();
+ if (spooler != null) {
+ spooler.terminate();
+ }
+ }
+
+ @Override
+ protected void upkeep(RawFragmentBatch batch) {
+ FragmentRecordBatch header = batch.getHeader();
+ if (header.getIsOutOfMemory()) {
+ outOfMemory.set(true);
+ return;
+ }
+ DrillBuf body = batch.getBody();
+ if (body != null) {
+ currentSizeInMemory -= body.capacity();
}
- queueSize -= w.getBodySize();
-// assert queueSize >= 0;
- if (spool && queueSize < threshold * STOP_SPOOLING_FRACTION) {
- logger.debug("buffer size {} less than {}x threshold. Stop spooling.", queueSize, STOP_SPOOLING_FRACTION);
- spooling.set(false);
+ if (isCurrentlySpooling() && currentSizeInMemory < threshold * STOP_SPOOLING_FRACTION) {
+ logger.debug("buffer size {} less than {}x threshold. Stop spooling.", currentSizeInMemory, STOP_SPOOLING_FRACTION);
+ pauseSpooling();
}
- return batch;
+ logger.debug("Got batch. Current buffer size: {}", bufferQueue.size());
}
public void cleanup() {
- if (closed) {
- logger.warn("Tried cleanup twice");
- return;
+ if (spooler != null) {
+ spooler.terminate();
+ while (spooler.isAlive()) {
+ try {
+ spooler.join();
+ } catch (InterruptedException e) {
+ logger.warn("Interrupted while waiting for spooling thread to exit");
+ continue;
+ }
+ }
}
- closed = true;
allocator.close();
try {
if (outputStream != null) {
outputStream.close();
}
- if (inputStream != null) {
- inputStream.close();
- }
} catch (IOException e) {
logger.warn("Failed to cleanup I/O streams", e);
}
if (context.getConfig().getBoolean(ExecConstants.SPOOLING_BUFFER_DELETE)) {
try {
- fs.delete(path,false);
+ if (fs != null) {
+ fs.delete(path, false);
+ logger.debug("Deleted file {}", path.toString());
+ }
} catch (IOException e) {
logger.warn("Failed to delete temporary files", e);
}
- logger.debug("Deleted file {}", path.toString());
}
+ super.cleanup();
}
- private class RawFragmentBatchWrapper {
+ private class Spooler extends Thread {
+
+ private final LinkedBlockingDeque<RawFragmentBatchWrapper> spoolingQueue;
+ private volatile boolean shouldContinue = true;
+ private Thread spoolingThread;
+
+ public Spooler(String name) {
+ setDaemon(true);
+ setName(name);
+ spoolingQueue = Queues.newLinkedBlockingDeque();
+ }
+
+ public void run() {
+ try {
+ while (shouldContinue) {
+ RawFragmentBatchWrapper batch;
+ try {
+ batch = spoolingQueue.take();
+ } catch (InterruptedException e) {
+ if (shouldContinue) {
+ continue;
+ } else {
+ break;
+ }
+ }
+ try {
+ batch.writeToStream(outputStream);
+ } catch (IOException e) {
+ context.fail(e);
+ }
+ }
+ } catch (Throwable e) {
+ context.fail(e);
+ } finally {
+ logger.info("Spooler thread exiting");
+ }
+ }
+
+ public void addBatchForSpooling(RawFragmentBatchWrapper batchWrapper) {
+ if (isSpoolingStopped()) {
+ spoolingQueue.add(batchWrapper);
+ } else {
+ // will not spill this batch
+ batchWrapper.available = true;
+ batchWrapper.batch.sendOk();
+ batchWrapper.latch.countDown();
+ }
+ }
+
+ public void terminate() {
+ stopSpooling();
+ shouldContinue = false;
+ if (spoolingThread.isAlive()) {
+ spoolingThread.interrupt();
+ }
+ }
+ }
+
+ class RawFragmentBatchWrapper {
private RawFragmentBatch batch;
- private boolean available;
- private CountDownLatch latch = new CountDownLatch(1);
- private int bodyLength;
- private boolean outOfMemory = false;
+ private volatile boolean available;
+ private CountDownLatch latch;
+ private volatile int bodyLength;
+ private volatile boolean outOfMemory = false;
+ private long start = -1;
+ private long check;
public RawFragmentBatchWrapper(RawFragmentBatch batch, boolean available) {
Preconditions.checkNotNull(batch);
this.batch = batch;
this.available = available;
+ this.latch = new CountDownLatch(available ? 0 : 1);
+ if (available) {
+ batch.sendOk();
+ }
}
public boolean isNull() {
return batch == null;
}
- public RawFragmentBatch get() throws IOException {
+ public RawFragmentBatch get() throws InterruptedException, IOException {
if (available) {
+ assert batch.getHeader() != null : "batch header null";
return batch;
} else {
- if (inputStream == null) {
- inputStream = fs.open(path);
- }
- readFromStream(inputStream);
+ latch.await();
+ readFromStream();
available = true;
return batch;
}
@@ -255,32 +378,81 @@ public class SpoolingRawBatchBuffer implements RawBatchBuffer {
Stopwatch watch = new Stopwatch();
watch.start();
available = false;
+ check = ThreadLocalRandom.current().nextLong();
+ start = stream.getPos();
+ logger.debug("Writing check value {} at position {}", check, start);
+ stream.writeLong(check);
batch.getHeader().writeDelimitedTo(stream);
ByteBuf buf = batch.getBody();
- if (buf == null) {
+ if (buf != null) {
+ bodyLength = buf.capacity();
+ } else {
bodyLength = 0;
- return;
}
- bodyLength = buf.readableBytes();
- buf.getBytes(0, stream, bodyLength);
- stream.sync();
+ if (bodyLength > 0) {
+ buf.getBytes(0, stream, bodyLength);
+ }
+ stream.hsync();
+ FileStatus status = fs.getFileStatus(path);
+ long len = status.getLen();
+ logger.debug("After spooling batch, stream at position {}. File length {}", stream.getPos(), len);
+ batch.sendOk();
+ latch.countDown();
long t = watch.elapsed(TimeUnit.MICROSECONDS);
logger.debug("Took {} us to spool {} to disk. Rate {} mb/s", t, bodyLength, bodyLength / t);
- buf.release();
+ if (buf != null) {
+ buf.release();
+ }
}
- public void readFromStream(FSDataInputStream stream) throws IOException {
- Stopwatch watch = new Stopwatch();
- watch.start();
- BitData.FragmentRecordBatch header = BitData.FragmentRecordBatch.parseDelimitedFrom(stream);
- DrillBuf buf = allocator.buffer(bodyLength);
- buf.writeBytes(stream, bodyLength);
- batch = new RawFragmentBatch(header, buf, null);
- buf.release();
- available = true;
- latch.countDown();
- long t = watch.elapsed(TimeUnit.MICROSECONDS);
- logger.debug("Took {} us to read {} from disk. Rate {} mb/s", t, bodyLength, bodyLength / t);
+ public void readFromStream() throws IOException, InterruptedException {
+ long pos = start;
+ boolean tryAgain = true;
+ int duration = 0;
+
+ while (tryAgain) {
+
+ // Sometimes, the file isn't quite done writing when we attempt to read it. As such, we need to wait and retry.
+ Thread.sleep(duration);
+
+ try(final FSDataInputStream stream = fs.open(path);
+ final DrillBuf buf = allocator.buffer(bodyLength)) {
+ stream.seek(start);
+ final long currentPos = stream.getPos();
+ final long check = stream.readLong();
+ pos = stream.getPos();
+ assert check == this.check : String.format("Check values don't match: %d %d, Position %d", this.check, check, currentPos);
+ Stopwatch watch = new Stopwatch();
+ watch.start();
+ BitData.FragmentRecordBatch header = BitData.FragmentRecordBatch.parseDelimitedFrom(stream);
+ pos = stream.getPos();
+ assert header != null : "header null after parsing from stream";
+ buf.writeBytes(stream, bodyLength);
+ pos = stream.getPos();
+ batch = new RawFragmentBatch(header, buf, null);
+ available = true;
+ latch.countDown();
+ long t = watch.elapsed(TimeUnit.MICROSECONDS);
+ logger.debug("Took {} us to read {} from disk. Rate {} mb/s", t, bodyLength, bodyLength / t);
+ tryAgain = false;
+ } catch (EOFException e) {
+ FileStatus status = fs.getFileStatus(path);
+ logger.warn("EOF reading from file {} at pos {}. Current file size: {}", path, pos, status.getLen());
+ duration = Math.max(1, duration * 2);
+ if (duration < 60000) {
+ continue;
+ } else {
+ throw e;
+ }
+ } finally {
+ if (tryAgain) {
+ // we had a premature exit, release batch memory so we don't leak it.
+ if (batch != null) {
+ batch.getBody().release();
+ }
+ }
+ }
+ }
}
private boolean isOutOfMemory() {
@@ -292,7 +464,7 @@ public class SpoolingRawBatchBuffer implements RawBatchBuffer {
}
}
- private String getFileName() {
+ private Path getPath() {
ExecProtos.FragmentHandle handle = context.getHandle();
String qid = QueryIdHelper.getQueryId(handle.getQueryId());
@@ -300,8 +472,8 @@ public class SpoolingRawBatchBuffer implements RawBatchBuffer {
int majorFragmentId = handle.getMajorFragmentId();
int minorFragmentId = handle.getMinorFragmentId();
- String fileName = String.format("%s_%s_%s", qid, majorFragmentId, minorFragmentId);
+ String fileName = Joiner.on(Path.SEPARATOR).join(getDir(), qid, majorFragmentId, minorFragmentId, oppositeId, bufferIndex);
- return fileName;
+ return new Path(fileName);
}
}
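
For reference, each spooled batch is written by writeToStream() as a self-describing record: an 8-byte random check value, the varint-delimited FragmentRecordBatch header, then bodyLength raw body bytes, followed by hsync(); readFromStream() retries on EOFException because the spooling thread may still be flushing. A simplified sketch of reading one record back (fields such as fs, path, start, bodyLength and allocator come from the wrapper above; the retry loop and error handling are omitted):

    FSDataInputStream in = fs.open(path);
    in.seek(start);                              // offset recorded at write time
    long check = in.readLong();                  // 8-byte sanity value
    FragmentRecordBatch header = FragmentRecordBatch.parseDelimitedFrom(in);
    DrillBuf body = allocator.buffer(bodyLength);
    body.writeBytes(in, bodyLength);             // raw body bytes follow the header
    RawFragmentBatch batch = new RawFragmentBatch(header, body, null);
    in.close();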
http://git-wip-us.apache.org/repos/asf/drill/blob/814f553f/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
index 4750666..ef06ea8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/batch/UnlimitedRawBatchBuffer.java
@@ -19,206 +19,78 @@ package org.apache.drill.exec.work.batch;
import java.io.IOException;
import java.util.concurrent.LinkedBlockingDeque;
-import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.drill.exec.ExecConstants;
import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.proto.BitData.FragmentRecordBatch;
import org.apache.drill.exec.record.RawFragmentBatch;
-import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Queues;
-public class UnlimitedRawBatchBuffer implements RawBatchBuffer{
+public class UnlimitedRawBatchBuffer extends BaseRawBatchBuffer<RawFragmentBatch> {
private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(UnlimitedRawBatchBuffer.class);
- private static enum BufferState {
- INIT,
- FINISHED,
- KILLED
- }
-
- private final LinkedBlockingDeque<RawFragmentBatch> buffer;
- private volatile BufferState state = BufferState.INIT;
private final int softlimit;
private final int startlimit;
- private final int bufferSizePerSocket;
- private final AtomicBoolean overlimit = new AtomicBoolean(false);
- private final AtomicBoolean outOfMemory = new AtomicBoolean(false);
- private final ResponseSenderQueue readController = new ResponseSenderQueue();
- private int streamCounter;
- private final int fragmentCount;
- private final FragmentContext context;
-
- public UnlimitedRawBatchBuffer(final FragmentContext context, final int fragmentCount) {
- bufferSizePerSocket = context.getConfig().getInt(ExecConstants.INCOMING_BUFFER_SIZE);
+ public UnlimitedRawBatchBuffer(FragmentContext context, int fragmentCount, int oppositeId) {
+ super(context, fragmentCount);
this.softlimit = bufferSizePerSocket * fragmentCount;
this.startlimit = Math.max(softlimit/2, 1);
logger.trace("softLimit: {}, startLimit: {}", softlimit, startlimit);
- this.buffer = Queues.newLinkedBlockingDeque();
- this.fragmentCount = fragmentCount;
- this.streamCounter = fragmentCount;
- this.context = context;
+ this.bufferQueue = new UnlimitedBufferQueue();
}
- @Override
- public void enqueue(final RawFragmentBatch batch) throws IOException {
+ private class UnlimitedBufferQueue implements BufferQueue<RawFragmentBatch> {
+ private final LinkedBlockingDeque<RawFragmentBatch> buffer = Queues.newLinkedBlockingDeque();;
- // if this fragment is already canceled or failed, we shouldn't need any or more stuff. We do the null check to
- // ensure that tests run.
- if (context != null && !context.shouldContinue()) {
- this.kill(context);
+ @Override
+ public void addOomBatch(RawFragmentBatch batch) {
+ buffer.addFirst(batch);
}
- if (isFinished()) {
- if (state == BufferState.KILLED) {
- // do not even enqueue just release and send ack back
- batch.release();
+ @Override
+ public RawFragmentBatch poll() throws IOException {
+ RawFragmentBatch batch = buffer.poll();
+ if (batch != null) {
batch.sendOk();
- return;
- } else {
- throw new IOException("Attempted to enqueue batch after finished");
- }
- }
- if (batch.getHeader().getIsOutOfMemory()) {
- logger.trace("Setting autoread false");
- final RawFragmentBatch firstBatch = buffer.peekFirst();
- final FragmentRecordBatch header = firstBatch == null ? null :firstBatch.getHeader();
- if (!outOfMemory.get() && !(header == null) && header.getIsOutOfMemory()) {
- buffer.addFirst(batch);
}
- outOfMemory.set(true);
- return;
+ return batch;
}
- buffer.add(batch);
- if (buffer.size() >= softlimit) {
- logger.trace("buffer.size: {}", buffer.size());
- overlimit.set(true);
- readController.enqueueResponse(batch.getSender());
- } else {
+
+ @Override
+ public RawFragmentBatch take() throws IOException, InterruptedException {
+ RawFragmentBatch batch = buffer.take();
batch.sendOk();
+ return batch;
}
- }
- @Override
- public void cleanup() {
- if (!isFinished() && context.shouldContinue()) {
- final String msg = String.format("Cleanup before finished. " + (fragmentCount - streamCounter) + " out of " + fragmentCount + " streams have finished.");
- final IllegalStateException e = new IllegalStateException(msg);
- throw e;
+ @Override
+ public boolean checkForOutOfMemory() {
+ return buffer.peekFirst().getHeader().getIsOutOfMemory();
}
- if (!buffer.isEmpty()) {
- if (context.shouldContinue()) {
- context.fail(new IllegalStateException("Batches still in queue during cleanup"));
- logger.error("{} Batches in queue.", buffer.size());
- }
- clearBufferWithBody();
+ @Override
+ public int size() {
+ return buffer.size();
}
- }
- @Override
- public void kill(final FragmentContext context) {
- state = BufferState.KILLED;
- clearBufferWithBody();
- }
-
- /**
- * Helper method to clear buffer with request bodies release
- * also flushes ack queue - in case there are still responses pending
- */
- private void clearBufferWithBody() {
- while (!buffer.isEmpty()) {
- final RawFragmentBatch batch = buffer.poll();
- if (batch.getBody() != null) {
- batch.getBody().release();
- }
+ @Override
+ public boolean isEmpty() {
+ return buffer.size() == 0;
}
- readController.flushResponses();
- }
- @Override
- public void finished() {
- if (state != BufferState.KILLED) {
- state = BufferState.FINISHED;
- }
- if (!buffer.isEmpty()) {
- throw new IllegalStateException("buffer not empty when finished");
+ @Override
+ public void add(RawFragmentBatch batch) {
+ buffer.add(batch);
}
}
- @Override
- public RawFragmentBatch getNext() throws IOException, InterruptedException {
-
- if (outOfMemory.get() && buffer.size() < 10) {
- logger.trace("Setting autoread true");
- outOfMemory.set(false);
- readController.flushResponses();
- }
-
- RawFragmentBatch b = null;
-
- b = buffer.poll();
-
- // if we didn't get a buffer, block on waiting for buffer.
- if (b == null && (!isFinished() || !buffer.isEmpty())) {
- try {
- b = buffer.take();
- } catch (final InterruptedException e) {
- logger.debug("Interrupted while waiting for incoming data.", e);
- throw e;
- }
- }
-
- if (b != null && b.getHeader().getIsOutOfMemory()) {
- outOfMemory.set(true);
- return b;
- }
-
-
- // try to flush the difference between softlimit and queue size, so every flush we are reducing backlog
- // when queue size is lower then softlimit - the bigger the difference the more we can flush
- if (!isFinished() && overlimit.get()) {
- final int flushCount = softlimit - buffer.size();
- if ( flushCount > 0 ) {
- final int flushed = readController.flushResponses(flushCount);
- logger.trace("flush {} entries, flushed {} entries ", flushCount, flushed);
- if ( flushed == 0 ) {
- // queue is empty - nothing to do for now
- overlimit.set(false);
- }
- }
- }
-
- if (b != null && b.getHeader().getIsLastBatch()) {
- streamCounter--;
- if (streamCounter == 0) {
- finished();
- }
- }
-
- if (b == null && buffer.size() > 0) {
- throw new IllegalStateException("Returning null when there are batches left in queue");
- }
- if (b == null && !isFinished()) {
- throw new IllegalStateException("Returning null when not finished");
+ protected void enqueueInner(final RawFragmentBatch batch) throws IOException {
+ if (bufferQueue.size() < softlimit) {
+ batch.sendOk();
}
- return b;
-
- }
-
- private boolean isFinished() {
- return (state == BufferState.KILLED || state == BufferState.FINISHED);
- }
-
- @VisibleForTesting
- ResponseSenderQueue getReadController() {
- return readController;
+ bufferQueue.add(batch);
}
- @VisibleForTesting
- boolean isBufferEmpty() {
- return buffer.isEmpty();
+ protected void upkeep(RawFragmentBatch batch) {
}
}
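
For context on the refactoring above: UnlimitedRawBatchBuffer now delegates all queue operations to the nested UnlimitedBufferQueue and hands flow control to the base class it constructs via super(context, fragmentCount). The following is a hedged, self-contained reconstruction of that BufferQueue contract -- the method set is inferred from the @Override hunks above, and Batch is a stand-in for RawFragmentBatch so the sketch compiles on its own; it is illustrative only, not the committed interface.

import java.io.IOException;
import java.util.concurrent.LinkedBlockingDeque;

// Inferred shape of the BufferQueue contract (illustrative; names taken from the hunks above).
interface BufferQueue<T> {
  void addOomBatch(T batch);                          // put an out-of-memory signal back at the head
  T poll() throws IOException;                        // non-blocking dequeue; acks the sender on success
  T take() throws IOException, InterruptedException;  // blocking dequeue; acks the sender
  boolean checkForOutOfMemory();                      // does the head batch signal out-of-memory?
  int size();
  boolean isEmpty();
  void add(T batch);
}

// Stand-in for RawFragmentBatch, just enough to show the ack-on-dequeue flow.
class Batch {
  private final boolean outOfMemory;
  Batch(boolean outOfMemory) { this.outOfMemory = outOfMemory; }
  boolean isOutOfMemory() { return outOfMemory; }
  void sendOk() { System.out.println("ack sent"); }
}

// In-memory implementation mirroring UnlimitedBufferQueue above.
class InMemoryBufferQueue implements BufferQueue<Batch> {
  private final LinkedBlockingDeque<Batch> buffer = new LinkedBlockingDeque<>();

  @Override public void addOomBatch(Batch batch) { buffer.addFirst(batch); }

  @Override public Batch poll() {
    Batch batch = buffer.poll();
    if (batch != null) {
      batch.sendOk();
    }
    return batch;
  }

  @Override public Batch take() throws InterruptedException {
    Batch batch = buffer.take();
    batch.sendOk();
    return batch;
  }

  @Override public boolean checkForOutOfMemory() {
    Batch head = buffer.peekFirst();
    return head != null && head.isOutOfMemory();
  }

  @Override public int size() { return buffer.size(); }
  @Override public boolean isEmpty() { return buffer.isEmpty(); }
  @Override public void add(Batch batch) { buffer.add(batch); }
}

The point of the split is visible in the hunks: acknowledgements move into poll()/take(), so the shared enqueue/getNext flow in the superclass can stay storage-agnostic while each buffer plugs in its own queue implementation.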
http://git-wip-us.apache.org/repos/asf/drill/blob/814f553f/exec/java-exec/src/main/resources/drill-module.conf
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/resources/drill-module.conf b/exec/java-exec/src/main/resources/drill-module.conf
index 7630938..6fb9340 100644
--- a/exec/java-exec/src/main/resources/drill-module.conf
+++ b/exec/java-exec/src/main/resources/drill-module.conf
@@ -122,10 +122,9 @@ drill.exec: {
filesystem: "drill-local:///"
},
buffer:{
- impl: "org.apache.drill.exec.work.batch.UnlimitedRawBatchBuffer",
size: "6",
spooling: {
- delete: false,
+ delete: true,
size: 100000000
}
},
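
A quick way to check what these settings resolve to at runtime is to load the module config directly, as the removed UnlimitedRawBatchBuffer constructor and the deleted TestUnlimitedBatchBuffer below both do. This is a minimal sketch under two assumptions: the wrapper class name is invented here, and ExecConstants.INCOMING_BUFFER_SIZE is the constant behind drill.exec.buffer.size, which is what those call sites suggest.

import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.exec.ExecConstants;

// Hypothetical helper, only to show how the buffer settings above are consumed.
public class BufferConfigCheck {
  public static void main(String[] args) {
    DrillConfig config = DrillConfig.create();   // loads drill-module.conf plus any overrides

    // bufferSizePerSocket in the receiver code; softlimit = this value * fragmentCount.
    int bufferSizePerSocket = config.getInt(ExecConstants.INCOMING_BUFFER_SIZE);
    System.out.println("drill.exec.buffer.size = " + bufferSizePerSocket);
  }
}

With the hunk above applied, the reflection-style impl key is gone and spooled files are deleted by default, so the only knobs left in this block are buffer.size and the spooling settings.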
http://git-wip-us.apache.org/repos/asf/drill/blob/814f553f/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestSpoolingBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestSpoolingBuffer.java b/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestSpoolingBuffer.java
index dcea9bb..271f29f 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestSpoolingBuffer.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestSpoolingBuffer.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
import java.util.List;
+import org.apache.drill.BaseTestQuery;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.util.FileUtils;
import org.apache.drill.exec.ExecTest;
@@ -33,7 +34,7 @@ import org.junit.Test;
import com.google.common.base.Charsets;
import com.google.common.io.Files;
-public class TestSpoolingBuffer extends ExecTest {
+public class TestSpoolingBuffer extends BaseTestQuery {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestSpoolingBuffer.class);
@Test
@@ -59,5 +60,4 @@ public class TestSpoolingBuffer extends ExecTest {
assertEquals(500024, count);
}
}
-
}
http://git-wip-us.apache.org/repos/asf/drill/blob/814f553f/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestUnlimitedBatchBuffer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestUnlimitedBatchBuffer.java b/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestUnlimitedBatchBuffer.java
deleted file mode 100644
index b8336e9..0000000
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/work/batch/TestUnlimitedBatchBuffer.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.drill.exec.work.batch;
-
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import org.apache.drill.common.config.DrillConfig;
-import org.apache.drill.exec.ExecConstants;
-import org.apache.drill.exec.ExecTest;
-import org.apache.drill.exec.ops.FragmentContext;
-import org.apache.drill.exec.proto.BitData.FragmentRecordBatch;
-import org.apache.drill.exec.record.RawFragmentBatch;
-import org.apache.drill.exec.rpc.data.AckSender;
-import org.junit.Before;
-import org.junit.Test;
-import org.mockito.Mockito;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-/**
- * Test case to test whether backpressure is applied when
- * size of the queue of RawBatchBuffers is exceeding specified softLimit.
- * It is testing that acknowledgments are queued and sent according to the
- * correct schedule
- * If algorithm to release acks will be changed in the future
- * this test will need to be changed
- * It is not testing whether Senders receive acknowledgments and act accordingly
- */
-public class TestUnlimitedBatchBuffer extends ExecTest {
-
- private static int FRAGMENT_COUNT = 5;
- private DrillConfig dc = DrillConfig.create();
- private MyAckSender myAckSender;
- private UnlimitedRawBatchBuffer rawBuffer;
- private RawFragmentBatch batch;
- private FragmentContext context;
- private int softLimit;
-
- private static class MyAckSender extends AckSender {
-
- private int sendCount = 0;
-
- public MyAckSender() {
- super(null);
- }
-
- @Override
- public void sendOk() {
- sendCount++;
- }
-
- public int getSendCount() {
- return sendCount;
- }
-
- public void resetSender() {
- sendCount = 0;
- }
- }
-
- @Before
- public void setUp() {
- myAckSender = new MyAckSender();
- context = Mockito.mock(FragmentContext.class);
-
- Mockito.when(context.getConfig()).thenReturn(dc);
- Mockito.when(context.shouldContinue()).thenReturn(true);
-
- rawBuffer = new UnlimitedRawBatchBuffer(context, FRAGMENT_COUNT);
-
- batch = Mockito.mock(RawFragmentBatch.class);
-
- Mockito.when(batch.getSender()).thenReturn(myAckSender);
- Mockito.doAnswer(new Answer<Void>() {
- public Void answer(InvocationOnMock ignore) throws Throwable {
- myAckSender.sendOk();
- return null;
- }
- }).when(batch).sendOk();
-
- FragmentRecordBatch header = FragmentRecordBatch.newBuilder().setIsOutOfMemory(false).setIsLastBatch(false).build();
- Mockito.when(batch.getHeader()).thenReturn(header);
-
- /// start the real test
- int incomingBufferSize = dc.getInt(ExecConstants.INCOMING_BUFFER_SIZE);
- softLimit = incomingBufferSize * FRAGMENT_COUNT;
- }
-
- @Test
- public void testBackPressure() throws Exception {
- // No back pressure should be kicked in
- for ( int i = 0; i < softLimit-1; i++) {
- rawBuffer.enqueue(batch);
- }
-
- // number of responses sent == number of enqueued elements
- assertEquals(softLimit - 1, myAckSender.getSendCount());
- rawBuffer.getNext();
-
- // set senderCount to 0
- myAckSender.resetSender();
-
- // test back pressure
- // number of elements in the queue = softLimit -2
- // enqueue softlimit elements more
- for ( int i = 0; i < softLimit; i++) {
- rawBuffer.enqueue(batch);
- }
- // we are exceeding softlimit, so senderCount should not increase
- assertEquals(1, myAckSender.getSendCount());
-
- // other responses should be saved in the responsequeue
- for (int i = 0; i < softLimit-2; i++ ) {
- rawBuffer.getNext();
- }
-
- // still should not send responses, as queue.size should higher then softLimit
- assertEquals(1, myAckSender.getSendCount());
-
- // size of the queue == softLimit now
- for (int i = softLimit; i > 0 ; i-- ) {
- int senderCount = myAckSender.getSendCount();
- rawBuffer.getNext();
- int expectedCountNumber = softLimit - i + senderCount+1;
- assertEquals((expectedCountNumber < softLimit ? expectedCountNumber : softLimit), myAckSender.getSendCount());
- }
- }
-
- @Test
- public void testAcksWithKill() throws Exception {
- // Back pressure should be kicked in
- for ( int i = 0; i < 2*softLimit; i++) {
- rawBuffer.enqueue(batch);
- }
- assertEquals(softLimit - 1, myAckSender.getSendCount());
- assertTrue(!rawBuffer.getReadController().isEmpty());
-
- rawBuffer.kill(context);
-
- // UnlimitedBatchBuffer queue should be cleared
- assertTrue(rawBuffer.isBufferEmpty());
-
- // acks queue should be cleared as well
- assertTrue(rawBuffer.getReadController().isEmpty());
-
- // all acks should be sent
- assertEquals(2*softLimit, myAckSender.getSendCount());
- }
-}
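
Although TestUnlimitedBatchBuffer is removed, the backpressure schedule it described is still useful for reading the refactor: under the soft limit every enqueued batch is acknowledged immediately, while at or over the limit acks are parked and then released on each dequeue, at most (softLimit - current queue size) at a time. The toy below is a standalone paraphrase of that schedule for illustration only -- it is not Drill code and drops the out-of-memory and last-batch handling.

import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Queue;

// Toy model of the deferred-ack (backpressure) schedule; illustrative, not Drill code.
class SoftLimitBuffer<T> {
  private final Queue<T> batches = new ArrayDeque<>();
  private final Deque<Runnable> pendingAcks = new ArrayDeque<>();  // parked acknowledgements
  private final int softLimit;
  private boolean overLimit;

  SoftLimitBuffer(int softLimit) { this.softLimit = softLimit; }

  void enqueue(T batch, Runnable ack) {
    batches.add(batch);
    if (batches.size() >= softLimit) {
      overLimit = true;        // sender receives no ack and backs off
      pendingAcks.add(ack);
    } else {
      ack.run();               // under the limit: acknowledge immediately
    }
  }

  T getNext() {
    T batch = batches.poll();
    if (overLimit) {
      // Try to release the difference between the soft limit and the current queue size.
      int flushCount = softLimit - batches.size();
      if (flushCount > 0) {
        int flushed = 0;
        while (flushed < flushCount && !pendingAcks.isEmpty()) {
          pendingAcks.poll().run();
          flushed++;
        }
        if (flushed == 0) {
          overLimit = false;   // no acks were parked, so nothing is being throttled
        }
      }
    }
    return batch;
  }
}

The new enqueueInner above only shows the immediate-ack half (sendOk while under the soft limit); the parked-ack and flush half is presumably handled in the shared base-class code, which sits outside the hunks shown here.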