You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by sm...@apache.org on 2015/05/13 04:42:52 UTC
[3/6] drill git commit: DRILL-2780: Check for open files when closing
OperatorContext
DRILL-2780: Check for open files when closing OperatorContext
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/ed200e25
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/ed200e25
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/ed200e25
Branch: refs/heads/master
Commit: ed200e257249e67176a259bab0d607841bf37fa4
Parents: 6d7cda8
Author: Steven Phillips <sm...@apache.org>
Authored: Sun May 10 02:33:14 2015 -0700
Committer: Steven Phillips <sm...@apache.org>
Committed: Tue May 12 18:21:10 2015 -0700
----------------------------------------------------------------------
.../apache/drill/exec/ops/OperatorContext.java | 5 +
.../drill/exec/ops/OperatorContextImpl.java | 22 ++++
.../drill/exec/store/dfs/DrillFileSystem.java | 42 +++----
.../exec/store/dfs/easy/EasyFormatPlugin.java | 2 +-
.../store/parquet/ParquetScanBatchCreator.java | 2 +-
.../exec/work/fragment/FragmentExecutor.java | 2 +-
.../drill/exec/testing/TestResourceLeak.java | 126 +++++++++++++++++++
.../resources/memory/tpch01_memory_leak.sql | 22 ++++
8 files changed, 195 insertions(+), 28 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/ed200e25/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
index 35139d5..7eb7d8a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java
@@ -19,11 +19,14 @@ package org.apache.drill.exec.ops;
import io.netty.buffer.DrillBuf;
+import java.io.IOException;
import java.util.Iterator;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.testing.ExecutionControls;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.hadoop.conf.Configuration;
public abstract class OperatorContext {
@@ -39,6 +42,8 @@ public abstract class OperatorContext {
public abstract ExecutionControls getExecutionControls();
+ public abstract DrillFileSystem newFileSystem(Configuration conf) throws IOException;
+
public static int getChildCount(PhysicalOperator popConfig) {
Iterator<PhysicalOperator> iter = popConfig.iterator();
int i = 0;
http://git-wip-us.apache.org/repos/asf/drill/blob/ed200e25/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
index 9fa8867..ce9f351 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContextImpl.java
@@ -17,14 +17,20 @@
*/
package org.apache.drill.exec.ops;
+import com.google.common.base.Preconditions;
import io.netty.buffer.DrillBuf;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.exec.memory.BufferAllocator;
import org.apache.drill.exec.memory.OutOfMemoryException;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import com.carrotsearch.hppc.LongObjectOpenHashMap;
import org.apache.drill.exec.testing.ExecutionControls;
+import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.hadoop.conf.Configuration;
+
+import java.io.IOException;
class OperatorContextImpl extends OperatorContext implements AutoCloseable {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OperatorContextImpl.class);
@@ -36,6 +42,7 @@ class OperatorContextImpl extends OperatorContext implements AutoCloseable {
private OperatorStats stats;
private LongObjectOpenHashMap<DrillBuf> managedBuffers = new LongObjectOpenHashMap<>();
private final boolean applyFragmentLimit;
+ private DrillFileSystem fs;
public OperatorContextImpl(PhysicalOperator popConfig, FragmentContext context, boolean applyFragmentLimit) throws OutOfMemoryException {
this.applyFragmentLimit=applyFragmentLimit;
@@ -108,6 +115,14 @@ class OperatorContextImpl extends OperatorContext implements AutoCloseable {
if (allocator != null) {
allocator.close();
}
+
+ if (fs != null) {
+ try {
+ fs.close();
+ } catch (IOException e) {
+ throw new DrillRuntimeException(e);
+ }
+ }
closed = true;
}
@@ -115,4 +130,11 @@ class OperatorContextImpl extends OperatorContext implements AutoCloseable {
return stats;
}
+ @Override
+ public DrillFileSystem newFileSystem(Configuration conf) throws IOException {
+ Preconditions.checkState(fs == null, "Tried to create a second FileSystem. Can only be called once per OperatorContext");
+ fs = new DrillFileSystem(conf, getStats());
+ return fs;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/drill/blob/ed200e25/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
index b6a9c30..25dd811 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
@@ -71,30 +71,7 @@ import com.google.common.collect.Maps;
public class DrillFileSystem extends FileSystem implements OpenFileTracker {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillFileSystem.class);
private final static boolean TRACKING_ENABLED = AssertionUtil.isAssertionsEnabled();
- private final static ConcurrentMap<DrillFSDataInputStream, DebugStackTrace> openedFiles = Maps.newConcurrentMap();
-
- static {
- if (TRACKING_ENABLED) {
- Runtime.getRuntime().addShutdownHook(new Thread() {
- public void run() {
- if (openedFiles.size() != 0) {
- final StringBuffer errMsgBuilder = new StringBuffer();
-
- errMsgBuilder.append(String.format("Not all files opened using this FileSystem are closed. " + "There are" +
- " still [%d] files open.\n", openedFiles.size()));
-
- for(DebugStackTrace stackTrace : openedFiles.values()) {
- stackTrace.addToStringBuilder(errMsgBuilder);
- }
-
- final String errMsg = errMsgBuilder.toString();
- logger.error(errMsg);
- throw new IllegalStateException(errMsg);
- }
- }
- });
- }
- }
+ private final ConcurrentMap<DrillFSDataInputStream, DebugStackTrace> openedFiles = Maps.newConcurrentMap();
private final FileSystem underlyingFs;
private final OperatorStats operatorStats;
@@ -420,7 +397,22 @@ public class DrillFileSystem extends FileSystem implements OpenFileTracker {
@Override
public void close() throws IOException {
- underlyingFs.close();
+ if (TRACKING_ENABLED) {
+ if (openedFiles.size() != 0) {
+ final StringBuffer errMsgBuilder = new StringBuffer();
+
+ errMsgBuilder.append(String.format("Not all files opened using this FileSystem are closed. " + "There are" +
+ " still [%d] files open.\n", openedFiles.size()));
+
+ for (DebugStackTrace stackTrace : openedFiles.values()) {
+ stackTrace.addToStringBuilder(errMsgBuilder);
+ }
+
+ final String errMsg = errMsgBuilder.toString();
+ logger.error(errMsg);
+ throw new IllegalStateException(errMsg);
+ }
+ }
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/ed200e25/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
index d1b46e1..233c32b 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/easy/EasyFormatPlugin.java
@@ -162,7 +162,7 @@ public abstract class EasyFormatPlugin<T extends FormatPluginConfig> implements
*/);
final DrillFileSystem dfs;
try {
- dfs = new DrillFileSystem(fsConf, oContext.getStats());
+ dfs = oContext.newFileSystem(fsConf);
} catch (IOException e) {
throw new ExecutionSetupException(String.format("Failed to create FileSystem: %s", e.getMessage()), e);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/ed200e25/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index 5e9c4ca..4423fde 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -100,7 +100,7 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
DrillFileSystem fs;
try {
- fs = new DrillFileSystem(rowGroupScan.getStorageEngine().getFsConf(), oContext.getStats());
+ fs = oContext.newFileSystem(rowGroupScan.getStorageEngine().getFsConf());
} catch(IOException e) {
throw new ExecutionSetupException(String.format("Failed to create DrillFileSystem: %s", e.getMessage()), e);
}
http://git-wip-us.apache.org/repos/asf/drill/blob/ed200e25/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index d96e6d6..2712fe7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -228,7 +228,6 @@ public class FragmentExecutor implements Runnable {
}
});
- updateState(FragmentState.FINISHED);
} catch (OutOfMemoryError | OutOfMemoryRuntimeException e) {
if (!(e instanceof OutOfMemoryError) || "Direct buffer memory".equals(e.getMessage())) {
fail(UserException.memoryError(e).build());
@@ -249,6 +248,7 @@ public class FragmentExecutor implements Runnable {
closeOutResources();
+ updateState(FragmentState.FINISHED);
// send the final state of the fragment. only the main execution thread can send the final state and it can
// only be sent once.
sendFinalState();
http://git-wip-us.apache.org/repos/asf/drill/blob/ed200e25/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestResourceLeak.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestResourceLeak.java b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestResourceLeak.java
new file mode 100644
index 0000000..d7e317c
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/testing/TestResourceLeak.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.testing;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Resources;
+import io.netty.buffer.DrillBuf;
+import org.apache.drill.QueryTestUtil;
+import org.apache.drill.common.config.DrillConfig;
+import org.apache.drill.common.exceptions.UserRemoteException;
+import org.apache.drill.exec.ExecConstants;
+import org.apache.drill.exec.client.DrillClient;
+import org.apache.drill.exec.expr.DrillSimpleFunc;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionScope;
+import org.apache.drill.exec.expr.annotations.FunctionTemplate.NullHandling;
+import org.apache.drill.exec.expr.annotations.Output;
+import org.apache.drill.exec.expr.annotations.Param;
+import org.apache.drill.exec.expr.holders.Float8Holder;
+import org.apache.drill.exec.memory.BufferAllocator;
+import org.apache.drill.exec.memory.TopLevelAllocator;
+import org.apache.drill.exec.server.Drillbit;
+import org.apache.drill.exec.server.RemoteServiceSet;
+import org.apache.drill.test.DrillTest;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import javax.inject.Inject;
+import java.io.IOException;
+import java.net.URL;
+import java.util.Properties;
+
+public class TestResourceLeak extends DrillTest {
+
+ private static DrillClient client;
+ private static Drillbit bit;
+ private static RemoteServiceSet serviceSet;
+ private static DrillConfig config;
+ private static BufferAllocator allocator;
+
+ @SuppressWarnings("serial")
+ private static final Properties TEST_CONFIGURATIONS = new Properties() {
+ {
+ put(ExecConstants.SYS_STORE_PROVIDER_LOCAL_ENABLE_WRITE, "false");
+ put(ExecConstants.HTTP_ENABLE, "false");
+ }
+ };
+
+ @BeforeClass
+ public static void openClient() throws Exception {
+ config = DrillConfig.create(TEST_CONFIGURATIONS);
+ allocator = new TopLevelAllocator(config);
+ serviceSet = RemoteServiceSet.getLocalServiceSet();
+
+ bit = new Drillbit(config, serviceSet);
+ bit.run();
+ client = QueryTestUtil.createClient(config, serviceSet, 2, null);
+ }
+
+ @Test()
+ public void tpch01() throws Exception {
+ final String query = getFile("memory/tpch01_memory_leak.sql");
+ try {
+ QueryTestUtil.test(client, "alter session set `planner.slice_target` = 10; " + query);
+ } catch (UserRemoteException e) {
+ if (e.getMessage().contains("Attempted to close accountor")) {
+ return;
+ }
+ throw e;
+ }
+ Assert.fail("Expected UserRemoteException indicating memory leak");
+ }
+
+ private static String getFile(String resource) throws IOException {
+ URL url = Resources.getResource(resource);
+ if (url == null) {
+ throw new IOException(String.format("Unable to find path %s.", resource));
+ }
+ return Resources.toString(url, Charsets.UTF_8);
+ }
+
+ @AfterClass
+ public static void closeClient() throws Exception {
+ try {
+ allocator.close();
+ serviceSet.close();
+ bit.close();
+ client.close();
+ } catch (IllegalStateException e) {
+ e.printStackTrace();
+ }
+ }
+
+ @FunctionTemplate(name = "leakResource", scope = FunctionScope.SIMPLE, nulls = NullHandling.NULL_IF_NULL)
+ public static class Leak implements DrillSimpleFunc {
+
+ @Param Float8Holder in;
+ @Inject DrillBuf buf;
+ @Output Float8Holder out;
+
+ public void setup() {}
+
+ public void eval() {
+ buf.getAllocator().buffer(1);
+ out.value = in.value;
+ }
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/ed200e25/exec/java-exec/src/test/resources/memory/tpch01_memory_leak.sql
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/memory/tpch01_memory_leak.sql b/exec/java-exec/src/test/resources/memory/tpch01_memory_leak.sql
new file mode 100644
index 0000000..12d30ab
--- /dev/null
+++ b/exec/java-exec/src/test/resources/memory/tpch01_memory_leak.sql
@@ -0,0 +1,22 @@
+select
+ l_returnflag,
+ l_linestatus,
+ sum(leakResource(l_quantity)) as sum_qty,
+ sum(l_extendedprice) as sum_base_price,
+ sum(l_extendedprice * (1 - l_discount)) as sum_disc_price,
+ sum(l_extendedprice * (1 - l_discount) * (1 + l_tax)) as sum_charge,
+ avg(l_quantity) as avg_qty,
+ avg(l_extendedprice) as avg_price,
+ avg(l_discount) as avg_disc,
+ count(*) as count_order
+from
+ cp.`tpch/lineitem.parquet`
+where
+ l_shipdate <= date '1998-12-01' - interval '120' day (3)
+group by
+ l_returnflag,
+ l_linestatus
+
+order by
+ l_returnflag,
+ l_linestatus;
\ No newline at end of file