You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2014/06/06 04:52:32 UTC

[15/23] git commit: DRILL-892: Send Batch is leaking memory when send fails to establish connection to remote fragment.

DRILL-892: Send Batch is leaking memory when send fails to establish connection to remote fragment.

Also:
1. Maitain one StatusHandler for all OutgoingRecordBatches in Partitioner.
2. In FragmentExecutor check for failures set in FragementContext.


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/e62c3650
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/e62c3650
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/e62c3650

Branch: refs/heads/master
Commit: e62c3650d2c882bd2cf354d7a0dbc506a58fc051
Parents: c6c3cd5
Author: vkorukanti <ve...@gmail.com>
Authored: Mon Jun 2 17:43:57 2014 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu Jun 5 09:35:08 2014 -0700

----------------------------------------------------------------------
 .../partitionsender/PartitionSenderRootExec.java    |  9 ++++++++-
 .../physical/impl/partitionsender/Partitioner.java  |  3 ++-
 .../impl/partitionsender/PartitionerTemplate.java   | 16 ++++++++--------
 .../org/apache/drill/exec/rpc/data/DataTunnel.java  | 11 +++++++++--
 .../drill/exec/work/fragment/FragmentExecutor.java  | 16 ++++++++++++----
 5 files changed, 39 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e62c3650/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
