You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by sm...@apache.org on 2015/05/15 04:52:07 UTC

drill git commit: DRILL-3065: Ensure the selection vectors in mSort to be closed after failure happens

Repository: drill
Updated Branches:
  refs/heads/master 7f575df33 -> 5c4a9b212


DRILL-3065: Ensure the selection vectors in mSort to be closed after failure happens


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/5c4a9b21
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/5c4a9b21
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/5c4a9b21

Branch: refs/heads/master
Commit: 5c4a9b2129dc7c2ae4a4fca9640ddbf990da126b
Parents: 7f575df
Author: Hsuan-Yi Chu <hs...@usc.edu>
Authored: Wed May 13 17:17:53 2015 -0700
Committer: Hsuan-Yi Chu <hs...@usc.edu>
Committed: Thu May 14 19:28:26 2015 -0700

----------------------------------------------------------------------
 .../physical/impl/xsort/ExternalSortBatch.java  | 15 ++++++
 .../exec/physical/impl/xsort/MSortTemplate.java | 11 ++++
 .../drill/exec/physical/impl/xsort/MSorter.java |  1 +
 .../exec/server/TestDrillbitResilience.java     | 54 +++++++++++++++++++-
 4 files changed, 79 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/5c4a9b21/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index 529a6ca..de4a86e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -63,6 +63,7 @@ import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.store.ischema.Records;
+import org.apache.drill.exec.testing.ExecutionControlsInjector;
 import org.apache.drill.exec.vector.CopyUtil;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.AbstractContainerVector;
@@ -112,6 +113,10 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
   private final String fileName;
   private int firstSpillBatchCount = 0;
 
+  public static final String INTERRUPTION_AFTER_SORT = "after-sort";
+  public static final String INTERRUPTION_AFTER_SETUP = "after-setup";
+  private final static ExecutionControlsInjector injector = ExecutionControlsInjector.getInjector(ExternalSortBatch.class);
+
   public ExternalSortBatch(ExternalSort popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException {
     super(popConfig, context, true);
     this.incoming = incoming;
@@ -172,6 +177,10 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
       }
       copierAllocator.close();
       super.close();
+
+      if(mSorter != null) {
+        mSorter.clear();
+      }
     }
   }
 
@@ -366,12 +375,18 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
         sv4 = builder.getSv4();
         mSorter = createNewMSorter();
         mSorter.setup(context, oContext.getAllocator(), getSelectionVector4(), this.container);
+
+        // For memory-leak testing: inject an exception after mSorter finishes setup
+        injector.injectUnchecked(context.getExecutionControls(), INTERRUPTION_AFTER_SETUP);
         mSorter.sort(this.container);
 
         // sort may have prematurely exited due to should continue returning false.
         if (!context.shouldContinue()) {
           return IterOutcome.STOP;
         }
+
+        // For memory-leak testing: inject an exception after mSorter finishes sorting
+        injector.injectUnchecked(context.getExecutionControls(), INTERRUPTION_AFTER_SORT);
         sv4 = mSorter.getSV4();
 
         long t = watch.elapsed(TimeUnit.MICROSECONDS);

http://git-wip-us.apache.org/repos/asf/drill/blob/5c4a9b21/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
index 9acae9e..9b21ae3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
@@ -171,6 +171,17 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable{
     return doEval(sv1, sv2);
   }
 
+  @Override
+  public void clear() {
+    if(vector4 != null) {
+      vector4.clear();
+    }
+
+    if(aux != null) {
+      aux.clear();
+    }
+  }
+
   public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") VectorContainer incoming, @Named("outgoing") RecordBatch outgoing);
   public abstract int doEval(@Named("leftIndex") int leftIndex, @Named("rightIndex") int rightIndex);
 

