You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ja...@apache.org on 2015/05/15 08:42:48 UTC
[02/17] drill git commit: DRILL-3061: Fix memory leaks in TestDrillbitResilience
DRILL-3061: Fix memory leaks in TestDrillbitResilience
- fixes a race condition in WorkEventBus
- marks TestDrillbitResilience with @Ignore
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/bef60f58
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/bef60f58
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/bef60f58
Branch: refs/heads/master
Commit: bef60f58a79b6bb41c06d59897577d7f7017c526
Parents: c5f1c83
Author: adeneche <ad...@gmail.com>
Authored: Tue May 12 10:53:32 2015 -0700
Committer: Jacques Nadeau <ja...@apache.org>
Committed: Thu May 14 21:58:43 2015 -0700
----------------------------------------------------------------------
.../drill/exec/rpc/control/WorkEventBus.java | 16 +-
.../apache/drill/exec/rpc/data/DataServer.java | 2 +-
.../org/apache/drill/exec/ZookeeperHelper.java | 14 +-
.../exec/server/TestDrillbitResilience.java | 190 ++++++++++++++++---
4 files changed, 190 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/bef60f58/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
index ddd7828..3e461ef 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/control/WorkEventBus.java
@@ -90,16 +90,16 @@ public class WorkEventBus {
}
public FragmentManager getFragmentManager(final FragmentHandle handle) throws FragmentSetupException {
- // Check if this was a recently finished (completed or cancelled) fragment. If so, throw away message.
- if (recentlyFinishedFragments.asMap().containsKey(handle)) {
- if (logger.isDebugEnabled()) {
- logger.debug("Fragment: {} was cancelled. Ignoring fragment handle", handle);
+ synchronized (this) {
+ // Check if this was a recently finished (completed or cancelled) fragment. If so, throw away message.
+ if (recentlyFinishedFragments.asMap().containsKey(handle)) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Fragment: {} was cancelled. Ignoring fragment handle", handle);
+ }
+ return null;
}
- return null;
- }
- // since non-leaf fragments are sent first, it is an error condition if the manager is unavailable.
- synchronized (this) {
+ // since non-leaf fragments are sent first, it is an error condition if the manager is unavailable.
final FragmentManager m = managers.get(handle);
if (m != null) {
return m;
http://git-wip-us.apache.org/repos/asf/drill/blob/bef60f58/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
index 0d4077e..061ddcb 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
@@ -150,7 +150,7 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> {
logger.error("Failure while getting fragment manager. {}",
QueryIdHelper.getQueryIdentifiers(fragmentBatch.getQueryId(),
fragmentBatch.getReceivingMajorFragmentId(),
- fragmentBatch.getReceivingMinorFragmentIdList()));
+ fragmentBatch.getReceivingMinorFragmentIdList()), e);
ack.clear();
sender.send(new Response(RpcType.ACK, Acks.FAIL));
} finally {
http://git-wip-us.apache.org/repos/asf/drill/blob/bef60f58/exec/java-exec/src/test/java/org/apache/drill/exec/ZookeeperHelper.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/ZookeeperHelper.java b/exec/java-exec/src/test/java/org/apache/drill/exec/ZookeeperHelper.java
index a5db81d..630c81b 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/ZookeeperHelper.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/ZookeeperHelper.java
@@ -46,10 +46,22 @@ public class ZookeeperHelper {
* <p>Will create a "test-data" directory for Zookeeper's use if one doesn't already exist.
*/
public ZookeeperHelper() {
+ this(false);
+ }
+
+ /**
+ * Constructor.
+ *
+ * <p>Will create a "test-data" directory for Zookeeper's use if one doesn't already exist.
+ * @param failureInCancelled pass true if you want failures in cancelled fragments to be reported as failures
+ */
+ public ZookeeperHelper(boolean failureInCancelled) {
final Properties overrideProps = new Properties();
// Forced to disable this, because currently we leak memory which is a known issue for query cancellations.
// Setting this causes unittests to fail.
- // overrideProps.setProperty(ExecConstants.RETURN_ERROR_FOR_FAILURE_IN_CANCELLED_FRAGMENTS, "true");
+ if (failureInCancelled) {
+ overrideProps.setProperty(ExecConstants.RETURN_ERROR_FOR_FAILURE_IN_CANCELLED_FRAGMENTS, "true");
+ }
config = DrillConfig.create(overrideProps);
zkUrl = config.getString(ExecConstants.ZK_CONNECTION);
http://git-wip-us.apache.org/repos/asf/drill/blob/bef60f58/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
index 8552ec1..696aed8 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/server/TestDrillbitResilience.java
@@ -30,8 +30,8 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import com.google.common.base.Preconditions;
import org.apache.commons.math3.util.Pair;
+import org.apache.drill.BaseTestQuery;
import org.apache.drill.QueryTestUtil;
import org.apache.drill.SingleRowListener;
import org.apache.drill.common.AutoCloseables;
@@ -48,11 +48,9 @@ import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.memory.TopLevelAllocator;
import org.apache.drill.exec.physical.impl.ScreenCreator;
import org.apache.drill.exec.physical.impl.SingleSenderCreator.SingleSenderRootExec;
-import org.apache.drill.exec.physical.impl.filter.FilterRecordBatch;
import org.apache.drill.exec.physical.impl.mergereceiver.MergingRecordBatch;
import org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec;
import org.apache.drill.exec.physical.impl.partitionsender.PartitionerDecorator;
-import org.apache.drill.exec.physical.impl.union.UnionAllRecordBatch;
import org.apache.drill.exec.physical.impl.unorderedreceiver.UnorderedReceiverBatch;
import org.apache.drill.exec.physical.impl.xsort.ExternalSortBatch;
import org.apache.drill.exec.planner.sql.DrillSqlWorker;
@@ -88,11 +86,14 @@ import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
+import com.google.common.base.Preconditions;
+
/**
* Test how resilient drillbits are to throwing exceptions during various phases of query
* execution by injecting exceptions at various points and to cancellations in various phases.
* The test cases are mentioned in DRILL-2383.
*/
+@Ignore
public class TestDrillbitResilience extends DrillTest {
private static final Logger logger = org.slf4j.LoggerFactory.getLogger(TestDrillbitResilience.class);
@@ -164,7 +165,7 @@ public class TestDrillbitResilience extends DrillTest {
// turn off the HTTP server to avoid port conflicts between the drill bits
System.setProperty(ExecConstants.HTTP_ENABLE, "false");
- zkHelper = new ZookeeperHelper();
+ zkHelper = new ZookeeperHelper(true);
zkHelper.startZookeeper(1);
// use a non-null service set so that the drillbits can use port hunting
@@ -269,7 +270,6 @@ public class TestDrillbitResilience extends DrillTest {
assertTrue("There should not be any errors when checking if Drillbits are OK.", errorList.isEmpty());
}
- @SuppressWarnings("static-method")
@After
public void checkDrillbits() {
clearAllInjections(); // so that the drillbit check itself doesn't trigger anything
@@ -355,6 +355,8 @@ public class TestDrillbitResilience extends DrillTest {
@Test
public void settingNoopInjectionsAndQuery() {
+ final long before = countAllocatedMemory();
+
final String controls = createSingleExceptionOnBit(getClass(), "noop", RuntimeException.class, DRILLBIT_BETA);
setControls(controls);
try {
@@ -362,6 +364,9 @@ public class TestDrillbitResilience extends DrillTest {
} catch (final Exception e) {
fail(e.getMessage());
}
+
+ final long after = countAllocatedMemory();
+ assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
}
/**
@@ -381,16 +386,24 @@ public class TestDrillbitResilience extends DrillTest {
}
}
- @SuppressWarnings("static-method")
@Test
public void foreman_runTryBeginning() {
+ final long before = countAllocatedMemory();
+
testForeman("run-try-beginning");
+
+ final long after = countAllocatedMemory();
+ assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
}
- @SuppressWarnings("static-method")
@Test
public void foreman_runTryEnd() {
+ final long before = countAllocatedMemory();
+
testForeman("run-try-end");
+
+ final long after = countAllocatedMemory();
+ assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
}
/**
@@ -463,7 +476,7 @@ public class TestDrillbitResilience extends DrillTest {
}
result.release();
}
- };
+ }
/**
* Thread that cancels the given query id. After the cancel is acknowledged, the latch is counted down.
@@ -537,7 +550,11 @@ public class TestDrillbitResilience extends DrillTest {
* Given a set of controls, this method ensures that the TEST_QUERY completes with a CANCELED state.
*/
private static void assertCancelledWithoutException(final String controls, final WaitUntilCompleteListener listener) {
- assertCancelled(controls, TEST_QUERY, listener);
+ assertCancelledWithoutException(controls, listener, TEST_QUERY);
+ }
+
+ private static void assertCancelledWithoutException(final String controls, final WaitUntilCompleteListener listener, final String query) {
+ assertCancelled(controls, query, listener);
}
/**
@@ -579,6 +596,9 @@ public class TestDrillbitResilience extends DrillTest {
@Test // To test pause and resume. Test hangs if resume did not happen.
public void passThrough() {
+ final long before = countAllocatedMemory();
+
+
final WaitUntilCompleteListener listener = new WaitUntilCompleteListener() {
@Override
public void queryIdArrived(final QueryId queryId) {
@@ -595,11 +615,16 @@ public class TestDrillbitResilience extends DrillTest {
QueryTestUtil.testWithListener(drillClient, QueryType.SQL, TEST_QUERY, listener);
final Pair<QueryState, Exception> result = listener.waitForCompletion();
assertCompleteState(result, QueryState.COMPLETED);
+
+ final long after = countAllocatedMemory();
+ assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
}
@Test // Cancellation TC 1: cancel before any result set is returned
@Ignore // DRILL-3052
public void cancelBeforeAnyResultsArrive() {
+ final long before = countAllocatedMemory();
+
final WaitUntilCompleteListener listener = new WaitUntilCompleteListener() {
@Override
@@ -611,10 +636,15 @@ public class TestDrillbitResilience extends DrillTest {
final String controls = createPauseInjection(Foreman.class, "foreman-ready");
assertCancelledWithoutException(controls, listener);
+
+ final long after = countAllocatedMemory();
+ assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
}
@Test // Cancellation TC 2: cancel in the middle of fetching result set
public void cancelInMiddleOfFetchingResults() {
+ final long before = countAllocatedMemory();
+
final WaitUntilCompleteListener listener = new WaitUntilCompleteListener() {
private boolean cancelRequested = false;
@@ -632,11 +662,16 @@ public class TestDrillbitResilience extends DrillTest {
// skip once i.e. wait for one batch, so that #dataArrived above triggers #cancelAndResume
final String controls = createPauseInjection(ScreenCreator.class, "sending-data", 1);
assertCancelledWithoutException(controls, listener);
+
+ final long after = countAllocatedMemory();
+ assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
}
@Test // Cancellation TC 3: cancel after all result set are produced but not all are fetched
public void cancelAfterAllResultsProduced() {
+ final long before = countAllocatedMemory();
+
final WaitUntilCompleteListener listener = new WaitUntilCompleteListener() {
private int count = 0;
@@ -652,10 +687,16 @@ public class TestDrillbitResilience extends DrillTest {
final String controls = createPauseInjection(ScreenCreator.class, "send-complete");
assertCancelledWithoutException(controls, listener);
+
+ final long after = countAllocatedMemory();
+ assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
}
@Test // Cancellation TC 4: cancel after everything is completed and fetched
+ @Ignore
public void cancelAfterEverythingIsCompleted() {
+ final long before = countAllocatedMemory();
+
final WaitUntilCompleteListener listener = new WaitUntilCompleteListener() {
private int count = 0;
@@ -671,16 +712,25 @@ public class TestDrillbitResilience extends DrillTest {
final String controls = createPauseInjection(Foreman.class, "foreman-cleanup");
assertCancelledWithoutException(controls, listener);
+
+ final long after = countAllocatedMemory();
+ assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
}
@Test // Completion TC 1: success
public void successfullyCompletes() {
+ final long before = countAllocatedMemory();
+
final WaitUntilCompleteListener listener = new WaitUntilCompleteListener();
QueryTestUtil.testWithListener(drillClient, QueryType.SQL, TEST_QUERY, listener);
final Pair<QueryState, Exception> result = listener.waitForCompletion();
assertCompleteState(result, QueryState.COMPLETED);
+
+ final long after = countAllocatedMemory();
+ assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
}
+
/**
* Given a set of controls, this method ensures TEST_QUERY fails with the given class and desc.
*/
@@ -702,26 +752,41 @@ public class TestDrillbitResilience extends DrillTest {
@Test // Completion TC 2: failed query - before query is executed - while sql parsing
public void failsWhenParsing() {
+ final long before = countAllocatedMemory();
+
final String exceptionDesc = "sql-parsing";
final Class<? extends Throwable> exceptionClass = ForemanSetupException.class;
final String controls = createSingleException(DrillSqlWorker.class, exceptionDesc, exceptionClass);
assertFailsWithException(controls, exceptionClass, exceptionDesc);
+
+ final long after = countAllocatedMemory();
+ assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
}
@Test // Completion TC 3: failed query - before query is executed - while sending fragments to other drillbits
public void failsWhenSendingFragments() {
+ final long before = countAllocatedMemory();
+
final String exceptionDesc = "send-fragments";
final Class<? extends Throwable> exceptionClass = ForemanException.class;
final String controls = createSingleException(Foreman.class, exceptionDesc, exceptionClass);
assertFailsWithException(controls, exceptionClass, exceptionDesc);
+
+ final long after = countAllocatedMemory();
+ assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
}
@Test // Completion TC 4: failed query - during query execution
public void failsDuringExecution() {
+ final long before = countAllocatedMemory();
+
final String exceptionDesc = "fragment-execution";
final Class<? extends Throwable> exceptionClass = IOException.class;
final String controls = createSingleException(FragmentExecutor.class, exceptionDesc, exceptionClass);
assertFailsWithException(controls, exceptionClass, exceptionDesc);
+
+ final long after = countAllocatedMemory();
+ assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
}
/**
@@ -730,8 +795,13 @@ public class TestDrillbitResilience extends DrillTest {
*/
@Test
public void testInterruptingBlockedMergingRecordBatch() {
+ final long before = countAllocatedMemory();
+
final String control = createPauseInjection(MergingRecordBatch.class, "waiting-for-data", 1);
testInterruptingBlockedFragmentsWaitingForData(control);
+
+ final long after = countAllocatedMemory();
+ assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
}
/**
@@ -740,8 +810,13 @@ public class TestDrillbitResilience extends DrillTest {
*/
@Test
public void testInterruptingBlockedUnorderedReceiverBatch() {
+ final long before = countAllocatedMemory();
+
final String control = createPauseInjection(UnorderedReceiverBatch.class, "waiting-for-data", 1);
testInterruptingBlockedFragmentsWaitingForData(control);
+
+ final long after = countAllocatedMemory();
+ assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
}
private static void testInterruptingBlockedFragmentsWaitingForData(final String control) {
@@ -769,22 +844,27 @@ public class TestDrillbitResilience extends DrillTest {
setSessionOption(HASHAGG.getOptionName(), "true");
setSessionOption(PARTITION_SENDER_SET_THREADS.getOptionName(), "6");
+ final long before = countAllocatedMemory();
+
final String controls = "{\"injections\" : ["
- + "{"
- + "\"type\" : \"latch\","
- + "\"siteClass\" : \"" + PartitionerDecorator.class.getName() + "\","
- + "\"desc\" : \"partitioner-sender-latch\""
- + "},"
- + "{"
- + "\"type\" : \"pause\","
- + "\"siteClass\" : \"" + PartitionerDecorator.class.getName() + "\","
- + "\"desc\" : \"wait-for-fragment-interrupt\","
- + "\"nSkip\" : 1"
- + "}" +
- "]}";
+ + "{"
+ + "\"type\" : \"latch\","
+ + "\"siteClass\" : \"" + PartitionerDecorator.class.getName() + "\","
+ + "\"desc\" : \"partitioner-sender-latch\""
+ + "},"
+ + "{"
+ + "\"type\" : \"pause\","
+ + "\"siteClass\" : \"" + PartitionerDecorator.class.getName() + "\","
+ + "\"desc\" : \"wait-for-fragment-interrupt\","
+ + "\"nSkip\" : 1"
+ + "}" +
+ "]}";
final String query = "SELECT sales_city, COUNT(*) cnt FROM cp.`region.json` GROUP BY sales_city";
assertCancelled(controls, query, new ListenerThatCancelsQueryAfterFirstBatchOfData());
+
+ final long after = countAllocatedMemory();
+ assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
} finally {
setSessionOption(SLICE_TARGET, Long.toString(SLICE_TARGET_DEFAULT));
setSessionOption(HASHAGG.getOptionName(), HASHAGG.getDefault().bool_val.toString());
@@ -795,9 +875,75 @@ public class TestDrillbitResilience extends DrillTest {
@Test
public void testInterruptingWhileFragmentIsBlockedInAcquiringSendingTicket() throws Exception {
+
+ final long before = countAllocatedMemory();
+
final String control =
- createPauseInjection(SingleSenderRootExec.class, "data-tunnel-send-batch-wait-for-interrupt", 1);
+ createPauseInjection(SingleSenderRootExec.class, "data-tunnel-send-batch-wait-for-interrupt", 1);
assertCancelled(control, TEST_QUERY, new ListenerThatCancelsQueryAfterFirstBatchOfData());
+
+ final long after = countAllocatedMemory();
+ assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
+ }
+
+ @Test
+ public void memoryLeaksWhenCancelled() {
+ setSessionOption(SLICE_TARGET, "10");
+
+ final long before = countAllocatedMemory();
+
+ final String controls = createPauseInjection(ScreenCreator.class, "sending-data", 1);
+ String query = null;
+ try {
+ query = BaseTestQuery.getFile("queries/tpch/09.sql");
+ } catch (final IOException e) {
+ fail("Failed to get query file: " + e);
+ }
+
+ final WaitUntilCompleteListener listener = new WaitUntilCompleteListener() {
+ private boolean cancelRequested = false;
+
+ @Override
+ public void dataArrived(final QueryDataBatch result, final ConnectionThrottle throttle) {
+ if (!cancelRequested) {
+ check(queryId != null, "Query id should not be null, since we have waited long enough.");
+ cancelAndResume();
+ cancelRequested = true;
+ }
+ result.release();
+ }
+ };
+
+ assertCancelledWithoutException(controls, listener, query.substring(0, query.length() - 1));
+
+ final long after = countAllocatedMemory();
+ assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
+
+ setSessionOption(SLICE_TARGET, Long.toString(SLICE_TARGET_DEFAULT));
+ }
+
+ @Test
+ public void memoryLeaksWhenFailed() {
+ setSessionOption(SLICE_TARGET, "10");
+
+ final long before = countAllocatedMemory();
+
+ final String exceptionDesc = "fragment-execution";
+ final Class<? extends Throwable> exceptionClass = IOException.class;
+ final String controls = createSingleException(FragmentExecutor.class, exceptionDesc, exceptionClass);
+ String query = null;
+ try {
+ query = BaseTestQuery.getFile("queries/tpch/09.sql");
+ } catch (final IOException e) {
+ fail("Failed to get query file: " + e);
+ }
+
+ assertFailsWithException(controls, exceptionClass, exceptionDesc, query.substring(0, query.length() - 1));
+
+ final long after = countAllocatedMemory();
+ assertEquals(String.format("We are leaking %d bytes", after - before), before, after);
+
+ setSessionOption(SLICE_TARGET, Long.toString(SLICE_TARGET_DEFAULT));
}
@Test // DRILL-3065