index 74a3c90..ffb3780 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionSenderRootExec.java
@@ -61,6 +61,7 @@ public class PartitionSenderRootExec implements RootExec {
   private final OperatorStats stats;
   private final int outGoingBatchCount;
   private final HashPartitionSender popConfig;
+  private final StatusHandler statusHandler;
 
 
   public PartitionSenderRootExec(FragmentContext context,
@@ -74,6 +75,7 @@ public class PartitionSenderRootExec implements RootExec {
     this.stats = oContext.getStats();
     this.outGoingBatchCount = operator.getDestinations().size();
     this.popConfig = operator;
+    this.statusHandler = new StatusHandler(sendCount, context);
   }
 
   @Override
@@ -183,7 +185,7 @@ public class PartitionSenderRootExec implements RootExec {
       // compile and setup generated code
 //      partitioner = context.getImplementationClassMultipleOutput(cg);
       partitioner = context.getImplementationClass(cg);
-      partitioner.setup(context, incoming, popConfig, stats, sendCount, oContext);
+      partitioner.setup(context, incoming, popConfig, stats, sendCount, oContext, statusHandler);
 
     } catch (ClassTransformationException | IOException e) {
       throw new SchemaChangeException("Failure while attempting to load generated class", e);
@@ -197,6 +199,11 @@ public class PartitionSenderRootExec implements RootExec {
       partitioner.clear();
     }
     sendCount.waitForSendComplete();
+
+    if (!statusHandler.isOk()) {
+      context.fail(statusHandler.getException());
+    }
+
     oContext.close();
     incoming.cleanup();
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e62c3650/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
index 8d6c19a..6958403 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/Partitioner.java
@@ -35,7 +35,8 @@ public interface Partitioner {
                           HashPartitionSender popConfig,
                           OperatorStats stats,
                           SendingAccountor sendingAccountor,
-                          OperatorContext oContext) throws SchemaChangeException;
+                          OperatorContext oContext,
+                          StatusHandler statusHandler) throws SchemaChangeException;
 
   public abstract void partitionBatch(RecordBatch incoming) throws IOException;
   public abstract void flushOutgoingBatches(boolean isLastBatch, boolean schemaChanged) throws IOException;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e62c3650/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
index 4a27262..510327a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/partitionsender/PartitionerTemplate.java
@@ -71,7 +71,8 @@ public abstract class PartitionerTemplate implements Partitioner {
                           HashPartitionSender popConfig,
                           OperatorStats stats,
                           SendingAccountor sendingAccountor,
-                          OperatorContext oContext) throws SchemaChangeException {
+                          OperatorContext oContext,
+                          StatusHandler statusHandler) throws SchemaChangeException {
 
     this.incoming = incoming;
     doSetup(context, incoming, null);
@@ -79,7 +80,8 @@ public abstract class PartitionerTemplate implements Partitioner {
     int fieldId = 0;
     for (DrillbitEndpoint endpoint : popConfig.getDestinations()) {
       FragmentHandle opposite = context.getHandle().toBuilder().setMajorFragmentId(popConfig.getOppositeMajorFragmentId()).setMinorFragmentId(fieldId).build();
-      outgoingBatches.add(new OutgoingRecordBatch(stats, sendingAccountor, popConfig, context.getDataTunnel(endpoint, opposite), context, oContext.getAllocator(), fieldId));
+      outgoingBatches.add(new OutgoingRecordBatch(stats, sendingAccountor, popConfig,
+          context.getDataTunnel(endpoint, opposite), context, oContext.getAllocator(), fieldId, statusHandler));
       fieldId++;
     }
 
@@ -204,10 +206,11 @@ public abstract class PartitionerTemplate implements Partitioner {
     private static final int DEFAULT_RECORD_BATCH_SIZE = 20000;
     private static final int DEFAULT_VARIABLE_WIDTH_SIZE = 200;
 
-    private StatusHandler statusHandler;
+    private final StatusHandler statusHandler;
 
     public OutgoingRecordBatch(OperatorStats stats, SendingAccountor sendCount, HashPartitionSender operator, DataTunnel tunnel,
-                               FragmentContext context, BufferAllocator allocator, int oppositeMinorFragmentId) {
+                               FragmentContext context, BufferAllocator allocator, int oppositeMinorFragmentId,
+                               StatusHandler statusHandler) {
       this.context = context;
       this.allocator = allocator;
       this.operator = operator;
@@ -215,7 +218,7 @@ public abstract class PartitionerTemplate implements Partitioner {
       this.sendCount = sendCount;
       this.stats = stats;
       this.oppositeMinorFragmentId = oppositeMinorFragmentId;
-      this.statusHandler = new StatusHandler(sendCount, context);
+      this.statusHandler = statusHandler;
     }
 
     protected boolean copy(int inIndex) throws IOException {
@@ -346,9 +349,6 @@ public abstract class PartitionerTemplate implements Partitioner {
       return WritableBatch.getBatchNoHVWrap(recordCount, this, false);
     }
 
-
-
-
     public void clear(){
       vectorContainer.clear();
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e62c3650/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
index 98bbeeb..3c2b9e3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataTunnel.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.rpc.data;
 
+import io.netty.buffer.ByteBuf;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.proto.BitData.RpcType;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
@@ -66,8 +67,14 @@ public class DataTunnel {
     public String toString() {
       return "SendBatch [batch.header=" + batch.getHeader() + "]";
     }
-    
-    
+
+    @Override
+    public void connectionFailed(FailureType type, Throwable t) {
+      for(ByteBuf buffer : batch.getBuffers()) {
+        buffer.release();
+      }
+      super.connectionFailed(type, t);
+    }
   }
 
   private static class SendBatchAsyncFuture extends FutureBitCommand<Ack, DataClientConnection> {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/e62c3650/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index 70f5dd0..11685c0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -106,6 +106,9 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
       }
 
       root.stop();
+      if(context.isFailed()) {
+        internalFail(context.getFailureCause());
+      }
 
       closed = true;
 
@@ -115,10 +118,15 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
       internalFail(ex);
     }finally{
       Thread.currentThread().setName(originalThread);
-      if(!closed) try{
-        context.close();
-      }catch(RuntimeException e){
-        logger.warn("Failure while closing context in failed state.", e);
+      if(!closed) {
+        try {
+          if(context.isFailed()) {
+            internalFail(context.getFailureCause());
+          }
+          context.close();
+        } catch (RuntimeException e) {
+          logger.warn("Failure while closing context in failed state.", e);
+        }
       }
     }
     logger.debug("Fragment runner complete. {}:{}", context.getHandle().getMajorFragmentId(), context.getHandle().getMinorFragmentId());