You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cn...@apache.org on 2013/09/24 07:21:22 UTC
svn commit: r1525775 - in /hadoop/common/branches/branch-1-win: ./
src/core/org/apache/hadoop/fs/ src/mapred/org/apache/hadoop/mapred/
src/test/org/apache/hadoop/mapred/
Author: cnauroth
Date: Tue Sep 24 05:21:21 2013
New Revision: 1525775
URL: http://svn.apache.org/r1525775
Log:
MAPREDUCE-5351. JobTracker memory leak caused by CleanupQueue reopening FileSystem. Contributed by Sandy Ryza.
Added:
hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestCleanupQueue.java
Modified:
hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt
hadoop/common/branches/branch-1-win/src/core/org/apache/hadoop/fs/FileSystem.java
hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java
hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
Modified: hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt?rev=1525775&r1=1525774&r2=1525775&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt (original)
+++ hadoop/common/branches/branch-1-win/CHANGES.branch-1-win.txt Tue Sep 24 05:21:21 2013
@@ -498,3 +498,6 @@ Branch-hadoop-1-win (branched from branc
HDFS-5211. Race condition between DistributedFileSystem#close and
FileSystem#close can cause return of a closed DistributedFileSystem instance
from the FileSystem cache. (cnauroth)
+
+ MAPREDUCE-5351. Fixed a memory leak in JobTracker due to stale FS objects in
+ FSCache. (Sandy Ryza via acmurthy)
Modified: hadoop/common/branches/branch-1-win/src/core/org/apache/hadoop/fs/FileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/core/org/apache/hadoop/fs/FileSystem.java?rev=1525775&r1=1525774&r2=1525775&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/core/org/apache/hadoop/fs/FileSystem.java (original)
+++ hadoop/common/branches/branch-1-win/src/core/org/apache/hadoop/fs/FileSystem.java Tue Sep 24 05:21:21 2013
@@ -36,6 +36,7 @@ import java.util.concurrent.atomic.Atomi
import org.apache.commons.logging.*;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.*;
import org.apache.hadoop.util.*;
import org.apache.hadoop.fs.permission.FsPermission;
@@ -146,6 +147,14 @@ public abstract class FileSystem extends
public static void setDefaultUri(Configuration conf, String uri) {
setDefaultUri(conf, URI.create(fixName(uri)));
}
+
+ /** Get the number of entries currently held in the static filesystem cache (the size of CACHE.map); marked @Private, used by TestCleanupQueue to observe cache growth.
+ * @return the number of entries in the filesystem cache at the time of the call
+ */
+ @Private
+ public static int getCacheSize() {
+ return CACHE.map.size();
+ }
/** Called after a new FileSystem instance is constructed.
* @param name a uri whose authority section names the host, port, etc.
Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java?rev=1525775&r1=1525774&r2=1525775&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/CleanupQueue.java Tue Sep 24 05:21:21 2013
@@ -26,6 +26,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.security.token.DelegationTokenRenewal;
import org.apache.hadoop.security.UserGroupInformation;
@@ -105,8 +106,17 @@ public class CleanupQueue {
(ugi == null ? UserGroupInformation.getLoginUser() : ugi).doAs(
new PrivilegedExceptionAction<Object>() {
public Object run() throws IOException {
- p.getFileSystem(conf).delete(p, true);
- return null;
+ FileSystem fs = p.getFileSystem(conf);
+ try {
+ fs.delete(p, true);
+ return null;
+ } finally {
+ // So that we don't leave an entry in the FileSystem cache for
+ // every UGI that a job is submitted with.
+ if (ugi != null) {
+ fs.close();
+ }
+ }
}
});
Modified: hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1525775&r1=1525774&r2=1525775&view=diff
==============================================================================
--- hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/common/branches/branch-1-win/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Tue Sep 24 05:21:21 2013
@@ -3308,6 +3308,7 @@ public class JobInProgress {
* removing all delegation token etc.
*/
void cleanupJob() {
+ FileSystem tempDirFs = null;
synchronized (this) {
try {
// Definitely remove the local-disk copy of the job file
@@ -3325,6 +3326,7 @@ public class JobInProgress {
if (jobTempDir != null && conf.getKeepTaskFilesPattern() == null &&
!conf.getKeepFailedTaskFiles()) {
Path jobTempDirPath = new Path(jobTempDir);
+ tempDirFs = jobTempDirPath.getFileSystem(conf);
CleanupQueue.getInstance().addToQueue(
new PathDeletionContext(jobTempDirPath, conf, userUGI, jobId));
}
@@ -3342,12 +3344,15 @@ public class JobInProgress {
this.runningReduces = null;
}
- //close the user's FS
- try {
- fs.close();
- } catch (IOException ie) {
- LOG.warn("Ignoring exception " + StringUtils.stringifyException(ie) +
- " while closing FileSystem for " + userUGI);
+ // Close the user's FS. Or don't, in the common case of FS being the same
+ // FS as the temp directory FS, as it will be closed by the CleanupQueue.
+ if (tempDirFs != fs) {
+ try {
+ fs.close();
+ } catch (IOException ie) {
+ LOG.warn("Ignoring exception " + StringUtils.stringifyException(ie) +
+ " while closing FileSystem for " + userUGI);
+ }
}
}
Added: hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestCleanupQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestCleanupQueue.java?rev=1525775&view=auto
==============================================================================
--- hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestCleanupQueue.java (added)
+++ hadoop/common/branches/branch-1-win/src/test/org/apache/hadoop/mapred/TestCleanupQueue.java Tue Sep 24 05:21:21 2013
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.IOException;
+
+import junit.framework.Assert;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.junit.Test;
+
+public class TestCleanupQueue { // Checks whether CleanupQueue closes the FileSystem it opens, observed via FileSystem.getCacheSize()
+ @Test (timeout = 2000) // the timeout bounds the two polling loops below, so a missed close/delete fails instead of hanging
+ public void testCleanupQueueClosesFilesystem() throws IOException,
+ InterruptedException {
+ Configuration conf = new Configuration();
+ File file = new File("afile.txt"); // relative name, so the file is created in the JVM's working directory
+ file.createNewFile();
+ Path path = new Path(file.getAbsoluteFile().toURI());
+
+ FileSystem.get(conf); // result unused: called only to put an entry in the FileSystem cache
+ Assert.assertEquals(1, FileSystem.getCacheSize()); // NOTE(review): assumes the static cache was empty before this test - confirm
+
+ // With UGI, should close FileSystem (the deletion task closes the FS it opened when a UGI is supplied)
+ CleanupQueue cleanupQueue = new CleanupQueue();
+ PathDeletionContext context = new PathDeletionContext(path, conf,
+ UserGroupInformation.getLoginUser());
+ cleanupQueue.addToQueue(context);
+
+ while (FileSystem.getCacheSize() > 0) { // poll until the queued deletion has closed the FS and the cache is empty
+ Thread.sleep(100);
+ }
+
+ file.createNewFile(); // recreate the file, which the first deletion request targeted
+ FileSystem.get(conf); // re-populate the cache for the second scenario
+ Assert.assertEquals(1, FileSystem.getCacheSize());
+
+ // Without UGI, should not close FileSystem (the cached entry must survive the deletion)
+ context = new PathDeletionContext(path, conf);
+ cleanupQueue.addToQueue(context);
+
+ while (file.exists()) { // poll until the queued deletion has actually removed the file
+ Thread.sleep(100);
+ }
+ Assert.assertEquals(1, FileSystem.getCacheSize()); // cache entry still present, so the FS was not closed
+ }
+}