You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ve...@apache.org on 2015/02/18 00:28:24 UTC
[1/2] drill git commit: DRILL-2251: Fix resource leaking with new
DrillFileSystem
Repository: drill
Updated Branches:
refs/heads/master d9b61fac2 -> 9c4d91d01
DRILL-2251: Fix resource leaking with new DrillFileSystem
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/ccaabdbb
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/ccaabdbb
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/ccaabdbb
Branch: refs/heads/master
Commit: ccaabdbbd4a1baa9245514d68e683d1409333bf6
Parents: d9b61fa
Author: vkorukanti <ve...@gmail.com>
Authored: Thu Feb 12 17:27:05 2015 -0800
Committer: vkorukanti <ve...@gmail.com>
Committed: Tue Feb 17 11:20:46 2015 -0800
----------------------------------------------------------------------
.../exec/store/dfs/DrillFSDataInputStream.java | 131 +++++++++++++++++--
.../drill/exec/store/dfs/DrillFileSystem.java | 93 +++++++++++--
.../drill/exec/store/dfs/OpenFileTracker.java | 36 +++++
.../exec/store/dfs/TestDrillFileSystem.java | 25 +++-
4 files changed, 258 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/ccaabdbb/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFSDataInputStream.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFSDataInputStream.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFSDataInputStream.java
index 44ef8a3..5be6d10 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFSDataInputStream.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFSDataInputStream.java
@@ -18,6 +18,7 @@
package org.apache.drill.exec.store.dfs;
import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.PositionedReadable;
import org.apache.hadoop.fs.ReadOption;
@@ -35,11 +36,20 @@ import java.util.EnumSet;
* Wrapper around FSDataInputStream to collect IO Stats.
*/
public class DrillFSDataInputStream extends FSDataInputStream {
- private FSDataInputStream underlyingIs;
+ private final FSDataInputStream underlyingIs;
+ private final OpenFileTracker openFileTracker;
+ private final OperatorStats operatorStats;
public DrillFSDataInputStream(FSDataInputStream in, OperatorStats operatorStats) throws IOException {
+ this(in, operatorStats, null);
+ }
+
+ public DrillFSDataInputStream(FSDataInputStream in, OperatorStats operatorStats,
+ OpenFileTracker openFileTracker) throws IOException {
super(new WrappedInputStream(in, operatorStats));
- this.underlyingIs = in;
+ underlyingIs = in;
+ this.openFileTracker = openFileTracker;
+ this.operatorStats = operatorStats;
}
@Override
@@ -54,17 +64,32 @@ public class DrillFSDataInputStream extends FSDataInputStream {
@Override
public int read(long position, byte[] buffer, int offset, int length) throws IOException {
- return underlyingIs.read(position, buffer, offset, length);
+ operatorStats.startWait();
+ try {
+ return underlyingIs.read(position, buffer, offset, length);
+ } finally {
+ operatorStats.stopWait();
+ }
}
@Override
public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
- underlyingIs.readFully(position, buffer, offset, length);
+ operatorStats.startWait();
+ try {
+ underlyingIs.readFully(position, buffer, offset, length);
+ } finally {
+ operatorStats.stopWait();
+ }
}
@Override
public void readFully(long position, byte[] buffer) throws IOException {
- underlyingIs.readFully(position, buffer);
+ operatorStats.startWait();
+ try {
+ underlyingIs.readFully(position, buffer);
+ } finally {
+ operatorStats.stopWait();
+ }
}
@Override
@@ -73,13 +98,19 @@ public class DrillFSDataInputStream extends FSDataInputStream {
}
@Override
+ @LimitedPrivate({"HDFS"})
public InputStream getWrappedStream() {
return underlyingIs.getWrappedStream();
}
@Override
public int read(ByteBuffer buf) throws IOException {
- return underlyingIs.read(buf);
+ operatorStats.startWait();
+ try {
+ return underlyingIs.read(buf);
+ } finally {
+ operatorStats.stopWait();
+ }
}
@Override
@@ -99,7 +130,12 @@ public class DrillFSDataInputStream extends FSDataInputStream {
@Override
public ByteBuffer read(ByteBufferPool bufferPool, int maxLength, EnumSet<ReadOption> opts) throws IOException, UnsupportedOperationException {
- return underlyingIs.read(bufferPool, maxLength, opts);
+ operatorStats.startWait();
+ try {
+ return underlyingIs.read(bufferPool, maxLength, opts);
+ } finally {
+ operatorStats.stopWait();
+ }
}
@Override
@@ -107,6 +143,44 @@ public class DrillFSDataInputStream extends FSDataInputStream {
underlyingIs.releaseBuffer(buffer);
}
+ @Override
+ public int read() throws IOException {
+ return underlyingIs.read();
+ }
+
+ @Override
+ public long skip(long n) throws IOException {
+ return underlyingIs.skip(n);
+ }
+
+ @Override
+ public int available() throws IOException {
+ return underlyingIs.available();
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (openFileTracker != null) {
+ openFileTracker.fileClosed(this);
+ }
+ underlyingIs.close();
+ }
+
+ @Override
+ public void mark(int readlimit) {
+ underlyingIs.mark(readlimit);
+ }
+
+ @Override
+ public void reset() throws IOException {
+ underlyingIs.reset();
+ }
+
+ @Override
+ public boolean markSupported() {
+ return underlyingIs.markSupported();
+ }
+
/**
* We need to wrap the FSDataInputStream inside a InputStream, because read() method in InputStream is
* overridden in FilterInputStream (super class of FSDataInputStream) as final, so we can not override in
@@ -133,14 +207,51 @@ public class DrillFSDataInputStream extends FSDataInputStream {
@Override
public int read(byte[] b, int off, int len) throws IOException {
operatorStats.startWait();
- int numBytesRead;
try {
- numBytesRead = is.read(b, off, len);
+ return is.read(b, off, len);
+ } finally {
+ operatorStats.stopWait();
+ }
+ }
+
+ @Override
+ public int read(byte[] b) throws IOException {
+ operatorStats.startWait();
+ try {
+ return is.read(b);
} finally {
operatorStats.stopWait();
}
+ }
+
+ @Override
+ public long skip(long n) throws IOException {
+ return is.skip(n);
+ }
- return numBytesRead;
+ @Override
+ public int available() throws IOException {
+ return is.available();
+ }
+
+ @Override
+ public void close() throws IOException {
+ is.close();
+ }
+
+ @Override
+ public synchronized void mark(int readlimit) {
+ is.mark(readlimit);
+ }
+
+ @Override
+ public synchronized void reset() throws IOException {
+ is.reset();
+ }
+
+ @Override
+ public boolean markSupported() {
+ return is.markSupported();
}
@Override
http://git-wip-us.apache.org/repos/asf/drill/blob/ccaabdbb/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
index f5730a1..2683cca 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
@@ -22,10 +22,13 @@ import java.io.IOException;
import java.net.URI;
import java.util.EnumSet;
import java.util.List;
+import java.util.concurrent.ConcurrentMap;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.drill.exec.util.AssertionUtil;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
@@ -58,18 +61,42 @@ import org.apache.hadoop.util.Progressable;
/**
* DrillFileSystem is the wrapper around the actual FileSystem implementation.
*
- * If {@link org.apache.drill.exec.ops.OperatorStats} are provided it returns a instrumented FSDataInputStream to
- * measure IO wait time.
+ * If {@link org.apache.drill.exec.ops.OperatorStats} are provided it returns an instrumented FSDataInputStream to
+ * measure IO wait time and track file open/close operations.
*/
-public class DrillFileSystem extends FileSystem {
+public class DrillFileSystem extends FileSystem implements OpenFileTracker {
static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillFileSystem.class);
+ private final static boolean TRACKING_ENABLED = AssertionUtil.isAssertionsEnabled();
+ private final static ConcurrentMap<DrillFSDataInputStream, DebugStackTrace> openedFiles = Maps.newConcurrentMap();
+
+ static {
+ if (TRACKING_ENABLED) {
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ public void run() {
+ if (openedFiles.size() != 0) {
+ final StringBuffer errMsgBuilder = new StringBuffer();
+
+ errMsgBuilder.append(String.format("Not all files opened using this FileSystem are closed. " + "There are" +
+ " still [%d] files open.\n", openedFiles.size()));
+
+ for(DebugStackTrace stackTrace : openedFiles.values()) {
+ stackTrace.addToStringBuilder(errMsgBuilder);
+ }
+
+ final String errMsg = errMsgBuilder.toString();
+ logger.error(errMsg);
+ throw new IllegalStateException(errMsg);
+ }
+ }
+ });
+ }
+ }
private final FileSystem underlyingFs;
private final OperatorStats operatorStats;
public DrillFileSystem(Configuration fsConf) throws IOException {
- this.underlyingFs = FileSystem.get(fsConf);
- this.operatorStats = null;
+ this(FileSystem.get(fsConf), null);
}
public DrillFileSystem(FileSystem fs, OperatorStats operatorStats) {
@@ -96,11 +123,17 @@ public class DrillFileSystem extends FileSystem {
*/
@Override
public FSDataInputStream open(Path f, int bufferSize) throws IOException {
- if (operatorStats != null) {
- return new DrillFSDataInputStream(underlyingFs.open(f, bufferSize), operatorStats);
+ if (operatorStats == null) {
+ return underlyingFs.open(f, bufferSize);
+ }
+
+ if (TRACKING_ENABLED) {
+ DrillFSDataInputStream is = new DrillFSDataInputStream(underlyingFs.open(f, bufferSize), operatorStats, this);
+ fileOpened(f, is);
+ return is;
}
- return underlyingFs.open(f, bufferSize);
+ return new DrillFSDataInputStream(underlyingFs.open(f, bufferSize), operatorStats);
}
/**
@@ -108,11 +141,17 @@ public class DrillFileSystem extends FileSystem {
*/
@Override
public FSDataInputStream open(Path f) throws IOException {
- if (operatorStats != null) {
- return new DrillFSDataInputStream(underlyingFs.open(f), operatorStats);
+ if (operatorStats == null) {
+ return underlyingFs.open(f);
}
- return underlyingFs.open(f);
+ if (TRACKING_ENABLED) {
+ DrillFSDataInputStream is = new DrillFSDataInputStream(underlyingFs.open(f), operatorStats, this);
+ fileOpened(f, is);
+ return is;
+ }
+
+ return new DrillFSDataInputStream(underlyingFs.open(f), operatorStats);
}
@Override
@@ -678,5 +717,37 @@ public class DrillFileSystem extends FileSystem {
}
}
+ @Override
+ public void fileOpened(Path path, DrillFSDataInputStream fsDataInputStream) {
+ openedFiles.put(fsDataInputStream, new DebugStackTrace(path, Thread.currentThread().getStackTrace()));
+ }
+ @Override
+ public void fileClosed(DrillFSDataInputStream fsDataInputStream) {
+ openedFiles.remove(fsDataInputStream);
+ }
+
+ public static class DebugStackTrace {
+ final private StackTraceElement[] elements;
+ final private Path path;
+
+ public DebugStackTrace(Path path, StackTraceElement[] elements) {
+ this.path = path;
+ this.elements = elements;
+ }
+
+ public void addToStringBuilder(StringBuffer sb) {
+ sb.append("File '");
+ sb.append(path.toString());
+ sb.append("' opened at callstack:\n");
+
+ // skip the top three frames (Thread.getStackTrace(), DrillFileSystem.fileOpened() and DrillFileSystem.open()) as they belong to the tracking code itself.
+ for (int i = 3; i < elements.length; i++) {
+ sb.append("\t");
+ sb.append(elements[i]);
+ sb.append("\n");
+ }
+ sb.append("\n");
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/drill/blob/ccaabdbb/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/OpenFileTracker.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/OpenFileTracker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/OpenFileTracker.java
new file mode 100644
index 0000000..f99eb8b
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/OpenFileTracker.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.dfs;
+
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Interface to track opening and closing of files.
+ */
+public interface OpenFileTracker {
+ /**
+ * Record the given file path and its {@link org.apache.drill.exec.store.dfs.DrillFSDataInputStream} in the list of open files.
+ */
+ public void fileOpened(Path path, DrillFSDataInputStream fsDataInputStream);
+
+ /**
+ * Remove the given {@link org.apache.drill.exec.store.dfs.DrillFSDataInputStream} from opened file list.
+ * @param fsDataInputStream the stream that has been closed
+ */
+ public void fileClosed(DrillFSDataInputStream fsDataInputStream);
+}
http://git-wip-us.apache.org/repos/asf/drill/blob/ccaabdbb/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDrillFileSystem.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDrillFileSystem.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDrillFileSystem.java
index 5c71d08..ab6639e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDrillFileSystem.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDrillFileSystem.java
@@ -63,23 +63,36 @@ public class TestDrillFileSystem {
@Test
public void testIOStats() throws Exception {
+ DrillFileSystem dfs = null;
+ InputStream is = null;
Configuration conf = new Configuration();
conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///");
OpProfileDef profileDef = new OpProfileDef(0 /*operatorId*/, 0 /*operatorType*/, 0 /*inputCount*/);
OperatorStats stats = new OperatorStats(profileDef, null /*allocator*/);
+
// start wait time method in OperatorStats expects the OperatorStats state to be in "processing"
stats.startProcessing();
- DrillFileSystem dfs = new DrillFileSystem(FileSystem.get(conf), stats);
- InputStream is = dfs.open(new Path(tempFilePath));
+ try {
+ dfs = new DrillFileSystem(FileSystem.get(conf), stats);
+ is = dfs.open(new Path(tempFilePath));
- byte[] buf = new byte[8000];
- while (is.read(buf, 0, buf.length) != -1) { }
+ byte[] buf = new byte[8000];
+ while (is.read(buf, 0, buf.length) != -1) {
+ }
+ } finally {
+ stats.stopProcessing();
- stats.stopProcessing();
+ if (is != null) {
+ is.close();
+ }
- OperatorProfile operatorProfile = stats.getProfile();
+ if (dfs != null) {
+ dfs.close();
+ }
+ }
+ OperatorProfile operatorProfile = stats.getProfile();
assertTrue("Expected wait time is non-zero, but got zero wait time", operatorProfile.getWaitNanos() > 0);
}
[2/2] drill git commit: DRILL-2252: Cleanup resources when fragment
is cancelled.
Posted by ve...@apache.org.
DRILL-2252: Cleanup resources when fragment is cancelled.
Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/9c4d91d0
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/9c4d91d0
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/9c4d91d0
Branch: refs/heads/master
Commit: 9c4d91d017ba2e7ceea89b5759d61fa73a658153
Parents: ccaabdb
Author: vkorukanti <ve...@gmail.com>
Authored: Sat Feb 14 22:18:31 2015 -0800
Committer: vkorukanti <ve...@gmail.com>
Committed: Tue Feb 17 11:50:33 2015 -0800
----------------------------------------------------------------------
.../drill/exec/work/fragment/FragmentExecutor.java | 14 ++++++++------
1 file changed, 8 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/drill/blob/9c4d91d0/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index a8f07b5..7ccb64e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -47,14 +47,14 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
private final AtomicInteger state = new AtomicInteger(FragmentState.AWAITING_ALLOCATION_VALUE);
private final FragmentRoot rootOperator;
- private RootExec root;
private final FragmentContext context;
private final WorkerBee bee;
private final StatusReporter listener;
- private Thread executionThread;
- private AtomicBoolean closed = new AtomicBoolean(false);
private final DrillbitStatusListener drillbitStatusListener = new FragmentDrillbitStatusListener();
+ private RootExec root;
+ private boolean closed;
+
public FragmentExecutor(FragmentContext context, WorkerBee bee, FragmentRoot rootOperator, StatusReporter listener) {
this.context = context;
this.bee = bee;
@@ -99,7 +99,6 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
context.getHandle().getMinorFragmentId()
);
Thread.currentThread().setName(newThreadName);
- executionThread = Thread.currentThread();
root = ImplCreator.getExec(context, rootOperator);
@@ -133,12 +132,15 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
bee.removeFragment(context.getHandle());
context.getDrillbitContext().getClusterCoordinator().removeDrillbitStatusListener(drillbitStatusListener);
+ // Final check to make sure RecordBatches are cleaned up.
+ closeOutResources(false);
+
Thread.currentThread().setName(originalThread);
}
}
private void closeOutResources(boolean throwFailure) {
- if (closed.get()) {
+ if (closed) {
return;
}
@@ -160,7 +162,7 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
logger.warn("Failure while closing out resources.", e);
}
- closed.set(true);
+ closed = true;
}
private void internalFail(Throwable excep) {