You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cn...@apache.org on 2013/09/24 07:21:22 UTC

svn commit: r1525775 - in /hadoop/common/branches/branch-1-win: ./ src/core/org/apache/hadoop/fs/ src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/

Author: cnauroth
Date: Tue Sep 24 05:21:21 2013
New Revision: 1525775

URL: http://svn.apache.org/r1525775
Log:
MAPREDUCE-5351. JobTracker memory leak caused by CleanupQueue reopening FileSystem. Contributed by Sandy Ryza.

Added:
    hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestCleanupQueue.java
Modified:
    hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt
    hadoop/common/branches/branch-1-win/src/core/org/apache/hadoop/fs/FileSystem.java
    hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java
    hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobInProgress.java

Modified: hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt?rev=1525775&r1=1525774&r2=1525775&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt (original)
+++ hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt Tue Sep 24 05:21:21 2013
@@ -498,3 +498,6 @@ Branch-hadoop-1-win (branched from branc
     HDFS-5211. Race condition between DistributedFileSystem#close and
     FileSystem#close can cause return of a closed DistributedFileSystem instance
     from the FileSystem cache. (cnauroth)
+
+    MAPREDUCE-5351. Fixed a memory leak in JobTracker due to stale FS objects in
+    FSCache. (Sandy Ryza via acmurthy)

Modified: hadoop/common/branches/branch-1-win/src/core/org/apache/hadoop/fs/FileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/core/org/apache/hadoop/fs/FileSystem.java?rev=1525775&r1=1525774&r2=1525775&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/core/org/apache/hadoop/fs/FileSystem.java (original)
+++ hadoop/common/branches/branch-1-win/src/core/org/apache/hadoop/fs/FileSystem.java Tue Sep 24 05:21:21 2013
@@ -36,6 +36,7 @@ import java.util.concurrent.atomic.Atomi
 
 import org.apache.commons.logging.*;
 
+import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.util.*;
 import org.apache.hadoop.fs.permission.FsPermission;
@@ -146,6 +147,14 @@ public abstract class FileSystem extends
   public static void setDefaultUri(Configuration conf, String uri) {
     setDefaultUri(conf, URI.create(fixName(uri)));
   }
+  
+  /** Get the number of entries in the filesystem cache
+   * @return the number of entries in the filesystem cache
+   */
+  @Private
+  public static int getCacheSize() {
+    return CACHE.map.size();
+  }
 
   /** Called after a new FileSystem instance is constructed.
    * @param name a uri whose authority section names the host, port, etc.

Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java?rev=1525775&r1=1525774&r2=1525775&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java Tue Sep 24 05:21:21 2013
@@ -26,6 +26,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.security.token.DelegationTokenRenewal;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -105,8 +106,17 @@ public class CleanupQueue {
       (ugi == null ? UserGroupInformation.getLoginUser() : ugi).doAs(
           new PrivilegedExceptionAction<Object>() {
             public Object run() throws IOException {
-             p.getFileSystem(conf).delete(p, true);
-             return null;
+              FileSystem fs = p.getFileSystem(conf);
+              try {
+                fs.delete(p, true);
+                return null;
+              } finally {
+                // So that we don't leave an entry in the FileSystem cache for
+                // every UGI that a job is submitted with.
+                if (ugi != null) {
+                  fs.close();
+                }
+              }
             }
           });
       

Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1525775&r1=1525774&r2=1525775&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Tue Sep 24 05:21:21 2013
@@ -3308,6 +3308,7 @@ public class JobInProgress {
    * removing all delegation token etc.
    */
   void cleanupJob() {
+    FileSystem tempDirFs = null;
     synchronized (this) {
       try {
         // Definitely remove the local-disk copy of the job file
@@ -3325,6 +3326,7 @@ public class JobInProgress {
         if (jobTempDir != null && conf.getKeepTaskFilesPattern() == null &&
             !conf.getKeepFailedTaskFiles()) {
           Path jobTempDirPath = new Path(jobTempDir);
+          tempDirFs = jobTempDirPath.getFileSystem(conf);
           CleanupQueue.getInstance().addToQueue(
               new PathDeletionContext(jobTempDirPath, conf, userUGI, jobId));
         }
@@ -3342,12 +3344,15 @@ public class JobInProgress {
       this.runningReduces = null;
     }
     
-    //close the user's FS
-    try {
-      fs.close();
-    } catch (IOException ie) {
-      LOG.warn("Ignoring exception " + StringUtils.stringifyException(ie) + 
-          " while closing FileSystem for " + userUGI);
+    // Close the user's FS.  Or don't, in the common case of FS being the same
+    // FS as the temp directory FS, as it will be closed by the CleanupQueue.
+    if (tempDirFs != fs) {
+      try {
+        fs.close();
+      } catch (IOException ie) {
+        LOG.warn("Ignoring exception " + StringUtils.stringifyException(ie) + 
+            " while closing FileSystem for " + userUGI);
+      }
     }
   }
 

Added: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestCleanupQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestCleanupQueue.java?rev=1525775&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestCleanupQueue.java (added)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestCleanupQueue.java Tue Sep 24 05:21:21 2013
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.IOException;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Test;
+
+public class TestCleanupQueue {
+  @Test (timeout = 2000)
+  public void testCleanupQueueClosesFilesystem() throws IOException,
+      InterruptedException {
+    Configuration conf = new Configuration();
+    File file = new File("afile.txt");
+    file.createNewFile();
+    Path path = new Path(file.getAbsoluteFile().toURI());
+    
+    FileSystem.get(conf);
+    Assert.assertEquals(1, FileSystem.getCacheSize());
+    
+    // With UGI, should close FileSystem
+    CleanupQueue cleanupQueue = new CleanupQueue();
+    PathDeletionContext context = new PathDeletionContext(path, conf,
+        UserGroupInformation.getLoginUser());
+    cleanupQueue.addToQueue(context);
+    
+    while (FileSystem.getCacheSize() > 0) {
+      Thread.sleep(100);
+    }
+    
+    file.createNewFile();
+    FileSystem.get(conf);
+    Assert.assertEquals(1, FileSystem.getCacheSize());
+    
+    // Without UGI, should not close FileSystem
+    context = new PathDeletionContext(path, conf);
+    cleanupQueue.addToQueue(context);
+    
+    while (file.exists()) {
+      Thread.sleep(100);
+    }
+    Assert.assertEquals(1, FileSystem.getCacheSize());
+  }
+}