You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by sm...@apache.org on 2015/05/13 04:42:52 UTC

[3/6] drill git commit: DRILL-2780: Check for open files when closing OperatorContext

DRILL-2780: Check for open files when closing OperatorContext


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/ed200e25
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/ed200e25
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/ed200e25

Branch: refs/heads/master
Commit: ed200e257249e67176a259bab0d607841bf37fa4
Parents: 6d7cda8
Author: Steven Phillips <sm...@apache.org>
Authored: Sun May 10 02:33:14 2015 -0700
Committer: Steven Phillips <sm...@apache.org>
Committed: Tue May 12 18:21:10 2015 -0700

----------------------------------------------------------------------
 .../apache/drill/exec/ops/OperatorContext.java  |   5 +
 .../drill/exec/ops/OperatorContextImpl.java     |  22 ++++
 .../drill/exec/store/dfs/DrillFileSystem.java   |  42 +++----
 .../exec/store/dfs/easy/EasyFormatPlugin.java   |   2 +-
 .../store/parquet/ParquetScanBatchCreator.java  |   2 +-
 .../exec/work/fragment/FragmentExecutor.java    |   2 +-
 .../drill/exec/testing/TestResourceLeak.java    | 126 +++++++++++++++++++
 .../resources/memory/tpch01_memory_leak.sql     |  22 ++++
 8 files changed, 195 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/ed200e25/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
index 35139d5..7eb7d8a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
@@ -19,11 +19,14 @@ package org.apache.drill.exec.ops;
 
 import io.netty.buffer.DrillBuf;
 
