Posted to mapreduce-commits@hadoop.apache.org by zj...@apache.org on 2014/08/15 22:17:45 UTC

svn commit: r1618269 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/ hadoop-mapreduce-...

Author: zjshen
Date: Fri Aug 15 20:17:45 2014
New Revision: 1618269

URL: http://svn.apache.org/r1618269
Log:
MAPREDUCE-6032. Made MR jobs write job history files on the default FS when the current context’s FS is different. Contributed by Benjamin Zhitomirsky.
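
At its core, the change qualifies scheme-less history paths against the cluster's default file system (as configured in core-site.xml) instead of the file system of the current job configuration. A minimal sketch of that qualification step, mirroring the patch below (the helper name and the concrete path are illustrative only):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileContext;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.UnsupportedFileSystemException;

    static String qualifyAgainstDefaultFs(String dir)
        throws UnsupportedFileSystemException {
      // A fresh Configuration reflects the cluster defaults from core-site.xml,
      // not the (possibly different) job configuration
      Configuration defaultConf = new Configuration();
      // FileContext bound to the cluster's default file system (fs.defaultFS)
      FileContext fc = FileContext.getFileContext(defaultConf);
      // A scheme-less path is qualified with the default FS, e.g.
      // "/history/done" -> "hdfs://namenode:8020/history/done"
      return fc.makeQualified(new Path(dir)).toString();
    }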

Modified:
    hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/pom.xml
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java
    hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryFileManager.java

Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1618269&r1=1618268&r2=1618269&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Fri Aug 15 20:17:45 2014
@@ -224,6 +224,9 @@ Release 2.6.0 - UNRELEASED
 
     MAPREDUCE-5999. Fix dead link in InputFormat javadoc (Akira AJISAKA via aw)
 
+    MAPREDUCE-6032. Made MR jobs write job history files on the default FS when
+    the current context’s FS is different. (Benjamin Zhitomirsky via zjshen)
+
 Release 2.5.0 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/pom.xml?rev=1618269&r1=1618268&r2=1618269&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/pom.xml (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/pom.xml Fri Aug 15 20:17:45 2014
@@ -73,6 +73,12 @@
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-mapreduce-client-shuffle</artifactId>
     </dependency>
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-hdfs</artifactId>
+      <type>test-jar</type>
+      <scope>test</scope>
+    </dependency>
   </dependencies>
 
   <build>

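(The hadoop-hdfs test-jar added above provides MiniDFSCluster, which the new tests below use to stand up in-process HDFS namenodes.)
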
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java?rev=1618269&r1=1618268&r2=1618269&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryEventHandler.java Fri Aug 15 20:17:45 2014
@@ -28,13 +28,13 @@ import java.util.Timer;
 import java.util.TimerTask;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
+import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
@@ -74,7 +74,9 @@ public class JobHistoryEventHandler exte
 
   private int eventCounter;
 
-  //TODO Does the FS object need to be different ? 
+  // These file systems may differ from the default FS of the job configuration.
+  // See org.apache.hadoop.mapreduce.v2.jobhistory.JobHistoryUtils
+  // #ensurePathInDefaultFileSystem
   private FileSystem stagingDirFS; // log Dir FileSystem
   private FileSystem doneDirFS; // done Dir FileSystem
 
@@ -141,7 +143,7 @@ public class JobHistoryEventHandler exte
     //Check for the existence of the history staging dir. Maybe create it. 
     try {
       stagingDirPath =
-          FileSystem.get(conf).makeQualified(new Path(stagingDirStr));
+          FileContext.getFileContext(conf).makeQualified(new Path(stagingDirStr));
       stagingDirFS = FileSystem.get(stagingDirPath.toUri(), conf);
       mkdir(stagingDirFS, stagingDirPath, new FsPermission(
           JobHistoryUtils.HISTORY_STAGING_DIR_PERMISSIONS));
@@ -154,7 +156,7 @@ public class JobHistoryEventHandler exte
     //Check for the existence of intermediate done dir.
     Path doneDirPath = null;
     try {
-      doneDirPath = FileSystem.get(conf).makeQualified(new Path(doneDirStr));
+      doneDirPath = FileContext.getFileContext(conf).makeQualified(new Path(doneDirStr));
       doneDirFS = FileSystem.get(doneDirPath.toUri(), conf);
       // This directory will be in a common location, or this may be a cluster
       // meant for a single user. Creating based on the conf. Should ideally be
@@ -194,7 +196,7 @@ public class JobHistoryEventHandler exte
     //Check/create user directory under intermediate done dir.
     try {
       doneDirPrefixPath =
-          FileSystem.get(conf).makeQualified(new Path(userDoneDirStr));
+          FileContext.getFileContext(conf).makeQualified(new Path(userDoneDirStr));
       mkdir(doneDirFS, doneDirPrefixPath, new FsPermission(
           JobHistoryUtils.HISTORY_INTERMEDIATE_USER_DIR_PERMISSIONS));
     } catch (IOException e) {

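Why FileContext rather than FileSystem in the three hunks above: FileSystem#makeQualified first checks the path against that file system's own scheme and authority, so a path that JobHistoryUtils has already qualified for a different file system would fail with a "Wrong FS" IllegalArgumentException. FileContext#makeQualified only fills in a missing scheme and authority, after which the handler binds a FileSystem to whatever URI the path actually carries. The resulting pattern, taken from the hunks above:

    // Qualify without a wrong-FS check, then open the FS matching the path's URI
    Path stagingDirPath =
        FileContext.getFileContext(conf).makeQualified(new Path(stagingDirStr));
    FileSystem stagingDirFS = FileSystem.get(stagingDirPath.toUri(), conf);
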
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java?rev=1618269&r1=1618268&r2=1618269&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java Fri Aug 15 20:17:45 2014
@@ -28,6 +28,7 @@ import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.never;
 
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 
 import org.junit.Assert;
@@ -35,8 +36,13 @@ import org.junit.Assert;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.mapreduce.Counters;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -52,6 +58,10 @@ import org.apache.hadoop.yarn.api.record
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.junit.After;
+import org.junit.AfterClass;
+import static org.junit.Assert.assertFalse;
+import org.junit.BeforeClass;
 import org.junit.Test;
 import org.mockito.Mockito;
 
@@ -60,6 +70,26 @@ public class TestJobHistoryEventHandler 
 
   private static final Log LOG = LogFactory
       .getLog(TestJobHistoryEventHandler.class);
+  private static MiniDFSCluster dfsCluster = null;
+  private static String coreSitePath;
+
+  @BeforeClass
+  public static void setUpClass() throws Exception {
+    coreSitePath = "." + File.separator + "target" + File.separator +
+            "test-classes" + File.separator + "core-site.xml";
+    Configuration conf = new HdfsConfiguration();
+    dfsCluster = new MiniDFSCluster.Builder(conf).build();
+  }
+
+  @AfterClass
+  public static void cleanUpClass() throws Exception {
+    dfsCluster.shutdown();
+  }
+
+  @After
+  public void cleanTest() throws Exception {
+    new File(coreSitePath).delete();
+  }
 
   @Test (timeout=50000)
   public void testFirstFlushOnCompletionEvent() throws Exception {
@@ -325,6 +355,50 @@ public class TestJobHistoryEventHandler 
     }
   }
 
+  @Test (timeout=50000)
+  public void testDefaultFsIsUsedForHistory() throws Exception {
+    // Create default configuration pointing to the minicluster
+    Configuration conf = new Configuration();
+    conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
+            dfsCluster.getURI().toString());
+    FileOutputStream os = new FileOutputStream(coreSitePath);
+    conf.writeXml(os);
+    os.close();
+
+    // simulate execution under a non-default namenode
+    conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
+            "file:///");
+
+    TestParams t = new TestParams();
+    conf.set(MRJobConfig.MR_AM_STAGING_DIR, t.dfsWorkDir);
+
+    JHEvenHandlerForTest realJheh =
+        new JHEvenHandlerForTest(t.mockAppContext, 0, false);
+    JHEvenHandlerForTest jheh = spy(realJheh);
+    jheh.init(conf);
+
+    try {
+      jheh.start();
+      handleEvent(jheh, new JobHistoryEvent(t.jobId, new AMStartedEvent(
+          t.appAttemptId, 200, t.containerId, "nmhost", 3000, 4000)));
+
+      handleEvent(jheh, new JobHistoryEvent(t.jobId, new JobFinishedEvent(
+          TypeConverter.fromYarn(t.jobId), 0, 0, 0, 0, 0, new Counters(),
+          new Counters(), new Counters())));
+
+      // If we got here then the event handler worked, but we don't know with
+      // which file system. Check that the history files were written to the minicluster.
+      FileSystem dfsFileSystem = dfsCluster.getFileSystem();
+      assertTrue("Minicluster contains some history files",
+          dfsFileSystem.globStatus(new Path(t.dfsWorkDir + "/*")).length != 0);
+      FileSystem localFileSystem = LocalFileSystem.get(conf);
+      assertFalse("No history directory on non-default file system",
+          localFileSystem.exists(new Path(t.dfsWorkDir)));
+    } finally {
+      jheh.stop();
+    }
+  }
+
   private void queueEvent(JHEvenHandlerForTest jheh, JobHistoryEvent event) {
     jheh.handle(event);
   }
@@ -372,6 +446,7 @@ public class TestJobHistoryEventHandler 
   private class TestParams {
     boolean isLastAMRetry;
     String workDir = setupTestWorkDir();
+    String dfsWorkDir = "/" + this.getClass().getCanonicalName();
     ApplicationId appId = ApplicationId.newInstance(200, 1);
     ApplicationAttemptId appAttemptId =
         ApplicationAttemptId.newInstance(appId, 1);
@@ -451,10 +526,16 @@ public class TestJobHistoryEventHandler 
 class JHEvenHandlerForTest extends JobHistoryEventHandler {
 
   private EventWriter eventWriter;
+  private boolean mockHistoryProcessing = true;
   public JHEvenHandlerForTest(AppContext context, int startCount) {
     super(context, startCount);
   }
 
+  public JHEvenHandlerForTest(AppContext context, int startCount, boolean mockHistoryProcessing) {
+    super(context, startCount);
+    this.mockHistoryProcessing = mockHistoryProcessing;
+  }
+
   @Override
   protected void serviceStart() {
   }
@@ -462,7 +543,12 @@ class JHEvenHandlerForTest extends JobHi
   @Override
   protected EventWriter createEventWriter(Path historyFilePath)
       throws IOException {
-    this.eventWriter = mock(EventWriter.class);
+    if (mockHistoryProcessing) {
+      this.eventWriter = mock(EventWriter.class);
+    }
+    else {
+      this.eventWriter = super.createEventWriter(historyFilePath);
+    }
     return this.eventWriter;
   }
 
@@ -475,8 +561,13 @@ class JHEvenHandlerForTest extends JobHi
   }
 
   @Override
-  protected void processDoneFiles(JobId jobId){
-    // do nothing
+  protected void processDoneFiles(JobId jobId) throws IOException {
+    if (!mockHistoryProcessing) {
+      super.processDoneFiles(jobId);
+    }
+    else {
+      // do nothing
+    }
   }
 }
 

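A note on the test mechanics: writing core-site.xml into target/test-classes puts it on the test classpath, so the fresh Configuration created inside JobHistoryUtils#getDefaultFileContext (next file) resolves fs.defaultFS to the minicluster, while the job configuration handed to the handler points at file:///. Roughly:

    // new Configuration() loads core-default.xml, then core-site.xml from the classpath
    Configuration fresh = new Configuration();
    // After the test writes target/test-classes/core-site.xml this resolves to the
    // minicluster URI (e.g. hdfs://localhost:<port>), independent of the job conf
    String defaultFs = fresh.get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY);
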
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java?rev=1618269&r1=1618268&r2=1618269&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-common/src/main/java/org/apache/hadoop/mapreduce/v2/jobhistory/JobHistoryUtils.java Fri Aug 15 20:17:45 2014
@@ -22,20 +22,24 @@ import java.io.File;
 import java.io.IOException;
 import java.util.Calendar;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
-
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -117,6 +121,7 @@ public class JobHistoryUtils {
   public static final String TIMESTAMP_DIR_REGEX = "\\d{4}" + "\\" + Path.SEPARATOR +  "\\d{2}" + "\\" + Path.SEPARATOR + "\\d{2}";
   public static final Pattern TIMESTAMP_DIR_PATTERN = Pattern.compile(TIMESTAMP_DIR_REGEX);
   private static final String TIMESTAMP_DIR_FORMAT = "%04d" + File.separator + "%02d" + File.separator + "%02d";
+  private static final Log LOG = LogFactory.getLog(JobHistoryUtils.class);
 
   private static final PathFilter CONF_FILTER = new PathFilter() {
     @Override
@@ -183,7 +188,7 @@ public class JobHistoryUtils {
     Path stagingPath = MRApps.getStagingAreaDir(conf, user);
     Path path = new Path(stagingPath, jobId);
     String logDir = path.toString();
-    return logDir;
+    return ensurePathInDefaultFileSystem(logDir, conf);
   }
   
   /**
@@ -200,7 +205,7 @@ public class JobHistoryUtils {
           MRJobConfig.DEFAULT_MR_AM_STAGING_DIR)
           + "/history/done_intermediate";
     }
-    return doneDirPrefix;
+    return ensurePathInDefaultFileSystem(doneDirPrefix, conf);
   }
   
   /**
@@ -216,7 +221,69 @@ public class JobHistoryUtils {
           MRJobConfig.DEFAULT_MR_AM_STAGING_DIR)
           + "/history/done";
     }
-    return doneDirPrefix;
+    return ensurePathInDefaultFileSystem(doneDirPrefix, conf);
+  }
+
+  /**
+   * Get the default file context for the cluster, used to keep the history
+   * done/staging locations consistent across different job contexts.
+   *
+   * @return the default file context, or null if it could not be created
+   */
+  private static FileContext getDefaultFileContext() {
+    // If FS_DEFAULT_NAME_KEY was set solely by core-default.xml then we
+    // ignore it. This prevents defaulting history paths to the file system
+    // specified by core-default.xml, which would not make sense in any case.
+    // For a test case to exercise this behavior it must provide a core-site.xml.
+    FileContext fc = null;
+    Configuration defaultConf = new Configuration();
+    String[] sources;
+    sources = defaultConf.getPropertySources(
+        CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY);
+    if (sources != null &&
+        (!Arrays.asList(sources).contains("core-default.xml") ||
+        sources.length > 1)) {
+      try {
+        fc = FileContext.getFileContext(defaultConf);
+        LOG.info("Default file system [" +
+                  fc.getDefaultFileSystem().getUri() + "]");
+      } catch (UnsupportedFileSystemException e) {
+        LOG.error("Unable to create default file context [" +
+            defaultConf.get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY) +
+            "]",
+            e);
+      }
+    }
+    else {
+      LOG.info("Default file system is set solely " +
+          "by core-default.xml therefore -  ignoring");
+    }
+
+    return fc;
+  }
+
+  /**
+   * Ensure that the path belongs to the cluster's default file system, unless
+   * 1. it is already fully qualified,
+   * 2. the current job configuration uses the default file system, or
+   * 3. it is running from a test case without a core-site.xml.
+   *
+   * @param sourcePath source path
+   * @param conf the job configuration
+   * @return the path, fully qualified in the default file system when necessary
+   */
+  private static String ensurePathInDefaultFileSystem(String sourcePath, Configuration conf) {
+    Path path = new Path(sourcePath);
+    FileContext fc = getDefaultFileContext();
+    if (fc == null ||
+        fc.getDefaultFileSystem().getUri().toString().equals(
+            conf.get(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY, "")) ||
+        path.toUri().getAuthority() != null ||
+        path.toUri().getScheme() != null) {
+      return sourcePath;
+    }
+
+    return fc.makeQualified(path).toString();
   }
 
   /**

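Taken together, ensurePathInDefaultFileSystem leaves already-qualified paths and default-FS-only setups untouched, and otherwise pins scheme-less history paths to the cluster default. A hypothetical illustration (URIs invented for the example), assuming core-site.xml sets fs.defaultFS to hdfs://nn:8020 while the job configuration says file:///:

    ensurePathInDefaultFileSystem("/history/done", jobConf);
    //   -> "hdfs://nn:8020/history/done"  (scheme-less: pinned to the default FS)
    ensurePathInDefaultFileSystem("file:///tmp/done", jobConf);
    //   -> "file:///tmp/done"             (already qualified: returned unchanged)
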
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryFileManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryFileManager.java?rev=1618269&r1=1618268&r2=1618269&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryFileManager.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/test/java/org/apache/hadoop/mapreduce/v2/hs/TestHistoryFileManager.java Fri Aug 15 20:17:45 2014
@@ -19,42 +19,74 @@
 package org.apache.hadoop.mapreduce.v2.hs;
 
 
+import java.io.File;
+import java.io.FileOutputStream;
+import java.util.UUID;
 import org.junit.Assert;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.mapreduce.v2.app.ControlledClock;
 import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
+import org.apache.hadoop.test.CoreTestDriver;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.SystemClock;
+import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
+import org.junit.Rule;
 import org.junit.Test;
-
-import java.util.UUID;
+import org.junit.rules.TestName;
 
 public class TestHistoryFileManager {
   private static MiniDFSCluster dfsCluster = null;
+  private static MiniDFSCluster dfsCluster2 = null;
+  private static String coreSitePath;
+
+  @Rule
+  public TestName name = new TestName();
 
   @BeforeClass
   public static void setUpClass() throws Exception {
+    coreSitePath = "." + File.separator + "target" + File.separator +
+            "test-classes" + File.separator + "core-site.xml";
     Configuration conf = new HdfsConfiguration();
+    Configuration conf2 = new HdfsConfiguration();
     dfsCluster = new MiniDFSCluster.Builder(conf).build();
+    conf2.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR,
+            conf.get(MiniDFSCluster.HDFS_MINIDFS_BASEDIR) + "_2");
+    dfsCluster2 = new MiniDFSCluster.Builder(conf2).build();
   }
 
   @AfterClass
   public static void cleanUpClass() throws Exception {
     dfsCluster.shutdown();
+    dfsCluster2.shutdown();
+  }
+
+  @After
+  public void cleanTest() throws Exception {
+    new File(coreSitePath).delete();
+  }
+
+  private String getDoneDirNameForTest() {
+    return "/" + name.getMethodName();
+  }
+
+  private String getIntermediateDoneDirNameForTest() {
+    return "/intermediate_" + name.getMethodName();
   }
 
   private void testTryCreateHistoryDirs(Configuration conf, boolean expected)
       throws Exception {
-    conf.set(JHAdminConfig.MR_HISTORY_DONE_DIR, "/" + UUID.randomUUID());
-    conf.set(JHAdminConfig.MR_HISTORY_INTERMEDIATE_DONE_DIR, "/" + UUID.randomUUID());
+    conf.set(JHAdminConfig.MR_HISTORY_DONE_DIR, getDoneDirNameForTest());
+    conf.set(JHAdminConfig.MR_HISTORY_INTERMEDIATE_DONE_DIR, getIntermediateDoneDirNameForTest());
     HistoryFileManager hfm = new HistoryFileManager();
     hfm.conf = conf;
     Assert.assertEquals(expected, hfm.tryCreatingHistoryDirs(false));
@@ -76,6 +108,36 @@ public class TestHistoryFileManager {
   }
 
   @Test
+  public void testCreateDirsWithAdditionalFileSystem() throws Exception {
+    dfsCluster.getFileSystem().setSafeMode(
+        HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
+    dfsCluster2.getFileSystem().setSafeMode(
+        HdfsConstants.SafeModeAction.SAFEMODE_LEAVE);
+    Assert.assertFalse(dfsCluster.getFileSystem().isInSafeMode());
+    Assert.assertFalse(dfsCluster2.getFileSystem().isInSafeMode());
+
+    // Set default configuration to the first cluster
+    Configuration conf = new Configuration(false);
+    conf.set(CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY,
+            dfsCluster.getURI().toString());
+    FileOutputStream os = new FileOutputStream(coreSitePath);
+    conf.writeXml(os);
+    os.close();
+
+    testTryCreateHistoryDirs(dfsCluster2.getConfiguration(0), true);
+
+    // Directories should be created only in the default file system (dfsCluster)
+    Assert.assertTrue(dfsCluster.getFileSystem()
+            .exists(new Path(getDoneDirNameForTest())));
+    Assert.assertTrue(dfsCluster.getFileSystem()
+            .exists(new Path(getIntermediateDoneDirNameForTest())));
+    Assert.assertFalse(dfsCluster2.getFileSystem()
+            .exists(new Path(getDoneDirNameForTest())));
+    Assert.assertFalse(dfsCluster2.getFileSystem()
+            .exists(new Path(getIntermediateDoneDirNameForTest())));
+  }
+
+  @Test
   public void testCreateDirsWithFileSystemInSafeMode() throws Exception {
     dfsCluster.getFileSystem().setSafeMode(
         HdfsConstants.SafeModeAction.SAFEMODE_ENTER);