You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2014/06/06 04:52:32 UTC
[15/23] git commit: DRILL-892: Send Batch is leaking memory when send
fails to establish connection to remote fragment.
DRILL-892: Send Batch is leaking memory when send fails to establish connection to remote fragment.
Also:
1. Maintain one StatusHandler for all OutgoingRecordBatches in Partitioner.
2. In FragmentExecutor, check for failures set in FragmentContext.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/e62c3650
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/e62c3650
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/e62c3650
Branch: refs/heads/master
Commit: e62c3650d2c882bd2cf354d7a0dbc506a58fc051
Parents: c6c3cd5
Author: vkorukanti <ve...@gmail.com>
Authored: Mon Jun 2 17:43:57 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu Jun 5 09:35:08 2014 -0700
----------------------------------------------------------------------
.../partitionsender/PartitionSenderRootExec.java | 9 ++++++++-
.../physical/impl/partitionsender/Partitioner.java | 3 ++-
.../impl/partitionsender/PartitionerTemplate.java | 16 ++++++++--------
.../org/apache/drill/exec/rpc/data/DataTunnel.java | 11 +++++++++--
.../drill/exec/work/fragment/FragmentExecutor.java | 16 ++++++++++++----
5 files changed, 39 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e62c3650/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index 74a3c90..ffb3780 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -61,6 +61,7 @@ public class PartitionSenderRootExec implements RootExec {
private final OperatorStats stats;
private final int outGoingBatchCount;
private final HashPartitionSender popConfig;
+ private final StatusHandler statusHandler;
public PartitionSenderRootExec(FragmentContext context,
@@ -74,6 +75,7 @@ public class PartitionSenderRootExec implements RootExec {
this.stats = oContext.getStats();
this.outGoingBatchCount = operator.getDestinations().size();
this.popConfig = operator;
+ this.statusHandler = new StatusHandler(sendCount, context);
}
@Override
@@ -183,7 +185,7 @@ public class PartitionSenderRootExec implements RootExec {
// compile and setup generated code
// partitioner = context.getImplementationClassMultipleOutput(cg);
partitioner = context.getImplementationClass(cg);
- partitioner.setup(context, incoming, popConfig, stats, sendCount, oContext);
+ partitioner.setup(context, incoming, popConfig, stats, sendCount, oContext, statusHandler);
} catch (ClassTransformationException | IOException e) {
throw new SchemaChangeException("Failure while attempting to load generated class", e);
@@ -197,6 +199,11 @@ public class PartitionSenderRootExec implements RootExec {
partitioner.clear();
}
sendCount.waitForSendComplete();
+
+ if (!statusHandler.isOk()) {
+ context.fail(statusHandler.getException());
+ }
+
oContext.close();
incoming.cleanup();
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e62c3650/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
index 8d6c19a..6958403 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
@@ -35,7 +35,8 @@ public interface Partitioner {
HashPartitionSender popConfig,
OperatorStats stats,
SendingAccountor sendingAccountor,
- OperatorContext oContext) throws SchemaChangeException;
+ OperatorContext oContext,
+ StatusHandler statusHandler) throws SchemaChangeException;
public abstract void partitionBatch(RecordBatch incoming) throws IOException;
public abstract void flushOutgoingBatches(boolean isLastBatch, boolean schemaChanged) throws IOException;
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e62c3650/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
index 4a27262..510327a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
@@ -71,7 +71,8 @@ public abstract class PartitionerTemplate implements Partitioner {
HashPartitionSender popConfig,
OperatorStats stats,
SendingAccountor sendingAccountor,
- OperatorContext oContext) throws SchemaChangeException {
+ OperatorContext oContext,
+ StatusHandler statusHandler) throws SchemaChangeException {
this.incoming = incoming;
doSetup(context, incoming, null);
@@ -79,7 +80,8 @@ public abstract class PartitionerTemplate implements Partitioner {
int fieldId = 0;
for (DrillbitEndpoint endpoint : popConfig.getDestinations()) {
FragmentHandle opposite = context.getHandle().toBuilder().setMajorFragmentId(popConfig.getOppositeMajorFragmentId()).setMinorFragmentId(fieldId).build();
- outgoingBatches.add(new OutgoingRecordBatch(stats, sendingAccountor, popConfig, context.getDataTunnel(endpoint, opposite), context, oContext.getAllocator(), fieldId));
+ outgoingBatches.add(new OutgoingRecordBatch(stats, sendingAccountor, popConfig,
+ context.getDataTunnel(endpoint, opposite), context, oContext.getAllocator(), fieldId, statusHandler));
fieldId++;
}
@@ -204,10 +206,11 @@ public abstract class PartitionerTemplate implements Partitioner {
private static final int DEFAULT_RECORD_BATCH_SIZE = 20000;
private static final int DEFAULT_VARIABLE_WIDTH_SIZE = 200;
- private StatusHandler statusHandler;
+ private final StatusHandler statusHandler;
public OutgoingRecordBatch(OperatorStats stats, SendingAccountor sendCount, HashPartitionSender operator, DataTunnel tunnel,
- FragmentContext context, BufferAllocator allocator, int oppositeMinorFragmentId) {
+ FragmentContext context, BufferAllocator allocator, int oppositeMinorFragmentId,
+ StatusHandler statusHandler) {
this.context = context;
this.allocator = allocator;
this.operator = operator;
@@ -215,7 +218,7 @@ public abstract class PartitionerTemplate implements Partitioner {
this.sendCount = sendCount;
this.stats = stats;
this.oppositeMinorFragmentId = oppositeMinorFragmentId;
- this.statusHandler = new StatusHandler(sendCount, context);
+ this.statusHandler = statusHandler;
}
protected boolean copy(int inIndex) throws IOException {
@@ -346,9 +349,6 @@ public abstract class PartitionerTemplate implements Partitioner {
return WritableBatch.getBatchNoHVWrap(recordCount, this, false);
}
-
-
-
public void clear(){
vectorContainer.clear();
}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e62c3650/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
index 98bbeeb..3c2b9e3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
@@ -17,6 +17,7 @@
*/
package org.apache.drill.exec.rpc.data;
+import io.netty.buffer.ByteBuf;
import org.apache.drill.exec.ops.FragmentContext;
import org.apache.drill.exec.proto.BitData.RpcType;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
@@ -66,8 +67,14 @@ public class DataTunnel {
public String toString() {
return "SendBatch [batch.header=" + batch.getHeader() + "]";
}
-
-
+
+ @Override
+ public void connectionFailed(FailureType type, Throwable t) {
+ for(ByteBuf buffer : batch.getBuffers()) {
+ buffer.release();
+ }
+ super.connectionFailed(type, t);
+ }
}
private static class SendBatchAsyncFuture extends FutureBitCommand<Ack, DataClientConnection> {
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e62c3650/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index 70f5dd0..11685c0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -106,6 +106,9 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
}
root.stop();
+ if(context.isFailed()) {
+ internalFail(context.getFailureCause());
+ }
closed = true;
@@ -115,10 +118,15 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
internalFail(ex);
}finally{
Thread.currentThread().setName(originalThread);
- if(!closed) try{
- context.close();
- }catch(RuntimeException e){
- logger.warn("Failure while closing context in failed state.", e);
+ if(!closed) {
+ try {
+ if(context.isFailed()) {
+ internalFail(context.getFailureCause());
+ }
+ context.close();
+ } catch (RuntimeException e) {
+ logger.warn("Failure while closing context in failed state.", e);
+ }
}
}
logger.debug("Fragment runner complete. {}:{}", context.getHandle().getMajorFragmentId(), context.getHandle().getMinorFragmentId());