+import java.io.IOException;
 import java.util.Iterator;
 
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 import org.apache.drill.exec.testing.ExecutionControls;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.hadoop.conf.Configuration;
 
 public abstract class OperatorContext {
 
@@ -39,6 +42,8 @@ public abstract class OperatorContext {
 
   public abstract ExecutionControls getExecutionControls();
 
+  public abstract DrillFileSystem newFileSystem(Configuration conf) throws IOException;
+
   public static int getChildCount(PhysicalOperator popConfig) {
     Iterator<PhysicalOperator> iter = popConfig.iterator();
     int i = 0;

http://git-wip-us.apache.org/repos/asf/drill/blob/ed200e25/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
index 9fa8867..ce9f351 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
@@ -17,14 +17,20 @@
  */
 package org.apache.drill.exec.ops;
 
+import com.google.common.base.Preconditions;
 import io.netty.buffer.DrillBuf;
 
+import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.physical.base.PhysicalOperator;
 
 import com.carrotsearch.hppc.LongObjectOpenHashMap;
 import org.apache.drill.exec.testing.ExecutionControls;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
 
 class OperatorContextImpl extends OperatorContext implements AutoCloseable {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OperatorContextImpl.class);
@@ -36,6 +42,7 @@ class OperatorContextImpl extends OperatorContext implements AutoCloseable {
   private OperatorStats stats;
   private LongObjectOpenHashMap<DrillBuf> managedBuffers = new LongObjectOpenHashMap<>();
   private final boolean applyFragmentLimit;
+  private DrillFileSystem fs;
 
   public OperatorContextImpl(PhysicalOperator popConfig, FragmentContext context, boolean applyFragmentLimit) throws OutOfMemoryException {
     this.applyFragmentLimit=applyFragmentLimit;
@@ -108,6 +115,14 @@ class OperatorContextImpl extends OperatorContext implements AutoCloseable {
     if (allocator != null) {
       allocator.close();
     }
+
+    if (fs != null) {
+      try {
+        fs.close();
+      } catch (IOException e) {
+        throw new DrillRuntimeException(e);
+      }
+    }
     closed = true;
   }
 
@@ -115,4 +130,11 @@ class OperatorContextImpl extends OperatorContext implements AutoCloseable {
     return stats;
   }
 
+  @Override
+  public DrillFileSystem newFileSystem(Configuration conf) throws IOException {
+    Preconditions.checkState(fs == null, "Tried to create a second FileSystem. Can only be called once per OperatorContext");
+    fs = new DrillFileSystem(conf, getStats());
+    return fs;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/ed200e25/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
index b6a9c30..25dd811 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
@@ -71,30 +71,7 @@ import com.google.common.collect.Maps;
 public class DrillFileSystem extends FileSystem implements OpenFileTracker {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillFileSystem.class);
   private final static boolean TRACKING_ENABLED = AssertionUtil.isAssertionsEnabled();
-  private final static ConcurrentMap<DrillFSDataInputStream, DebugStackTrace> openedFiles = Maps.newConcurrentMap();
-
-  static {
-    if (TRACKING_ENABLED) {
-      Runtime.getRuntime().addShutdownHook(new Thread() {
-        public void run() {
-          if (openedFiles.size() != 0) {
-            final StringBuffer errMsgBuilder = new StringBuffer();
-
-            errMsgBuilder.append(String.format("Not all files opened using this FileSystem are closed. " + "There are" +
-                " still [%d] files open.\n", openedFiles.size()));
-
-            for(DebugStackTrace stackTrace : openedFiles.values()) {
-              stackTrace.addToStringBuilder(errMsgBuilder);
-            }
-
-            final String errMsg = errMsgBuilder.toString();
-            logger.error(errMsg);
-            throw new IllegalStateException(errMsg);
-          }
-        }
-      });
-    }
-  }
+  private final ConcurrentMap<DrillFSDataInputStream, DebugStackTrace> openedFiles = Maps.newConcurrentMap();
 
   private final FileSystem underlyingFs;
   private final OperatorStats operatorStats;
@@ -420,7 +397,22 @@ public class DrillFileSystem extends FileSystem implements OpenFileTracker {
 
   @Override
   public void close() throws IOException {
-    underlyingFs.close();
+    if (TRACKING_ENABLED) {
+      if (openedFiles.size() != 0) {
+        final StringBuffer errMsgBuilder = new StringBuffer();
+
+        errMsgBuilder.append(String.format("Not all files opened using this FileSystem are closed. " + "There are" +
+            " still [%d] files open.\n", openedFiles.size()));
+
+        for (DebugStackTrace stackTrace : openedFiles.values()) {
+          stackTrace.addToStringBuilder(errMsgBuilder);
+        }
+
+        final String errMsg = errMsgBuilder.toString();
+        logger.error(errMsg);
+        throw new IllegalStateException(errMsg);
+      }
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/ed200e25/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index d1b46e1..233c32b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -162,7 +162,7 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
                                                                        */);
     final DrillFileSystem dfs;
     try {
-      dfs = new DrillFileSystem(fsConf, oContext.getStats());
+      dfs = oContext.newFileSystem(fsConf);
     } catch (IOException e) {
       throw new ExecutionSetupException(String.format("Failed to create FileSystem: %s", e.getMessage()), e);
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/ed200e25/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index 5e9c4ca..4423fde 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -100,7 +100,7 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
 
     DrillFileSystem fs;
     try {
-      fs = new DrillFileSystem(rowGroupScan.getStorageEngine().getFsConf(), oContext.getStats());
+      fs = oContext.newFileSystem(rowGroupScan.getStorageEngine().getFsConf());
     } catch(IOException e) {
       throw new ExecutionSetupException(String.format("Failed to create DrillFileSystem: %s", e.getMessage()), e);
     }

http://git-wip-us.apache.org/repos/asf/drill/blob/ed200e25/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index d96e6d6..2712fe7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -228,7 +228,6 @@ public class FragmentExecutor implements Runnable {
         }
       });
 
-      updateState(FragmentState.FINISHED);
     } catch (OutOfMemoryError | OutOfMemoryRuntimeException e) {
       if (!(e instanceof OutOfMemoryError) || "Direct buffer memory".equals(e.getMessage())) {
         fail(UserException.memoryError(e).build());
@@ -249,6 +248,7 @@ public class FragmentExecutor implements Runnable {
 
       closeOutResources();
 
+      updateState(FragmentState.FINISHED);
       // send the final state of the fragment. only the main execution thread can send the final state and it can
       // only be sent once.
       sendFinalState();

http://git-wip-us.apache.org/repos/asf/drill/blob/ed200e25/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestResourceLeak.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestResourceLeak.java b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestResourceLeak.java
new file mode 100644
index 0000000..d7e317c
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestResourceLeak.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.testing;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Resources;
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.QueryTestUtil;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.client.DrillClient;
+import org.apache.drill.exec.expr.DrillSimpleFunc;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionScope;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate.NullHandling;
+import org.apache.drill.exec.expr.annotations.Output;
+import org.apache.drill.exec.expr.annotations.Param;
+import org.apache.drill.exec.expr.holders.Float8Holder;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.TopLevelAllocator;
+import org.apache.drill.exec.server.Drillbit;
+import org.apache.drill.exec.server.RemoteServiceSet;
+import org.apache.drill.test.DrillTest;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import javax.inject.Inject;
+import java.io.IOException;
+import java.net.URL;
+import java.util.Properties;
+
+public class TestResourceLeak extends DrillTest {
+
+  private static DrillClient client;
+  private static Drillbit bit;
+  private static RemoteServiceSet serviceSet;
+  private static DrillConfig config;
+  private static BufferAllocator allocator;
+
+  @SuppressWarnings("serial")
+  private static final Properties TEST_CONFIGURATIONS = new Properties() {
+    {
+      put(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, "false");
+      put(ExecConstants.HTTP_ENABLE, "false");
+    }
+  };
+
+  @BeforeClass
+  public static void openClient() throws Exception {
+    config = DrillConfig.create(TEST_CONFIGURATIONS);
+    allocator = new TopLevelAllocator(config);
+    serviceSet = RemoteServiceSet.getLocalServiceSet();
+
+    bit = new Drillbit(config, serviceSet);
+    bit.run();
+    client = QueryTestUtil.createClient(config, serviceSet, 2, null);
+  }
+
+  @Test()
+  public void tpch01() throws Exception {
+    final String query = getFile("memory/tpch01_memory_leak.sql");
+    try {
+      QueryTestUtil.test(client, "alter session set `planner.slice_target` = 10; " + query);
+    } catch (UserRemoteException e) {
+      if (e.getMessage().contains("Attempted to close accountor")) {
+        return;
+      }
+      throw e;
+    }
+    Assert.fail("Expected UserRemoteException indicating memory leak");
+  }
+
+  private static String getFile(String resource) throws IOException {
+    URL url = Resources.getResource(resource);
+    if (url == null) {
+      throw new IOException(String.format("Unable to find path %s.", resource));
+    }
+    return Resources.toString(url, Charsets.UTF_8);
+  }
+
+  @AfterClass
+  public static void closeClient() throws Exception {
+    try {
+      allocator.close();
+      serviceSet.close();
+      bit.close();
+      client.close();
+    } catch (IllegalStateException e) {
+      e.printStackTrace();
+    }
+  }
+
+  @FunctionTemplate(name = "leakResource", scope = FunctionScope.SIMPLE, nulls = NullHandling.NULL_IF_NULL)
+  public static class Leak implements DrillSimpleFunc {
+
+    @Param Float8Holder in;
+    @Inject DrillBuf buf;
+    @Output Float8Holder out;
+
+    public void setup() {}
+
+    public void eval() {
+      buf.getAllocator().buffer(1);
+      out.value = in.value;
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/ed200e25/exec/java-exec/src/test/resources/memory/tpch01_memory_leak.sql
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/memory/tpch01_memory_leak.sql b/exec/java-exec/src/test/resources/memory/tpch01_memory_leak.sql
new file mode 100644
index 0000000..12d30ab
--- /dev/null
+++ b/exec/java-exec/src/test/resources/memory/tpch01_memory_leak.sql
@@ -0,0 +1,22 @@
+select
+  l_returnflag,
+  l_linestatus,
+  sum(leakResource(l_quantity)) as sum_qty,
+  sum(l_extendedprice) as sum_base_price,
+  sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
+  sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
+  avg(l_quantity) as avg_qty,
+  avg(l_extendedprice) as avg_price,
+  avg(l_discount) as avg_disc,
+  count(*) as count_order
+from
+  cp.`tpch/lineitem.parquet`
+where
+  l_shipdate <= date '1998-12-01' - interval '120' day (3)
+group by
+  l_returnflag,
+  l_linestatus
+
+order by
+  l_returnflag,
+  l_linestatus;
\ No newline at end of file