You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ad...@apache.org on 2015/09/21 21:51:22 UTC

drill git commit: DRILL-3732: Drill leaks memory if external sort hits out of disk space exception

Repository: drill
Updated Branches:
  refs/heads/master 092903dfd -> 3c89b30d4


DRILL-3732: Drill leaks memory if external sort hits out of disk space exception

- ExternalSort.mergeAndSpill() cleans up all its data in case an error occurs while it is spilling to disk
- made BatchGroup AutoCloseable so it can easily be closed with AutoCloseables.close() if an error occurs
- added an injection site that fires while External Sort is spilling to disk
- added a unit test that forces a 2-batch query to spill to disk and injects an exception while it does so

this closes #147


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/3c89b30d
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/3c89b30d
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/3c89b30d

Branch: refs/heads/master
Commit: 3c89b30d4e5f34f84883b0071a8f6ad747311fa9
Parents: 092903d
Author: adeneche <ad...@gmail.com>
Authored: Fri Sep 4 17:10:17 2015 -0700
Committer: adeneche <ad...@gmail.com>
Committed: Mon Sep 21 12:51:02 2015 -0700

----------------------------------------------------------------------
 .../exec/physical/impl/xsort/BatchGroup.java    |  5 +-
 .../physical/impl/xsort/ExternalSortBatch.java  | 50 ++++++++++---
 .../impl/xsort/TestSortSpillWithException.java  | 79 ++++++++++++++++++++
 .../test/resources/xsort/2batches/0.data.json   | 20 +++++
 .../test/resources/xsort/2batches/1.data.json   | 20 +++++
 5 files changed, 161 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/3c89b30d/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
index 6896faa..aa3acc8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/BatchGroup.java
@@ -39,7 +39,7 @@ import org.apache.hadoop.fs.Path;
 
 import com.google.common.base.Stopwatch;
 
