You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by sm...@apache.org on 2015/05/15 04:52:07 UTC
drill git commit: DRILL-3065: Ensure the selection vectors in mSort
to be closed after failure happens
Repository: drill
Updated Branches:
refs/heads/master 7f575df33 -> 5c4a9b212
DRILL-3065: Ensure the selection vectors in mSort to be closed after failure happens
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/5c4a9b21
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/5c4a9b21
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/5c4a9b21
Branch: refs/heads/master
Commit: 5c4a9b2129dc7c2ae4a4fca9640ddbf990da126b
Parents: 7f575df
Author: Hsuan-Yi Chu <hs...@usc.edu>
Authored: Wed May 13 17:17:53 2015 -0700
Committer: Hsuan-Yi Chu <hs...@usc.edu>
Committed: Thu May 14 19:28:26 2015 -0700
----------------------------------------------------------------------
.../physical/impl/xsort/ExternalSortBatch.java | 15 ++++++
.../exec/physical/impl/xsort/MSortTemplate.java | 11 ++++
.../drill/exec/physical/impl/xsort/MSorter.java | 1 +
.../exec/server/TestDrillbitResilience.java | 54 +++++++++++++++++++-
4 files changed, 79 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/5c4a9b21/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index 529a6ca..de4a86e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -63,6 +63,7 @@ import org.apache.drill.exec.record.WritableBatch;
import org.apache.drill.exec.record.selection.SelectionVector2;
import org.apache.drill.exec.record.selection.SelectionVector4;
import org.apache.drill.exec.store.ischema.Records;
+import org.apache.drill.exec.testing.ExecutionControlsInjector;
import org.apache.drill.exec.vector.CopyUtil;
import org.apache.drill.exec.vector.ValueVector;
import org.apache.drill.exec.vector.complex.AbstractContainerVector;
@@ -112,6 +113,10 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
private final String fileName;
private int firstSpillBatchCount = 0;
+ public static final String INTERRUPTION_AFTER_SORT = "after-sort";
+ public static final String INTERRUPTION_AFTER_SETUP = "after-setup";
+ private final static ExecutionControlsInjector injector = ExecutionControlsInjector.getInjector(ExternalSortBatch.class);
+
public ExternalSortBatch(ExternalSort popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException {
super(popConfig, context, true);
this.incoming = incoming;
@@ -172,6 +177,10 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
}
copierAllocator.close();
super.close();
+
+ if(mSorter != null) {
+ mSorter.clear();
+ }
}
}
@@ -366,12 +375,18 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
sv4 = builder.getSv4();
mSorter = createNewMSorter();
mSorter.setup(context, oContext.getAllocator(), getSelectionVector4(), this.container);
+
+ // For memory-leak testing: inject an exception after mSorter finishes setup
+ injector.injectUnchecked(context.getExecutionControls(), INTERRUPTION_AFTER_SETUP);
mSorter.sort(this.container);
// sort may have prematurely exited due to should continue returning false.
if (!context.shouldContinue()) {
return IterOutcome.STOP;
}
+
+ // For memory-leak testing: inject an exception after mSorter finishes sorting
+ injector.injectUnchecked(context.getExecutionControls(), INTERRUPTION_AFTER_SORT);
sv4 = mSorter.getSV4();
long t = watch.elapsed(TimeUnit.MICROSECONDS);
http://git-wip-us.apache.org/repos/asf/drill/blob/5c4a9b21/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
index 9acae9e..9b21ae3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSortTemplate.java
@@ -171,6 +171,17 @@ public abstract class MSortTemplate implements MSorter, IndexedSortable{
return doEval(sv1, sv2);
}
+ @Override
+ public void clear() { // clears vector4 and aux; each is null-checked first, so a call made before either is assigned is harmless
+ if(vector4 != null) {
+ vector4.clear();
+ }
+
+ if(aux != null) {
+ aux.clear();
+ }
+ }
+
public abstract void doSetup(@Named("context") FragmentContext context, @Named("incoming") VectorContainer incoming, @Named("outgoing") RecordBatch outgoing);
public abstract int doEval(@Named("leftIndex") int leftIndex, @Named("rightIndex") int rightIndex);
http://git-wip-us.apache.org/repos/asf/drill/blob/5c4a9b21/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSorter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSorter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSorter.java
index d97ffc0..af8cbfb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSorter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/MSorter.java
@@ -33,4 +33,5 @@ public interface MSorter {
public static TemplateClassDefinition<MSorter> TEMPLATE_DEFINITION = new TemplateClassDefinition<MSorter>(MSorter.class, MSortTemplate.class);
+ public void clear();
}
http://git-wip-us.apache.org/repos/asf/drill/blob/5c4a9b21/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
index f95fbe1..8552ec1 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
@@ -48,10 +48,13 @@ import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.memory.TopLevelAllocator;
import org.apache.drill.exec.physical.impl.ScreenCreator;
import org.apache.drill.exec.physical.impl.SingleSenderCreator.SingleSenderRootExec;
+import org.apache.drill.exec.physical.impl.filter.FilterRecordBatch;
import org.apache.drill.exec.physical.impl.mergereceiver.MergingRecordBatch;
import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec;
import org.apache.drill.exec.physical.impl.partitionsender.PartitionerDecorator;
+import org.apache.drill.exec.physical.impl.union.UnionAllRecordBatch;
import org.apache.drill.exec.physical.impl.unorderedreceiver.UnorderedReceiverBatch;
+import org.apache.drill.exec.physical.impl.xsort.ExternalSortBatch;
import org.apache.drill.exec.planner.sql.DrillSqlWorker;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.proto.GeneralRPCProtos.Ack;
@@ -682,16 +685,21 @@ public class TestDrillbitResilience extends DrillTest {
* Given a set of controls, this method ensures TEST_QUERY fails with the given class and desc.
*/
private static void assertFailsWithException(final String controls, final Class<? extends Throwable> exceptionClass,
- final String exceptionDesc) {
+ final String exceptionDesc, final String query) {
setControls(controls);
final WaitUntilCompleteListener listener = new WaitUntilCompleteListener();
- QueryTestUtil.testWithListener(drillClient, QueryType.SQL, TEST_QUERY, listener);
+ QueryTestUtil.testWithListener(drillClient, QueryType.SQL, query, listener);
final Pair<QueryState, Exception> result = listener.waitForCompletion();
final QueryState state = result.getFirst();
assertTrue(String.format("Query state should be FAILED (and not %s).", state), state == QueryState.FAILED);
assertExceptionInjected(result.getSecond(), exceptionClass, exceptionDesc);
}
+ private static void assertFailsWithException(final String controls, final Class<? extends Throwable> exceptionClass,
+ final String exceptionDesc) { // convenience overload: same check as the 4-argument form, run against TEST_QUERY
+ assertFailsWithException(controls, exceptionClass, exceptionDesc, TEST_QUERY);
+ }
+
@Test // Completion TC 2: failed query - before query is executed - while sql parsing
public void failsWhenParsing() {
final String exceptionDesc = "sql-parsing";
@@ -791,4 +799,46 @@ public class TestDrillbitResilience extends DrillTest {
createPauseInjection(SingleSenderRootExec.class, "data-tunnel-send-batch-wait-for-interrupt", 1);
assertCancelled(control, TEST_QUERY, new ListenerThatCancelsQueryAfterFirstBatchOfData());
}
+
+ @Test // DRILL-3065: an exception injected after mSorter.sort() must leave the allocated-memory total unchanged
+ public void testInterruptingAfterMSorterSorting() {
+ final String query = "select n_name from cp.`tpch/nation.parquet` order by n_name"; // presumably ORDER BY is what routes the query through ExternalSortBatch
+ Class<? extends Exception> typeOfException = RuntimeException.class;
+
+ final long before = countAllocatedMemory(); // baseline, sampled before the failing query is submitted
+ final String controls = createSingleException(ExternalSortBatch.class, ExternalSortBatch.INTERRUPTION_AFTER_SORT, typeOfException);
+ assertFailsWithException(controls, typeOfException, ExternalSortBatch.INTERRUPTION_AFTER_SORT, query);
+
+ final long after = countAllocatedMemory(); // sampled again once the query has been observed to fail
+ assertEquals(String.format("We are leaking %d bytes", after - before), before, after); // any difference from the baseline is reported as a leak
+ }
+
+ @Test // DRILL-3085: an exception injected after mSorter.setup(), before sort() runs, must leave the allocated-memory total unchanged
+ public void testInterruptingAfterMSorterSetup() {
+ final String query = "select n_name from cp.`tpch/nation.parquet` order by n_name"; // presumably ORDER BY is what routes the query through ExternalSortBatch
+ Class<? extends Exception> typeOfException = RuntimeException.class;
+
+ final long before = countAllocatedMemory(); // baseline, sampled before the failing query is submitted
+ final String controls = createSingleException(ExternalSortBatch.class, ExternalSortBatch.INTERRUPTION_AFTER_SETUP, typeOfException);
+ assertFailsWithException(controls, typeOfException, ExternalSortBatch.INTERRUPTION_AFTER_SETUP, query);
+
+ final long after = countAllocatedMemory(); // sampled again once the query has been observed to fail
+ assertEquals(String.format("We are leaking %d bytes", after - before), before, after); // any difference from the baseline is reported as a leak
+ }
+
+ private long countAllocatedMemory() { // returns the sum of getAllocatedMemory() over every entry in drillbits
+ // fixed 2 s pause, intended to let all fragments finish cleaning up before the allocators are sampled
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt(); // restore the interrupt status instead of swallowing it; sampling still proceeds
+ }
+
+ long allocated = 0;
+ for (String name : drillbits.keySet()) {
+ allocated += drillbits.get(name).getContext().getAllocator().getAllocatedMemory();
+ }
+
+ return allocated;
+ }
}