http://git-wip-us.apache.org/repos/asf/drill/blob/5c4a9b21/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSorter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSorter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSorter.java
index d97ffc0..af8cbfb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSorter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSorter.java
@@ -33,4 +33,5 @@ public interface MSorter {
 
   public static TemplateClassDefinition<MSorter> TEMPLATE_DEFINITION = new TemplateClassDefinition<MSorter>(MSorter.class, MSortTemplate.class);
 
+  public void clear();
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/5c4a9b21/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
index f95fbe1..8552ec1 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
@@ -48,10 +48,13 @@ import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.memory.TopLevelAllocator;
 import org.apache.drill.exec.physical.impl.ScreenCreator;
 import org.apache.drill.exec.physical.impl.SingleSenderCreator.SingleSenderRootExec;
+import org.apache.drill.exec.physical.impl.filter.FilterRecordBatch;
 import org.apache.drill.exec.physical.impl.mergereceiver.MergingRecordBatch;
 import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec;
 import org.apache.drill.exec.physical.impl.partitionsender.PartitionerDecorator;
+import org.apache.drill.exec.physical.impl.union.UnionAllRecordBatch;
 import org.apache.drill.exec.physical.impl.unorderedreceiver.UnorderedReceiverBatch;
+import org.apache.drill.exec.physical.impl.xsort.ExternalSortBatch;
 import org.apache.drill.exec.planner.sql.DrillSqlWorker;
 import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
 import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
@@ -682,16 +685,21 @@ public class TestDrillbitResilience extends DrillTest {
    * Given a set of controls, this method ensures TEST_QUERY fails with the given class and desc.
    */
   private static void assertFailsWithException(final String controls, final Class<? extends Throwable> exceptionClass,
-                                               final String exceptionDesc) {
+                                               final String exceptionDesc, final String query) {
     setControls(controls);
     final WaitUntilCompleteListener listener = new WaitUntilCompleteListener();
-    QueryTestUtil.testWithListener(drillClient, QueryType.SQL, TEST_QUERY, listener);
+    QueryTestUtil.testWithListener(drillClient, QueryType.SQL, query, listener);
     final Pair<QueryState, Exception> result = listener.waitForCompletion();
     final QueryState state = result.getFirst();
     assertTrue(String.format("Query state should be FAILED (and not %s).", state), state == QueryState.FAILED);
     assertExceptionInjected(result.getSecond(), exceptionClass, exceptionDesc);
   }
 
+  private static void assertFailsWithException(final String controls, final Class<? extends Throwable> exceptionClass,
+      final String exceptionDesc) {
+    assertFailsWithException(controls, exceptionClass, exceptionDesc, TEST_QUERY);
+  }
+
   @Test // Completion TC 2: failed query - before query is executed - while sql parsing
   public void failsWhenParsing() {
     final String exceptionDesc = "sql-parsing";
@@ -791,4 +799,46 @@ public class TestDrillbitResilience extends DrillTest {
         createPauseInjection(SingleSenderRootExec.class, "data-tunnel-send-batch-wait-for-interrupt", 1);
     assertCancelled(control, TEST_QUERY, new ListenerThatCancelsQueryAfterFirstBatchOfData());
   }
+
+  @Test // DRILL-3065
+  public void testInterruptingAfterMSorterSorting() {
+    final String query = "select n_name from cp.`tpch/nation.parquet` order by n_name";
+    Class<? extends Exception> typeOfException = RuntimeException.class;
+
+    final long before = countAllocatedMemory();
+    final String controls = createSingleException(ExternalSortBatch.class, ExternalSortBatch.INTERRUPTION_AFTER_SORT, typeOfException);
+    assertFailsWithException(controls, typeOfException, ExternalSortBatch.INTERRUPTION_AFTER_SORT, query);
+
+    final long after = countAllocatedMemory();
+    assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
+  }
+
+  @Test // DRILL-3085
+  public void testInterruptingAfterMSorterSetup() {
+    final String query = "select n_name from cp.`tpch/nation.parquet` order by n_name";
+    Class<? extends Exception> typeOfException = RuntimeException.class;
+
+    final long before = countAllocatedMemory();
+    final String controls = createSingleException(ExternalSortBatch.class, ExternalSortBatch.INTERRUPTION_AFTER_SETUP, typeOfException);
+    assertFailsWithException(controls, typeOfException, ExternalSortBatch.INTERRUPTION_AFTER_SETUP, query);
+
+    final long after = countAllocatedMemory();
+    assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
+  }
+
+  private long countAllocatedMemory() {
+    // wait to make sure all fragments finished cleaning up
+    try {
+      Thread.sleep(2000);
+    } catch (InterruptedException e) {
+      // just ignore
+    }
+
+    long allocated = 0;
+    for (String name : drillbits.keySet()) {
+      allocated += drillbits.get(name).getContext().getAllocator().getAllocatedMemory();
+    }
+
+    return allocated;
+  }
 }