-public class BatchGroup implements VectorAccessible {
+public class BatchGroup implements VectorAccessible, AutoCloseable {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(BatchGroup.class);
 
   private VectorContainer currentContainer;
@@ -142,7 +142,8 @@ public class BatchGroup implements VectorAccessible {
     return currentContainer;
   }
 
-  public void cleanup() throws IOException {
+  @Override
+  public void close() throws IOException {
     currentContainer.zeroVectors();
     if (sv2 != null) {
       sv2.clear();

http://git-wip-us.apache.org/repos/asf/drill/blob/3c89b30d/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
index f1e22b2..282edb1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/xsort/ExternalSortBatch.java
@@ -26,6 +26,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
 
+import org.apache.drill.common.AutoCloseables;
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.common.exceptions.UserException;
 import org.apache.drill.common.expression.ErrorCollector;
@@ -127,6 +128,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
 
   public static final String INTERRUPTION_AFTER_SORT = "after-sort";
   public static final String INTERRUPTION_AFTER_SETUP = "after-setup";
+  public static final String INTERRUPTION_WHILE_SPILLING = "spilling";
 
 
   public ExternalSortBatch(ExternalSort popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException {
@@ -164,11 +166,11 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     return sv4;
   }
 
-  private void cleanupBatchGroups(Collection<BatchGroup> groups) {
+  private void closeBatchGroups(Collection<BatchGroup> groups) {
     for (BatchGroup group: groups) {
       try {
-        group.cleanup();
-      } catch (IOException e) {
+        group.close();
+      } catch (Exception e) {
         // collect all failure and make sure to cleanup all remaining batches
         // Originally we would have thrown a RuntimeException that would propagate to FragmentExecutor.closeOutResources()
         // where it would have been passed to context.fail()
@@ -183,11 +185,11 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
   public void close() {
     try {
       if (batchGroups != null) {
-        cleanupBatchGroups(batchGroups);
+        closeBatchGroups(batchGroups);
         batchGroups = null;
       }
       if (spilledBatchGroups != null) {
-        cleanupBatchGroups(spilledBatchGroups);
+        closeBatchGroups(spilledBatchGroups);
         spilledBatchGroups = null;
       }
     } finally {
@@ -534,7 +536,7 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
 
     String outputFile = Joiner.on("/").join(dirs.next(), fileName, spillCount++);
     BatchGroup newGroup = new BatchGroup(c1, fs, outputFile, oContext.getAllocator());
-
+    boolean threw = true; // true if an exception is thrown in the try block below
     logger.info("Merging and spilling to {}", outputFile);
     try {
       while ((count = copier.next(targetRecordCount)) > 0) {
@@ -543,13 +545,20 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
         // note that addBatch also clears the outputContainer
         newGroup.addBatch(outputContainer);
       }
+      injector.injectChecked(context.getExecutionControls(), INTERRUPTION_WHILE_SPILLING, IOException.class);
       newGroup.closeOutputStream();
-      for (BatchGroup group : batchGroupList) {
-        group.cleanup();
-      }
-      hyperBatch.clear();
+      threw = false; // this should always be the last statement of this try block to make sure we cleanup properly
     } catch (IOException e) {
-      throw new RuntimeException(e);
+      throw UserException.resourceError(e)
+        .message("External Sort encountered an error while spilling to disk")
+        .build(logger);
+    } finally {
+      hyperBatch.clear();
+      cleanAfterMergeAndSpill(batchGroupList, threw);
+      if (threw) {
+        // we only need to cleanup newGroup if spill failed
+        AutoCloseables.close(newGroup, logger);
+      }
     }
     takeOwnership(c1); // transfer ownership from copier allocator to external sort allocator
     long bufSize = getBufferSize(c1);
@@ -559,6 +568,25 @@ public class ExternalSortBatch extends AbstractRecordBatch<ExternalSort> {
     return newGroup;
   }
 
+  /**
+   * Make sure we cleanup properly after merge and spill.<br>If there was any error during the spilling,
+   * we cleanup the resources silently, otherwise we throw any exception we hit during the cleanup
+   *
+   * @param batchGroups spilled batch groups
+   * @param silently true to log any exception that happens during cleanup, false to throw it
+   */
+  private void cleanAfterMergeAndSpill(final List<BatchGroup> batchGroups, boolean silently) {
+      try {
+        AutoCloseables.close(batchGroups.toArray(new BatchGroup[batchGroups.size()]));
+      } catch (Exception e) {
+        if (silently) {
+          logger.warn("Error while cleaning up after merge and spill", e);
+        } else {
+          throw new RuntimeException("Error while cleaning up after merge and spill", e);
+        }
+      }
+  }
+
   private void takeOwnership(VectorAccessible batch) {
     for (VectorWrapper<?> w : batch) {
       DrillBuf[] bufs = w.getValueVector().getBuffers(false);

http://git-wip-us.apache.org/repos/asf/drill/blob/3c89b30d/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSortSpillWithException.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSortSpillWithException.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSortSpillWithException.java
new file mode 100644
index 0000000..788caf7
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/xsort/TestSortSpillWithException.java
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.xsort;
+
+import org.apache.drill.BaseTestQuery;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.common.util.TestTools;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.proto.UserBitShared.DrillPBError.ErrorType;
+import org.apache.drill.exec.testing.Controls;
+import org.apache.drill.exec.testing.ControlsInjectionUtil;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.Properties;
+
+/**
+ * Testing External Sort's spilling to disk.
+ * <br>
+ * This class changes the following Drill property to force external sort to spill after the 2nd batch:
+ * {@link ExecConstants#EXTERNAL_SORT_SPILL_THRESHOLD} = 1
+ * <br>
+ * {@link ExecConstants#EXTERNAL_SORT_SPILL_GROUP_SIZE} = 1
+ */
+public class TestSortSpillWithException extends BaseTestQuery {
+  private static final String TEST_RES_PATH = TestTools.getWorkingPath() + "/src/test/resources";
+
+  @BeforeClass
+  public static void initCluster() {
+    // make sure memory sorter outputs 20 rows per batch
+    final Properties props = cloneDefaultTestConfigProperties();
+    props.put(ExecConstants.EXTERNAL_SORT_SPILL_THRESHOLD, "1");
+    props.put(ExecConstants.EXTERNAL_SORT_SPILL_GROUP_SIZE, "1");
+
+    updateTestCluster(1, DrillConfig.create(props));
+  }
+
+  @Test
+  public void testSpilLeak() throws Exception {
+    // inject exception in sort while spilling
+    final String controls = Controls.newBuilder()
+      .addExceptionOnBit(
+          ExternalSortBatch.class,
+          ExternalSortBatch.INTERRUPTION_WHILE_SPILLING,
+          IOException.class,
+          bits[0].getContext().getEndpoint())
+      .build();
+    ControlsInjectionUtil.setControls(client, controls);
+    // run a simple order by query
+    try {
+      test("select employee_id from dfs_test.`%s/xsort/2batches` order by employee_id", TEST_RES_PATH);
+      fail("Query should have failed!");
+    } catch (UserRemoteException e) {
+      assertEquals(ErrorType.RESOURCE, e.getErrorType());
+      assertTrue("Incorrect error message",
+        e.getMessage().contains("External Sort encountered an error while spilling to disk"));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/3c89b30d/exec/java-exec/src/test/resources/xsort/2batches/0.data.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/xsort/2batches/0.data.json b/exec/java-exec/src/test/resources/xsort/2batches/0.data.json
new file mode 100644
index 0000000..b739006
--- /dev/null
+++ b/exec/java-exec/src/test/resources/xsort/2batches/0.data.json
@@ -0,0 +1,20 @@
+{ "employee_id":24 }
+{ "employee_id":36 }
+{ "employee_id":11 }
+{ "employee_id":27 }
+{ "employee_id":23 }
+{ "employee_id":25 }
+{ "employee_id":8 }
+{ "employee_id":12 }
+{ "employee_id":29 }
+{ "employee_id":33 }
+{ "employee_id":19 }
+{ "employee_id":22 }
+{ "employee_id":31 }
+{ "employee_id":30 }
+{ "employee_id":35 }
+{ "employee_id":5 }
+{ "employee_id":4 }
+{ "employee_id":16 }
+{ "employee_id":13 }
+{ "employee_id":9 }

http://git-wip-us.apache.org/repos/asf/drill/blob/3c89b30d/exec/java-exec/src/test/resources/xsort/2batches/1.data.json
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/xsort/2batches/1.data.json b/exec/java-exec/src/test/resources/xsort/2batches/1.data.json
new file mode 100644
index 0000000..ef1d8cc
--- /dev/null
+++ b/exec/java-exec/src/test/resources/xsort/2batches/1.data.json
@@ -0,0 +1,20 @@
+{ "employee_id":20 }
+{ "employee_id":7 }
+{ "employee_id":38 }
+{ "employee_id":3 }
+{ "employee_id":15 }
+{ "employee_id":2 }
+{ "employee_id":39 }
+{ "employee_id":37 }
+{ "employee_id":10 }
+{ "employee_id":18 }
+{ "employee_id":17 }
+{ "employee_id":32 }
+{ "employee_id":34 }
+{ "employee_id":1 }
+{ "employee_id":21 }
+{ "employee_id":14 }
+{ "employee_id":6 }
+{ "employee_id":26 }
+{ "employee_id":0 }
+{ "employee_id":28 }