You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ad...@apache.org on 2015/09/21 21:51:22 UTC
drill git commit: DRILL-3732: Drill leaks memory if external sort
hits an out-of-disk-space exception
Repository: drill
Updated Branches:
refs/heads/master 092903dfd -> 3c89b30d4
DRILL-3732: Drill leaks memory if external sort hits an out-of-disk-space exception
- ExternalSortBatch.mergeAndSpill() cleans up all its data if an error occurs while it is spilling to disk
- made BatchGroup AutoCloseable so it can easily be closed with AutoCloseables.close() if an error occurs
- added an injection site that fires while External Sort is spilling to disk
- added a unit test that forces a 2-batch query to spill to disk and injects an exception while it does so
this closes #147
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/3c89b30d
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/3c89b30d
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/3c89b30d
Branch: refs/heads/master
Commit: 3c89b30d4e5f34f84883b0071a8f6ad747311fa9
Parents: 092903d
Author: adeneche <ad...@gmail.com>
Authored: Fri Sep 4 17:10:17 2015 -0700
Committer: adeneche <ad...@gmail.com>
Committed: Mon Sep 21 12:51:02 2015 -0700
----------------------------------------------------------------------
.../exec/physical/impl/xsort/BatchGroup.java | 5 +-
.../physical/impl/xsort/ExternalSortBatch.java | 50 ++++++++++---
.../impl/xsort/TestSortSpillWithException.java | 79 ++++++++++++++++++++
.../test/resources/xsort/2batches/0.data.json | 20 +++++
.../test/resources/xsort/2batches/1.data.json | 20 +++++
5 files changed, 161 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/3c89b30d/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
index 6896faa..aa3acc8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
@@ -39,7 +39,7 @@ import org.apache.hadoop.fs.Path;
import com.google.common.base.Stopwatch;
-public class BatchGroup implements VectorAccessible {
+public class BatchGroup implements VectorAccessible, AutoCloseable {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BatchGroup.class);
private VectorContainer currentContainer;
@@ -142,7 +142,8 @@ public class BatchGroup implements VectorAccessible {
return currentContainer;
}
- public void cleanup() throws IOException {
+ @Override
+ public void close() throws IOException {
currentContainer.zeroVectors();
if (sv2 != null) {
sv2.clear();
http://git-wip-us.apache.org/repos/asf/drill/blob/3c89b30d/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index f1e22b2..282edb1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -26,6 +26,7 @@ import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import org.apache.drill.common.AutoCloseables;
import org.apache.drill.common.config.DrillConfig;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.ErrorCollector;
@@ -127,6 +128,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
public static final String INTERRUPTION_AFTER_SORT = "after-sort";
public static final String INTERRUPTION_AFTER_SETUP = "after-setup";
+ public static final String INTERRUPTION_WHILE_SPILLING = "spilling";
public ExternalSortBatch(ExternalSort popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException {
@@ -164,11 +166,11 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
return sv4;
}
- private void cleanupBatchGroups(Collection<BatchGroup> groups) {
+ private void closeBatchGroups(Collection<BatchGroup> groups) {
for (BatchGroup group: groups) {
try {
- group.cleanup();
- } catch (IOException e) {
+ group.close();
+ } catch (Exception e) {
// collect all failure and make sure to cleanup all remaining batches
// Originally we would have thrown a RuntimeException that would propagate to FragmentExecutor.closeOutResources()
// where it would have been passed to context.fail()
@@ -183,11 +185,11 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
public void close() {
try {
if (batchGroups != null) {
- cleanupBatchGroups(batchGroups);
+ closeBatchGroups(batchGroups);
batchGroups = null;
}
if (spilledBatchGroups != null) {
- cleanupBatchGroups(spilledBatchGroups);
+ closeBatchGroups(spilledBatchGroups);
spilledBatchGroups = null;
}
} finally {
@@ -534,7 +536,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
String outputFile = Joiner.on("/").join(dirs.next(), fileName, spillCount++);
BatchGroup newGroup = new BatchGroup(c1, fs, outputFile, oContext.getAllocator());
-
+ boolean threw = true; // true if an exception is thrown in the try block below
logger.info("Merging and spilling to {}", outputFile);
try {
while ((count = copier.next(targetRecordCount)) > 0) {
@@ -543,13 +545,20 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
// note that addBatch also clears the outputContainer
newGroup.addBatch(outputContainer);
}
+ injector.injectChecked(context.getExecutionControls(), INTERRUPTION_WHILE_SPILLING, IOException.class);
newGroup.closeOutputStream();
- for (BatchGroup group : batchGroupList) {
- group.cleanup();
- }
- hyperBatch.clear();
+ threw = false; // this should always be the last statement of this try block to make sure we cleanup properly
} catch (IOException e) {
- throw new RuntimeException(e);
+ throw UserException.resourceError(e)
+ .message("External Sort encountered an error while spilling to disk")
+ .build(logger);
+ } finally {
+ hyperBatch.clear();
+ cleanAfterMergeAndSpill(batchGroupList, threw);
+ if (threw) {
+ // we only need to cleanup newGroup if spill failed
+ AutoCloseables.close(newGroup, logger);
+ }
}
takeOwnership(c1); // transfer ownership from copier allocator to external sort allocator
long bufSize = getBufferSize(c1);
@@ -559,6 +568,25 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
return newGroup;
}
+ /**
+ * Make sure we cleanup properly after merge and spill.<br>If there was any error during the spilling,
+ * we cleanup the resources silently, otherwise we throw any exception we hit during the cleanup
+ *
+ * @param batchGroups spilled batch groups
+ * @param silently true to log any exception that happens during cleanup, false to throw it
+ */
+ private void cleanAfterMergeAndSpill(final List<BatchGroup> batchGroups, boolean silently) {
+ try {
+ AutoCloseables.close(batchGroups.toArray(new BatchGroup[batchGroups.size()]));
+ } catch (Exception e) {
+ if (silently) {
+ logger.warn("Error while cleaning up after merge and spill", e);
+ } else {
+ throw new RuntimeException("Error while cleaning up after merge and spill", e);
+ }
+ }
+ }
+
private void takeOwnership(VectorAccessible batch) {
for (VectorWrapper<?> w : batch) {
DrillBuf[] bufs = w.getValueVector().getBuffers(false);
http://git-wip-us.apache.org/repos/asf/drill/blob/3c89b30d/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSortSpillWithException.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSortSpillWithException.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSortSpillWithException.java
new file mode 100644
index 0000000..788caf7
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSortSpillWithException.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.xsort;
+
+import org.apache.drill.BaseTestQuery;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.common.util.TestTools;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType;
+import org.apache.drill.exec.testing.Controls;
+import org.apache.drill.exec.testing.ControlsInjectionUtil;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Tests External Sort's error handling when an exception is thrown while it is spilling to disk.
+ * <br>
+ * This class sets the following Drill properties to force external sort to spill after the 2nd batch:
+ * {@link ExecConstants#EXTERNAL_SORT_SPILL_THRESHOLD} = 1
+ * <br>
+ * {@link ExecConstants#EXTERNAL_SORT_SPILL_GROUP_SIZE} = 1
+ */
+public class TestSortSpillWithException extends BaseTestQuery {
+ private static final String TEST_RES_PATH = TestTools.getWorkingPath() + "/src/test/resources";
+
+ @BeforeClass
+ public static void initCluster() {
+ // restart a 1-drillbit cluster with spill threshold = 1 and spill group size = 1 so the sort spills early
+ final Properties props = cloneDefaultTestConfigProperties();
+ props.put(ExecConstants.EXTERNAL_SORT_SPILL_THRESHOLD, "1");
+ props.put(ExecConstants.EXTERNAL_SORT_SPILL_GROUP_SIZE, "1");
+
+ updateTestCluster(1, DrillConfig.create(props));
+ }
+
+ @Test
+ public void testSpilLeak() throws Exception {
+ // make ExternalSortBatch throw an IOException at its "spilling" injection site on the first drillbit
+ final String controls = Controls.newBuilder()
+ .addExceptionOnBit(
+ ExternalSortBatch.class,
+ ExternalSortBatch.INTERRUPTION_WHILE_SPILLING,
+ IOException.class,
+ bits[0].getContext().getEndpoint())
+ .build();
+ ControlsInjectionUtil.setControls(client, controls);
+ // sort the 2-batch dataset; the query must fail with a RESOURCE error reporting the spill failure
+ try {
+ test("select employee_id from dfs_test.`%s/xsort/2batches` order by employee_id", TEST_RES_PATH);
+ fail("Query should have failed!");
+ } catch (UserRemoteException e) {
+ assertEquals(ErrorType.RESOURCE, e.getErrorType());
+ assertTrue("Incorrect error message",
+ e.getMessage().contains("External Sort encountered an error while spilling to disk"));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/3c89b30d/exec/java-exec/src/test/resources/xsort/2batches/0.data.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/xsort/2batches/0.data.json b/exec/java-exec/src/test/resources/xsort/2batches/0.data.json
new file mode 100644
index 0000000..b739006
--- /dev/null
+++ b/exec/java-exec/src/test/resources/xsort/2batches/0.data.json
@@ -0,0 +1,20 @@
+{ "employee_id":24 }
+{ "employee_id":36 }
+{ "employee_id":11 }
+{ "employee_id":27 }
+{ "employee_id":23 }
+{ "employee_id":25 }
+{ "employee_id":8 }
+{ "employee_id":12 }
+{ "employee_id":29 }
+{ "employee_id":33 }
+{ "employee_id":19 }
+{ "employee_id":22 }
+{ "employee_id":31 }
+{ "employee_id":30 }
+{ "employee_id":35 }
+{ "employee_id":5 }
+{ "employee_id":4 }
+{ "employee_id":16 }
+{ "employee_id":13 }
+{ "employee_id":9 }
http://git-wip-us.apache.org/repos/asf/drill/blob/3c89b30d/exec/java-exec/src/test/resources/xsort/2batches/1.data.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/xsort/2batches/1.data.json b/exec/java-exec/src/test/resources/xsort/2batches/1.data.json
new file mode 100644
index 0000000..ef1d8cc
--- /dev/null
+++ b/exec/java-exec/src/test/resources/xsort/2batches/1.data.json
@@ -0,0 +1,20 @@
+{ "employee_id":20 }
+{ "employee_id":7 }
+{ "employee_id":38 }
+{ "employee_id":3 }
+{ "employee_id":15 }
+{ "employee_id":2 }
+{ "employee_id":39 }
+{ "employee_id":37 }
+{ "employee_id":10 }
+{ "employee_id":18 }
+{ "employee_id":17 }
+{ "employee_id":32 }
+{ "employee_id":34 }
+{ "employee_id":1 }
+{ "employee_id":21 }
+{ "employee_id":14 }
+{ "employee_id":6 }
+{ "employee_id":26 }
+{ "employee_id":0 }
+{ "employee_id":28 }