You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@drill.apache.org by ve...@apache.org on 2015/02/18 00:28:24 UTC

[1/2] drill git commit: DRILL-2251: Fix resource leaking with new DrillFileSystem

Repository: drill
Updated Branches:
  refs/heads/master d9b61fac2 -> 9c4d91d01


DRILL-2251: Fix resource leaking with new DrillFileSystem


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/ccaabdbb
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/ccaabdbb
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/ccaabdbb

Branch: refs/heads/master
Commit: ccaabdbbd4a1baa9245514d68e683d1409333bf6
Parents: d9b61fa
Author: vkorukanti <ve...@gmail.com>
Authored: Thu Feb 12 17:27:05 2015 -0800
Committer: vkorukanti <ve...@gmail.com>
Committed: Tue Feb 17 11:20:46 2015 -0800

----------------------------------------------------------------------
 .../exec/store/dfs/DrillFSDataInputStream.java  | 131 +++++++++++++++++--
 .../drill/exec/store/dfs/DrillFileSystem.java   |  93 +++++++++++--
 .../drill/exec/store/dfs/OpenFileTracker.java   |  36 +++++
 .../exec/store/dfs/TestDrillFileSystem.java     |  25 +++-
 4 files changed, 258 insertions(+), 27 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/ccaabdbb/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFSDataInputStream.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFSDataInputStream.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFSDataInputStream.java
index 44ef8a3..5be6d10 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFSDataInputStream.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFSDataInputStream.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.store.dfs;
 
 import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.PositionedReadable;
 import org.apache.hadoop.fs.ReadOption;
@@ -35,11 +36,20 @@ import java.util.EnumSet;
  * Wrapper around FSDataInputStream to collect IO Stats.
  */
 public class DrillFSDataInputStream extends FSDataInputStream {
-  private FSDataInputStream underlyingIs;
+  private final FSDataInputStream underlyingIs;
+  private final OpenFileTracker openFileTracker;
+  private final OperatorStats operatorStats;
 
   public DrillFSDataInputStream(FSDataInputStream in, OperatorStats operatorStats) throws IOException {
+    this(in, operatorStats, null);
+  }
+
+  public DrillFSDataInputStream(FSDataInputStream in, OperatorStats operatorStats,
+      OpenFileTracker openFileTracker) throws IOException {
     super(new WrappedInputStream(in, operatorStats));
-    this.underlyingIs = in;
+    underlyingIs = in;
+    this.openFileTracker = openFileTracker;
+    this.operatorStats = operatorStats;
   }
 
   @Override
@@ -54,17 +64,32 @@ public class DrillFSDataInputStream extends FSDataInputStream {
 
   @Override
   public int read(long position, byte[] buffer, int offset, int length) throws IOException {
-    return underlyingIs.read(position, buffer, offset, length);
+    operatorStats.startWait();
+    try {
+      return underlyingIs.read(position, buffer, offset, length);
+    } finally {
+      operatorStats.stopWait();
+    }
   }
 
   @Override
   public void readFully(long position, byte[] buffer, int offset, int length) throws IOException {
-    underlyingIs.readFully(position, buffer, offset, length);
+    operatorStats.startWait();
+    try {
+      underlyingIs.readFully(position, buffer, offset, length);
+    } finally {
+      operatorStats.stopWait();
+    }
   }
 
   @Override
   public void readFully(long position, byte[] buffer) throws IOException {
-    underlyingIs.readFully(position, buffer);
+    operatorStats.startWait();
+    try {
+      underlyingIs.readFully(position, buffer);
+    } finally {
+      operatorStats.stopWait();
+    }
   }
 
   @Override
@@ -73,13 +98,19 @@ public class DrillFSDataInputStream extends FSDataInputStream {
   }
 
   @Override
+  @LimitedPrivate({"HDFS"})
   public InputStream getWrappedStream() {
     return underlyingIs.getWrappedStream();
   }
 
   @Override
   public int read(ByteBuffer buf) throws IOException {
-    return underlyingIs.read(buf);
+    operatorStats.startWait();
+    try {
+      return underlyingIs.read(buf);
+    } finally {
+      operatorStats.stopWait();
+    }
   }
 
   @Override
@@ -99,7 +130,12 @@ public class DrillFSDataInputStream extends FSDataInputStream {
 
   @Override
   public ByteBuffer read(ByteBufferPool bufferPool, int maxLength, EnumSet<ReadOption> opts) throws IOException, UnsupportedOperationException {
-    return underlyingIs.read(bufferPool, maxLength, opts);
+    operatorStats.startWait();
+    try {
+      return underlyingIs.read(bufferPool, maxLength, opts);
+    } finally {
+      operatorStats.stopWait();
+    }
   }
 
   @Override
@@ -107,6 +143,44 @@ public class DrillFSDataInputStream extends FSDataInputStream {
     underlyingIs.releaseBuffer(buffer);
   }
 
+  @Override
+  public int read() throws IOException {
+    return underlyingIs.read();
+  }
+
+  @Override
+  public long skip(long n) throws IOException {
+    return underlyingIs.skip(n);
+  }
+
+  @Override
+  public int available() throws IOException {
+    return underlyingIs.available();
+  }
+
+  @Override
+  public void close() throws IOException {
+    if (openFileTracker != null) {
+      openFileTracker.fileClosed(this);
+    }
+    underlyingIs.close();
+  }
+
+  @Override
+  public void mark(int readlimit) {
+    underlyingIs.mark(readlimit);
+  }
+
+  @Override
+  public void reset() throws IOException {
+    underlyingIs.reset();
+  }
+
+  @Override
+  public boolean markSupported() {
+    return underlyingIs.markSupported();
+  }
+
   /**
    * We need to wrap the FSDataInputStream inside a InputStream, because read() method in InputStream is
    * overridden in FilterInputStream (super class of FSDataInputStream) as final, so we can not override in
@@ -133,14 +207,51 @@ public class DrillFSDataInputStream extends FSDataInputStream {
     @Override
     public int read(byte[] b, int off, int len) throws IOException {
       operatorStats.startWait();
-      int numBytesRead;
       try {
-        numBytesRead = is.read(b, off, len);
+        return is.read(b, off, len);
+      } finally {
+        operatorStats.stopWait();
+      }
+    }
+
+    @Override
+    public int read(byte[] b) throws IOException {
+      operatorStats.startWait();
+      try {
+        return is.read(b);
       } finally {
         operatorStats.stopWait();
       }
+    }
+
+    @Override
+    public long skip(long n) throws IOException {
+      return is.skip(n);
+    }
 
-      return numBytesRead;
+    @Override
+    public int available() throws IOException {
+      return is.available();
+    }
+
+    @Override
+    public void close() throws IOException {
+      is.close();
+    }
+
+    @Override
+    public synchronized void mark(int readlimit) {
+      is.mark(readlimit);
+    }
+
+    @Override
+    public synchronized void reset() throws IOException {
+      is.reset();
+    }
+
+    @Override
+    public boolean markSupported() {
+      return is.markSupported();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/drill/blob/ccaabdbb/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
index f5730a1..2683cca 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/DrillFileSystem.java
@@ -22,10 +22,13 @@ import java.io.IOException;
 import java.net.URI;
 import java.util.EnumSet;
 import java.util.List;
+import java.util.concurrent.ConcurrentMap;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.drill.exec.ops.OperatorStats;
+import org.apache.drill.exec.util.AssertionUtil;
 import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
@@ -58,18 +61,42 @@ import org.apache.hadoop.util.Progressable;
 /**
  * DrillFileSystem is the wrapper around the actual FileSystem implementation.
  *
- * If {@link org.apache.drill.exec.ops.OperatorStats} are provided it returns a instrumented FSDataInputStream to
- * measure IO wait time.
+ * If {@link org.apache.drill.exec.ops.OperatorStats} are provided it returns an instrumented FSDataInputStream to
+ * measure IO wait time and tracking file open/close operations.
  */
-public class DrillFileSystem extends FileSystem {
+public class DrillFileSystem extends FileSystem implements OpenFileTracker {
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillFileSystem.class);
+  private final static boolean TRACKING_ENABLED = AssertionUtil.isAssertionsEnabled();
+  private final static ConcurrentMap<DrillFSDataInputStream, DebugStackTrace> openedFiles = Maps.newConcurrentMap();
+
+  static {
+    if (TRACKING_ENABLED) {
+      Runtime.getRuntime().addShutdownHook(new Thread() {
+        public void run() {
+          if (openedFiles.size() != 0) {
+            final StringBuffer errMsgBuilder = new StringBuffer();
+
+            errMsgBuilder.append(String.format("Not all files opened using this FileSystem are closed. " + "There are" +
+                " still [%d] files open.\n", openedFiles.size()));
+
+            for(DebugStackTrace stackTrace : openedFiles.values()) {
+              stackTrace.addToStringBuilder(errMsgBuilder);
+            }
+
+            final String errMsg = errMsgBuilder.toString();
+            logger.error(errMsg);
+            throw new IllegalStateException(errMsg);
+          }
+        }
+      });
+    }
+  }
 
   private final FileSystem underlyingFs;
   private final OperatorStats operatorStats;
 
   public DrillFileSystem(Configuration fsConf) throws IOException {
-    this.underlyingFs = FileSystem.get(fsConf);
-    this.operatorStats = null;
+    this(FileSystem.get(fsConf), null);
   }
 
   public DrillFileSystem(FileSystem fs, OperatorStats operatorStats) {
@@ -96,11 +123,17 @@ public class DrillFileSystem extends FileSystem {
    */
   @Override
   public FSDataInputStream open(Path f, int bufferSize) throws IOException {
-    if (operatorStats != null) {
-      return new DrillFSDataInputStream(underlyingFs.open(f, bufferSize), operatorStats);
+    if (operatorStats == null) {
+      return underlyingFs.open(f, bufferSize);
+    }
+
+    if (TRACKING_ENABLED) {
+      DrillFSDataInputStream is = new DrillFSDataInputStream(underlyingFs.open(f, bufferSize), operatorStats, this);
+      fileOpened(f, is);
+      return is;
     }
 
-    return underlyingFs.open(f, bufferSize);
+    return new DrillFSDataInputStream(underlyingFs.open(f, bufferSize), operatorStats);
   }
 
   /**
@@ -108,11 +141,17 @@ public class DrillFileSystem extends FileSystem {
    */
   @Override
   public FSDataInputStream open(Path f) throws IOException {
-    if (operatorStats != null) {
-      return new DrillFSDataInputStream(underlyingFs.open(f), operatorStats);
+    if (operatorStats == null) {
+      return underlyingFs.open(f);
     }
 
-    return underlyingFs.open(f);
+    if (TRACKING_ENABLED) {
+      DrillFSDataInputStream is = new DrillFSDataInputStream(underlyingFs.open(f), operatorStats, this);
+      fileOpened(f, is);
+      return is;
+    }
+
+    return new DrillFSDataInputStream(underlyingFs.open(f), operatorStats);
   }
 
   @Override
@@ -678,5 +717,37 @@ public class DrillFileSystem extends FileSystem {
     }
   }
 
+  @Override
+  public void fileOpened(Path path, DrillFSDataInputStream fsDataInputStream) {
+    openedFiles.put(fsDataInputStream, new DebugStackTrace(path, Thread.currentThread().getStackTrace()));
+  }
 
+  @Override
+  public void fileClosed(DrillFSDataInputStream fsDataInputStream) {
+    openedFiles.remove(fsDataInputStream);
+  }
+
+  public static class DebugStackTrace {
+    final private StackTraceElement[] elements;
+    final private Path path;
+
+    public DebugStackTrace(Path path, StackTraceElement[] elements) {
+      this.path = path;
+      this.elements = elements;
+    }
+
+    public void addToStringBuilder(StringBuffer sb) {
+      sb.append("File '");
+      sb.append(path.toString());
+      sb.append("' opened at callstack:\n");
+
+      // add all stack elements except the top three as they point to DrillFileSystem.open() and inner stack elements.
+      for (int i = 3; i < elements.length; i++) {
+        sb.append("\t");
+        sb.append(elements[i]);
+        sb.append("\n");
+      }
+      sb.append("\n");
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/drill/blob/ccaabdbb/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/OpenFileTracker.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/OpenFileTracker.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/OpenFileTracker.java
new file mode 100644
index 0000000..f99eb8b
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/dfs/OpenFileTracker.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.dfs;
+
+import org.apache.hadoop.fs.Path;
+
+/**
+ * Interface to track opening and closing of files.
+ */
+public interface OpenFileTracker {
+  /**
+   * Add new file location and {@link org.apache.drill.exec.store.dfs.DrillFSDataInputStream} to list.
+   */
+  public void fileOpened(Path path, DrillFSDataInputStream fsDataInputStream);
+
+  /**
+   * Remove the given {@link org.apache.drill.exec.store.dfs.DrillFSDataInputStream} from opened file list.
+   * @param fsDataInputStream
+   */
+  public void fileClosed(DrillFSDataInputStream fsDataInputStream);
+}

http://git-wip-us.apache.org/repos/asf/drill/blob/ccaabdbb/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDrillFileSystem.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDrillFileSystem.java b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDrillFileSystem.java
index 5c71d08..ab6639e 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDrillFileSystem.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/exec/store/dfs/TestDrillFileSystem.java
@@ -63,23 +63,36 @@ public class TestDrillFileSystem {
 
   @Test
   public void testIOStats() throws Exception {
+    DrillFileSystem dfs = null;
+    InputStream is = null;
     Configuration conf = new Configuration();
     conf.set(FileSystem.FS_DEFAULT_NAME_KEY, "file:///");
     OpProfileDef profileDef = new OpProfileDef(0 /*operatorId*/, 0 /*operatorType*/, 0 /*inputCount*/);
     OperatorStats stats = new OperatorStats(profileDef, null /*allocator*/);
+
     // start wait time method in OperatorStats expects the OperatorStats state to be in "processing"
     stats.startProcessing();
-    DrillFileSystem dfs = new DrillFileSystem(FileSystem.get(conf), stats);
 
-    InputStream is = dfs.open(new Path(tempFilePath));
+    try {
+      dfs = new DrillFileSystem(FileSystem.get(conf), stats);
+      is = dfs.open(new Path(tempFilePath));
 
-    byte[] buf = new byte[8000];
-    while (is.read(buf, 0, buf.length) != -1) { }
+      byte[] buf = new byte[8000];
+      while (is.read(buf, 0, buf.length) != -1) {
+      }
+    } finally {
+      stats.stopProcessing();
 
-    stats.stopProcessing();
+      if (is != null) {
+        is.close();
+      }
 
-    OperatorProfile operatorProfile = stats.getProfile();
+      if (dfs != null) {
+        dfs.close();
+      }
+    }
 
+    OperatorProfile operatorProfile = stats.getProfile();
     assertTrue("Expected wait time is non-zero, but got zero wait time", operatorProfile.getWaitNanos() > 0);
   }
 


[2/2] drill git commit: DRILL-2252: Cleanup resources when fragment is cancelled.

Posted by ve...@apache.org.
DRILL-2252: Cleanup resources when fragment is cancelled.


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/9c4d91d0
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/9c4d91d0
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/9c4d91d0

Branch: refs/heads/master
Commit: 9c4d91d017ba2e7ceea89b5759d61fa73a658153
Parents: ccaabdb
Author: vkorukanti <ve...@gmail.com>
Authored: Sat Feb 14 22:18:31 2015 -0800
Committer: vkorukanti <ve...@gmail.com>
Committed: Tue Feb 17 11:50:33 2015 -0800

----------------------------------------------------------------------
 .../drill/exec/work/fragment/FragmentExecutor.java    | 14 ++++++++------
 1 file changed, 8 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/drill/blob/9c4d91d0/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
index a8f07b5..7ccb64e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/work/fragment/FragmentExecutor.java
@@ -47,14 +47,14 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
 
   private final AtomicInteger state = new AtomicInteger(FragmentState.AWAITING_ALLOCATION_VALUE);
   private final FragmentRoot rootOperator;
-  private RootExec root;
   private final FragmentContext context;
   private final WorkerBee bee;
   private final StatusReporter listener;
-  private Thread executionThread;
-  private AtomicBoolean closed = new AtomicBoolean(false);
   private final DrillbitStatusListener drillbitStatusListener = new FragmentDrillbitStatusListener();
 
+  private RootExec root;
+  private boolean closed;
+
   public FragmentExecutor(FragmentContext context, WorkerBee bee, FragmentRoot rootOperator, StatusReporter listener) {
     this.context = context;
     this.bee = bee;
@@ -99,7 +99,6 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
           context.getHandle().getMinorFragmentId()
           );
       Thread.currentThread().setName(newThreadName);
-      executionThread = Thread.currentThread();
 
       root = ImplCreator.getExec(context, rootOperator);
 
@@ -133,12 +132,15 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
       bee.removeFragment(context.getHandle());
       context.getDrillbitContext().getClusterCoordinator().removeDrillbitStatusListener(drillbitStatusListener);
 
+      // Final check to make sure RecordBatches are cleaned up.
+      closeOutResources(false);
+
       Thread.currentThread().setName(originalThread);
     }
   }
 
   private void closeOutResources(boolean throwFailure) {
-    if (closed.get()) {
+    if (closed) {
       return;
     }
 
@@ -160,7 +162,7 @@ public class FragmentExecutor implements Runnable, CancelableQuery, StatusProvid
       logger.warn("Failure while closing out resources.", e);
     }
 
-    closed.set(true);
+    closed = true;
   }
 
   private void internalFail(Throwable excep) {