You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by um...@apache.org on 2020/06/27 23:04:50 UTC

[hadoop] branch branch-3.1 updated: HDFS-15306. Make mount-table to read from central place ( Let's say from HDFS). Contributed by Uma Maheswara Rao G.

This is an automated email from the ASF dual-hosted git repository.

umamahesh pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.1 by this push:
     new c4857bb  HDFS-15306. Make mount-table to read from central place ( Let's say from HDFS). Contributed by Uma Maheswara Rao G.
c4857bb is described below

commit c4857bb9c1bd5c937cdc658192172d09e56609bc
Author: Uma Maheswara Rao G <um...@apache.org>
AuthorDate: Thu May 14 17:29:35 2020 -0700

    HDFS-15306. Make mount-table to read from central place ( Let's say from HDFS). Contributed by Uma Maheswara Rao G.
    
    (cherry picked from commit ac4a2e11d98827c7926a34cda27aa7bcfd3f36c1)
    (cherry picked from commit 544996c85702af7ae241ef2f18e2597e2b4050be)
---
 .../org/apache/hadoop/fs/viewfs/Constants.java     |   5 +
 .../fs/viewfs/HCFSMountTableConfigLoader.java      | 122 ++++++++++++++
 .../hadoop/fs/viewfs/MountTableConfigLoader.java   |  44 +++++
 .../fs/viewfs/ViewFileSystemOverloadScheme.java    | 180 ++++++++++++---------
 .../org/apache/hadoop/fs/viewfs/package-info.java  |  26 +++
 .../fs/viewfs/TestHCFSMountTableConfigLoader.java  | 165 +++++++++++++++++++
 ...iewFSOverloadSchemeCentralMountTableConfig.java |  77 +++++++++
 ...iewFileSystemOverloadSchemeLocalFileSystem.java |  47 ++++--
 .../apache/hadoop/fs/viewfs/ViewFsTestSetup.java   |  71 +++++++-
 ...FSOverloadSchemeWithMountTableConfigInHDFS.java |  68 ++++++++
 ...ViewFileSystemOverloadSchemeWithHdfsScheme.java | 125 +++++++++-----
 11 files changed, 797 insertions(+), 133 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/Constants.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/Constants.java
index 37f1a16..0a5d4b4 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/Constants.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/Constants.java
@@ -30,6 +30,11 @@ public interface Constants {
    * Prefix for the config variable prefix for the ViewFs mount-table
    */
   public static final String CONFIG_VIEWFS_PREFIX = "fs.viewfs.mounttable";
+
+  /**
+   * Prefix for the config variable for the ViewFs mount-table path.
+   */
+  String CONFIG_VIEWFS_MOUNTTABLE_PATH = CONFIG_VIEWFS_PREFIX + ".path";
  
   /**
    * Prefix for the home dir for the mount table - if not specified
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/HCFSMountTableConfigLoader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/HCFSMountTableConfigLoader.java
new file mode 100644
index 0000000..c7e5aab
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/HCFSMountTableConfigLoader.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.viewfs;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.LocatedFileStatus;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An implementation for Apache Hadoop compatible file system based mount-table
+ * file loading.
+ */
+public class HCFSMountTableConfigLoader implements MountTableConfigLoader {
+  private static final String REGEX_DOT = "[.]";
+  private static final Logger LOGGER =
+      LoggerFactory.getLogger(HCFSMountTableConfigLoader.class);
+  private Path mountTable = null;
+
+  /**
+   * Loads the mount-table configuration from hadoop compatible file system and
+   * adds the configuration items to the given configuration. Mount-table
+   * configuration format should be suffixed with version number.
+   * Format: mount-table.<versionNumber>.xml
+   * Example: mount-table.1.xml
+   * When user wants to update mount-table, the expectation is to upload new
+   * mount-table configuration file with monotonically increasing integer as
+   * version number. This API loads the highest version number file. We can
+   * also configure single file path directly.
+   *
+   * @param mountTableConfigPath : A directory path where mount-table files
+   *          are stored, or a mount-table file path. We recommend configuring
+   *          a directory with the mount-table version files.
+   * @param conf : to add the mount table as resource.
+   */
+  @Override
+  public void load(String mountTableConfigPath, Configuration conf)
+      throws IOException {
+    this.mountTable = new Path(mountTableConfigPath);
+    String scheme = mountTable.toUri().getScheme();
+    ViewFileSystem.FsGetter fsGetter =
+        new ViewFileSystemOverloadScheme.ChildFsGetter(scheme);
+    try (FileSystem fs = fsGetter.getNewInstance(mountTable.toUri(), conf)) {
+      RemoteIterator<LocatedFileStatus> listFiles =
+          fs.listFiles(mountTable, false);
+      LocatedFileStatus lfs = null;
+      int higherVersion = -1;
+      while (listFiles.hasNext()) {
+        LocatedFileStatus curLfs = listFiles.next();
+        String cur = curLfs.getPath().getName();
+        String[] nameParts = cur.split(REGEX_DOT);
+        if (nameParts.length < 2) {
+          logInvalidFileNameFormat(cur);
+          continue; // invalid file name
+        }
+        int curVersion = higherVersion;
+        try {
+          curVersion = Integer.parseInt(nameParts[nameParts.length - 2]);
+        } catch (NumberFormatException nfe) {
+          logInvalidFileNameFormat(cur);
+          continue;
+        }
+
+        if (curVersion > higherVersion) {
+          higherVersion = curVersion;
+          lfs = curLfs;
+        }
+      }
+
+      if (lfs == null) {
+        // No valid mount table file found.
+        // TODO: Should we fail? Currently viewfs init will fail if no mount
+        // links anyway.
+        LOGGER.warn("No valid mount-table file exist at: {}. At least one "
+            + "mount-table file should present with the name format: "
+            + "mount-table.<versionNumber>.xml", mountTableConfigPath);
+        return;
+      }
+      // Latest version file.
+      Path latestVersionMountTable = lfs.getPath();
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug("Loading the mount-table {} into configuration.",
+            latestVersionMountTable);
+      }
+      try (FSDataInputStream open = fs.open(latestVersionMountTable)) {
+        Configuration newConf = new Configuration(false);
+        newConf.addResource(open);
+        // This will add configuration props as resource, instead of stream
+        // itself. So, that stream can be closed now.
+        conf.addResource(newConf);
+      }
+    }
+  }
+
+  private void logInvalidFileNameFormat(String cur) {
+    LOGGER.warn("Invalid file name format for mount-table version file: {}. "
+        + "The valid file name format is mount-table-name.<versionNumber>.xml",
+        cur);
+  }
+
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/MountTableConfigLoader.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/MountTableConfigLoader.java
new file mode 100644
index 0000000..bc2c3ea
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/MountTableConfigLoader.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.viewfs;
+
+import java.io.IOException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * An interface for loading mount-table configuration. This interface can
+ * have more APIs like refreshing mount tables automatically etc.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public interface MountTableConfigLoader {
+
+  /**
+   * Loads the mount-table configuration into given configuration.
+   *
+   * @param mountTableConfigPath - Path of the mount table. It can be a file or
+   *          a directory in the case of multiple versions of mount-table
+   *          files(Recommended option).
+   * @param conf - Configuration object to add mount table.
+   */
+  void load(String mountTableConfigPath, Configuration conf)
+      throws IOException;
+}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystemOverloadScheme.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystemOverloadScheme.java
index 2dda540..f5952d5 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystemOverloadScheme.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystemOverloadScheme.java
@@ -98,107 +98,133 @@ public class ViewFileSystemOverloadScheme extends ViewFileSystem {
 
   @Override
   public void initialize(URI theUri, Configuration conf) throws IOException {
+    this.myUri = theUri;
     if (LOG.isDebugEnabled()) {
       LOG.debug("Initializing the ViewFileSystemOverloadScheme with the uri: "
           + theUri);
     }
-    this.myUri = theUri;
+    String mountTableConfigPath =
+        conf.get(Constants.CONFIG_VIEWFS_MOUNTTABLE_PATH);
+    if (null != mountTableConfigPath) {
+      MountTableConfigLoader loader = new HCFSMountTableConfigLoader();
+      loader.load(mountTableConfigPath, conf);
+    } else {
+      // TODO: Should we fail here?
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+            "Missing configuration for fs.viewfs.mounttable.path. Proceeding"
+                + " with core-site.xml mount-table information if available.");
+      }
+    }
     super.initialize(theUri, conf);
   }
 
   /**
    * This method is overridden because in ViewFileSystemOverloadScheme if
-   * overloaded cheme matches with mounted target fs scheme, file system should
-   * be created without going into fs.<scheme>.impl based resolution. Otherwise
-   * it will end up in an infinite loop as the target will be resolved again
-   * to ViewFileSystemOverloadScheme as fs.<scheme>.impl points to
-   * ViewFileSystemOverloadScheme. So, below method will initialize the
+   * overloaded scheme matches with mounted target fs scheme, file system
+   * should be created without going into fs.<scheme>.impl based resolution.
+   * Otherwise it will end up in an infinite loop as the target will be
+   * resolved again to ViewFileSystemOverloadScheme as fs.<scheme>.impl points
+   * to ViewFileSystemOverloadScheme. So, below method will initialize the
    * fs.viewfs.overload.scheme.target.<scheme>.impl. Other schemes can
    * follow fs.newInstance
    */
   @Override
   protected FsGetter fsGetter() {
+    return new ChildFsGetter(getScheme());
+  }
 
-    return new FsGetter() {
-      @Override
-      public FileSystem getNewInstance(URI uri, Configuration conf)
-          throws IOException {
-        if (uri.getScheme().equals(getScheme())) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(
-                "The file system initialized uri scheme is matching with the "
-                    + "given target uri scheme. The target uri is: " + uri);
-          }
-          /*
-           * Avoid looping when target fs scheme is matching to overloaded
-           * scheme.
-           */
-          return createFileSystem(uri, conf);
-        } else {
-          return FileSystem.newInstance(uri, conf);
+  /**
+   * This class checks whether the rootScheme is same as URI scheme. If both are
+   * same, then it will initialize file systems by using the configured
+   * fs.viewfs.overload.scheme.target.<scheme>.impl class.
+   */
+  static class ChildFsGetter extends FsGetter {
+
+    private final String rootScheme;
+
+    ChildFsGetter(String rootScheme) {
+      this.rootScheme = rootScheme;
+    }
+
+    @Override
+    public FileSystem getNewInstance(URI uri, Configuration conf)
+        throws IOException {
+      if (uri.getScheme().equals(this.rootScheme)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(
+              "The file system initialized uri scheme is matching with the "
+                  + "given target uri scheme. The target uri is: " + uri);
         }
+        /*
+         * Avoid looping when target fs scheme is matching to overloaded scheme.
+         */
+        return createFileSystem(uri, conf);
+      } else {
+        return FileSystem.newInstance(uri, conf);
       }
+    }
 
-      /**
-       * When ViewFileSystemOverloadScheme scheme and target uri scheme are
-       * matching, it will not take advantage of FileSystem cache as it will
-       * create instance directly. For caching needs please set
-       * "fs.viewfs.enable.inner.cache" to true.
-       */
-      @Override
-      public FileSystem get(URI uri, Configuration conf) throws IOException {
-        if (uri.getScheme().equals(getScheme())) {
-          // Avoid looping when target fs scheme is matching to overloaded
-          // scheme.
-          if (LOG.isDebugEnabled()) {
-            LOG.debug(
-                "The file system initialized uri scheme is matching with the "
-                    + "given target uri scheme. So, the target file system "
-                    + "instances will not be cached. To cache fs instances, "
-                    + "please set fs.viewfs.enable.inner.cache to true. "
-                    + "The target uri is: " + uri);
-          }
-          return createFileSystem(uri, conf);
-        } else {
-          return FileSystem.get(uri, conf);
+    /**
+     * When ViewFileSystemOverloadScheme scheme and target uri scheme are
+     * matching, it will not take advantage of FileSystem cache as it will
+     * create instance directly. For caching needs please set
+     * "fs.viewfs.enable.inner.cache" to true.
+     */
+    @Override
+    public FileSystem get(URI uri, Configuration conf) throws IOException {
+      if (uri.getScheme().equals(this.rootScheme)) {
+        // Avoid looping when target fs scheme is matching to overloaded
+        // scheme.
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(
+              "The file system initialized uri scheme is matching with the "
+                  + "given target uri scheme. So, the target file system "
+                  + "instances will not be cached. To cache fs instances, "
+                  + "please set fs.viewfs.enable.inner.cache to true. "
+                  + "The target uri is: " + uri);
         }
+        return createFileSystem(uri, conf);
+      } else {
+        return FileSystem.get(uri, conf);
       }
+    }
 
-      private FileSystem createFileSystem(URI uri, Configuration conf)
-          throws IOException {
-        final String fsImplConf = String.format(
-            FsConstants.FS_VIEWFS_OVERLOAD_SCHEME_TARGET_FS_IMPL_PATTERN,
-            uri.getScheme());
-        Class<?> clazz = conf.getClass(fsImplConf, null);
-        if (clazz == null) {
-          throw new UnsupportedFileSystemException(
-              String.format("%s=null: %s: %s", fsImplConf,
-                  "No overload scheme fs configured", uri.getScheme()));
-        }
-        FileSystem fs = (FileSystem) newInstance(clazz, uri, conf);
-        fs.initialize(uri, conf);
-        return fs;
+    private FileSystem createFileSystem(URI uri, Configuration conf)
+        throws IOException {
+      final String fsImplConf = String.format(
+          FsConstants.FS_VIEWFS_OVERLOAD_SCHEME_TARGET_FS_IMPL_PATTERN,
+          uri.getScheme());
+      Class<?> clazz = conf.getClass(fsImplConf, null);
+      if (clazz == null) {
+        throw new UnsupportedFileSystemException(
+            String.format("%s=null: %s: %s", fsImplConf,
+                "No overload scheme fs configured", uri.getScheme()));
       }
+      FileSystem fs = (FileSystem) newInstance(clazz, uri, conf);
+      fs.initialize(uri, conf);
+      return fs;
+    }
 
-      private <T> T newInstance(Class<T> theClass, URI uri,
-          Configuration conf) {
-        T result;
-        try {
-          Constructor<T> meth = theClass.getConstructor();
-          meth.setAccessible(true);
-          result = meth.newInstance();
-        } catch (InvocationTargetException e) {
-          Throwable cause = e.getCause();
-          if (cause instanceof RuntimeException) {
-            throw (RuntimeException) cause;
-          } else {
-            throw new RuntimeException(cause);
-          }
-        } catch (Exception e) {
-          throw new RuntimeException(e);
+    private <T> T newInstance(Class<T> theClass, URI uri, Configuration conf) {
+      T result;
+      try {
+        Constructor<T> meth = theClass.getConstructor();
+        meth.setAccessible(true);
+        result = meth.newInstance();
+      } catch (InvocationTargetException e) {
+        Throwable cause = e.getCause();
+        if (cause instanceof RuntimeException) {
+          throw (RuntimeException) cause;
+        } else {
+          throw new RuntimeException(cause);
         }
-        return result;
+      } catch (Exception e) {
+        throw new RuntimeException(e);
       }
-    };
+      return result;
+    }
+
   }
+
 }
\ No newline at end of file
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/package-info.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/package-info.java
new file mode 100644
index 0000000..89986d0
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/package-info.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * ViewFileSystem and ViewFileSystemOverloadScheme classes.
+ */
+@InterfaceAudience.LimitedPrivate({"MapReduce", "HBase", "Hive" })
+@InterfaceStability.Stable
+package org.apache.hadoop.fs.viewfs;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestHCFSMountTableConfigLoader.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestHCFSMountTableConfigLoader.java
new file mode 100644
index 0000000..bf7a6e3
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestHCFSMountTableConfigLoader.java
@@ -0,0 +1,165 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.viewfs;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystemTestHelper;
+import org.apache.hadoop.fs.FsConstants;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests the mount table loading.
+ */
+public class TestHCFSMountTableConfigLoader {
+
+  private static final String DOT = ".";
+
+  private static final String TARGET_TWO = "/tar2";
+
+  private static final String TARGET_ONE = "/tar1";
+
+  private static final String SRC_TWO = "/src2";
+
+  private static final String SRC_ONE = "/src1";
+
+  private static final String TABLE_NAME = "test";
+
+  private MountTableConfigLoader loader = new HCFSMountTableConfigLoader();
+
+  private static FileSystem fsTarget;
+  private static Configuration conf;
+  private static Path targetTestRoot;
+  private static FileSystemTestHelper fileSystemTestHelper =
+      new FileSystemTestHelper();
+  private static File oldVersionMountTableFile;
+  private static File newVersionMountTableFile;
+  private static final String MOUNT_LINK_KEY_SRC_ONE =
+      new StringBuilder(Constants.CONFIG_VIEWFS_PREFIX).append(DOT)
+          .append(TABLE_NAME).append(DOT).append(Constants.CONFIG_VIEWFS_LINK)
+          .append(DOT).append(SRC_ONE).toString();
+  private static final String MOUNT_LINK_KEY_SRC_TWO =
+      new StringBuilder(Constants.CONFIG_VIEWFS_PREFIX).append(DOT)
+          .append(TABLE_NAME).append(DOT).append(Constants.CONFIG_VIEWFS_LINK)
+          .append(DOT).append(SRC_TWO).toString();
+
+  @BeforeClass
+  public static void init() throws Exception {
+    fsTarget = new LocalFileSystem();
+    fsTarget.initialize(new URI("file:///"), new Configuration());
+    targetTestRoot = fileSystemTestHelper.getAbsoluteTestRootPath(fsTarget);
+    fsTarget.delete(targetTestRoot, true);
+    fsTarget.mkdirs(targetTestRoot);
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    conf = new Configuration();
+    conf.set(String.format(
+        FsConstants.FS_VIEWFS_OVERLOAD_SCHEME_TARGET_FS_IMPL_PATTERN, "file"),
+        LocalFileSystem.class.getName());
+    oldVersionMountTableFile =
+        new File(new URI(targetTestRoot.toString() + "/table.1.xml"));
+    oldVersionMountTableFile.createNewFile();
+    newVersionMountTableFile =
+        new File(new URI(targetTestRoot.toString() + "/table.2.xml"));
+    newVersionMountTableFile.createNewFile();
+  }
+
+  @Test
+  public void testMountTableFileLoadingWhenMultipleFilesExist()
+      throws Exception {
+    ViewFsTestSetup.addMountLinksToFile(TABLE_NAME,
+        new String[] {SRC_ONE, SRC_TWO }, new String[] {TARGET_ONE,
+            TARGET_TWO },
+        new Path(newVersionMountTableFile.toURI()), conf);
+    loader.load(targetTestRoot.toString(), conf);
+    Assert.assertEquals(conf.get(MOUNT_LINK_KEY_SRC_TWO), TARGET_TWO);
+    Assert.assertEquals(conf.get(MOUNT_LINK_KEY_SRC_ONE), TARGET_ONE);
+  }
+
+  @Test
+  public void testMountTableFileWithInvalidFormat() throws Exception {
+    Path path = new Path(new URI(
+        targetTestRoot.toString() + "/testMountTableFileWithInvalidFormat/"));
+    fsTarget.mkdirs(path);
+    File invalidMountFileName =
+        new File(new URI(path.toString() + "/table.InvalidVersion.xml"));
+    invalidMountFileName.createNewFile();
+    // Adding mount links to make sure it will not read it.
+    ViewFsTestSetup.addMountLinksToFile(TABLE_NAME,
+        new String[] {SRC_ONE, SRC_TWO }, new String[] {TARGET_ONE,
+            TARGET_TWO },
+        new Path(invalidMountFileName.toURI()), conf);
+    // Pass mount table directory
+    loader.load(path.toString(), conf);
+    Assert.assertEquals(null, conf.get(MOUNT_LINK_KEY_SRC_TWO));
+    Assert.assertEquals(null, conf.get(MOUNT_LINK_KEY_SRC_ONE));
+    invalidMountFileName.delete();
+  }
+
+  @Test
+  public void testMountTableFileWithInvalidFormatWithNoDotsInName()
+      throws Exception {
+    Path path = new Path(new URI(targetTestRoot.toString()
+        + "/testMountTableFileWithInvalidFormatWithNoDots/"));
+    fsTarget.mkdirs(path);
+    File invalidMountFileName =
+        new File(new URI(path.toString() + "/tableInvalidVersionxml"));
+    invalidMountFileName.createNewFile();
+    // Pass mount table directory
+    loader.load(path.toString(), conf);
+    Assert.assertEquals(null, conf.get(MOUNT_LINK_KEY_SRC_TWO));
+    Assert.assertEquals(null, conf.get(MOUNT_LINK_KEY_SRC_ONE));
+    invalidMountFileName.delete();
+  }
+
+  @Test(expected = FileNotFoundException.class)
+  public void testLoadWithMountFile() throws Exception {
+    loader.load(new URI(targetTestRoot.toString() + "/Non-Existent-File.xml")
+        .toString(), conf);
+  }
+
+  @Test
+  public void testLoadWithNonExistentMountFile() throws Exception {
+    ViewFsTestSetup.addMountLinksToFile(TABLE_NAME,
+        new String[] {SRC_ONE, SRC_TWO },
+        new String[] {TARGET_ONE, TARGET_TWO },
+        new Path(oldVersionMountTableFile.toURI()), conf);
+    loader.load(oldVersionMountTableFile.toURI().toString(), conf);
+    Assert.assertEquals(conf.get(MOUNT_LINK_KEY_SRC_TWO), TARGET_TWO);
+    Assert.assertEquals(conf.get(MOUNT_LINK_KEY_SRC_ONE), TARGET_ONE);
+  }
+
+  @AfterClass
+  public static void tearDown() throws IOException {
+    fsTarget.delete(targetTestRoot, true);
+  }
+
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFSOverloadSchemeCentralMountTableConfig.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFSOverloadSchemeCentralMountTableConfig.java
new file mode 100644
index 0000000..1527e3c
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFSOverloadSchemeCentralMountTableConfig.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.viewfs;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.net.URISyntaxException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.junit.Before;
+
+/**
+ * Tests ViewFileSystemOverloadScheme with the mount-table
+ * configuration files stored in the configured fs location.
+ */
+public class TestViewFSOverloadSchemeCentralMountTableConfig
+    extends TestViewFileSystemOverloadSchemeLocalFileSystem {
+  private Path oldMountTablePath;
+  private Path latestMountTablepath;
+
+  @Before
+  public void setUp() throws Exception {
+    super.setUp();
+    // Mount table name format: mount-table.<versionNumber>.xml
+    String mountTableFileName1 = "mount-table.1.xml";
+    String mountTableFileName2 = "mount-table.2.xml";
+    oldMountTablePath =
+        new Path(getTestRoot() + File.separator + mountTableFileName1);
+    latestMountTablepath =
+        new Path(getTestRoot() + File.separator + mountTableFileName2);
+    getConf().set(Constants.CONFIG_VIEWFS_MOUNTTABLE_PATH,
+        getTestRoot().toString());
+    File f = new File(oldMountTablePath.toUri());
+    f.createNewFile(); // Just creating empty mount-table file.
+    File f2 = new File(latestMountTablepath.toUri());
+    latestMountTablepath = new Path(f2.toURI());
+    f2.createNewFile();
+  }
+
+  /**
+   * This method saves the mount links in local files.
+   */
+  @Override
+  void addMountLinks(String mountTable, String[] sources, String[] targets,
+      Configuration conf) throws IOException, URISyntaxException {
+    // we don't use conf here, instead we use config paths to store links.
+    // Mount-table old version file mount-table.<version>.xml
+    try (BufferedWriter out = new BufferedWriter(
+        new FileWriter(new File(oldMountTablePath.toUri())))) {
+      out.write("<configuration>\n");
+      // Invalid tag. This file should not be read.
+      out.write("</\\\name//\\>");
+      out.write("</configuration>\n");
+      out.flush();
+    }
+    ViewFsTestSetup.addMountLinksToFile(mountTable, sources, targets,
+        latestMountTablepath, conf);
+  }
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemOverloadSchemeLocalFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemOverloadSchemeLocalFileSystem.java
index 7e38903..ac7a1a6 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemOverloadSchemeLocalFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemOverloadSchemeLocalFileSystem.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.fs.viewfs;
 
 import java.io.IOException;
 import java.net.URI;
+import java.net.URISyntaxException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -67,15 +68,25 @@ public class TestViewFileSystemOverloadSchemeLocalFileSystem {
   }
 
   /**
+   * Adds the given mount links to config. sources contains mount link src and
+   * the respective index location in targets contains the target uri.
+   */
+  void addMountLinks(String mountTable, String[] sources, String[] targets,
+      Configuration config) throws IOException, URISyntaxException {
+    ViewFsTestSetup.addMountLinksToConf(mountTable, sources, targets, config);
+  }
+
+  /**
    * Tests write file and read file with ViewFileSystemOverloadScheme.
    */
   @Test
-  public void testLocalTargetLinkWriteSimple() throws IOException {
+  public void testLocalTargetLinkWriteSimple()
+      throws IOException, URISyntaxException {
     LOG.info("Starting testLocalTargetLinkWriteSimple");
     final String testString = "Hello Local!...";
     final Path lfsRoot = new Path("/lfsRoot");
-    ConfigUtil.addLink(conf, lfsRoot.toString(),
-        URI.create(targetTestRoot + "/local"));
+    addMountLinks(null, new String[] {lfsRoot.toString() },
+        new String[] {targetTestRoot + "/local" }, conf);
     try (FileSystem lViewFs = FileSystem.get(URI.create("file:///"), conf)) {
       final Path testPath = new Path(lfsRoot, "test.txt");
       try (FSDataOutputStream fsDos = lViewFs.create(testPath)) {
@@ -94,8 +105,8 @@ public class TestViewFileSystemOverloadSchemeLocalFileSystem {
   @Test
   public void testLocalFsCreateAndDelete() throws Exception {
     LOG.info("Starting testLocalFsCreateAndDelete");
-    ConfigUtil.addLink(conf, "mt", "/lfsroot",
-        URI.create(targetTestRoot + "/wd2"));
+    addMountLinks("mt", new String[] {"/lfsroot" },
+        new String[] {targetTestRoot + "/wd2" }, conf);
     final URI mountURI = URI.create("file://mt/");
     try (FileSystem lViewFS = FileSystem.get(mountURI, conf)) {
       Path testPath = new Path(mountURI.toString() + "/lfsroot/test");
@@ -113,8 +124,9 @@ public class TestViewFileSystemOverloadSchemeLocalFileSystem {
   @Test
   public void testLocalFsLinkSlashMerge() throws Exception {
     LOG.info("Starting testLocalFsLinkSlashMerge");
-    ConfigUtil.addLinkMergeSlash(conf, "mt",
-        URI.create(targetTestRoot + "/wd2"));
+    addMountLinks("mt",
+        new String[] {Constants.CONFIG_VIEWFS_LINK_MERGE_SLASH },
+        new String[] {targetTestRoot + "/wd2" }, conf);
     final URI mountURI = URI.create("file://mt/");
     try (FileSystem lViewFS = FileSystem.get(mountURI, conf)) {
       Path fileOnRoot = new Path(mountURI.toString() + "/NewFile");
@@ -130,10 +142,9 @@ public class TestViewFileSystemOverloadSchemeLocalFileSystem {
   @Test(expected = IOException.class)
   public void testLocalFsLinkSlashMergeWithOtherMountLinks() throws Exception {
     LOG.info("Starting testLocalFsLinkSlashMergeWithOtherMountLinks");
-    ConfigUtil.addLink(conf, "mt", "/lfsroot",
-        URI.create(targetTestRoot + "/wd2"));
-    ConfigUtil.addLinkMergeSlash(conf, "mt",
-        URI.create(targetTestRoot + "/wd2"));
+    addMountLinks("mt",
+        new String[] {"/lfsroot", Constants.CONFIG_VIEWFS_LINK_MERGE_SLASH },
+        new String[] {targetTestRoot + "/wd2", targetTestRoot + "/wd2" }, conf);
     final URI mountURI = URI.create("file://mt/");
     FileSystem.get(mountURI, conf);
     Assert.fail("A merge slash cannot be configured with other mount links.");
@@ -146,4 +157,18 @@ public class TestViewFileSystemOverloadSchemeLocalFileSystem {
       fsTarget.close();
     }
   }
+
+  /**
+   * Returns the test root dir.
+   */
+  public Path getTestRoot() {
+    return this.targetTestRoot;
+  }
+
+  /**
+   * Returns the conf.
+   */
+  public Configuration getConf() {
+    return this.conf;
+  }
 }
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFsTestSetup.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFsTestSetup.java
index 9b7e17f..3cc2805 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFsTestSetup.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFsTestSetup.java
@@ -17,14 +17,18 @@
  */
 package org.apache.hadoop.fs.viewfs;
 
+import java.io.IOException;
 import java.net.URI;
+import java.net.URISyntaxException;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileContextTestHelper;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FsConstants;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.viewfs.ConfigUtil;
+import org.apache.hadoop.fs.viewfs.ViewFileSystemOverloadScheme.ChildFsGetter;
 import org.apache.hadoop.util.Shell;
 import org.eclipse.jetty.util.log.Log;
 
@@ -132,4 +136,69 @@ public class ViewFsTestSetup {
         + firstComponent + "->" + linkTarget);    
   }
 
+  /**
+   * Adds the given mount links to the given Hadoop compatible file system path.
+   * Mount link mappings are in sources, targets at their respective index
+   * locations.
+   */
+  static void addMountLinksToFile(String mountTable, String[] sources,
+      String[] targets, Path mountTableConfPath, Configuration conf)
+      throws IOException, URISyntaxException {
+    ChildFsGetter cfs = new ViewFileSystemOverloadScheme.ChildFsGetter(
+        mountTableConfPath.toUri().getScheme());
+    try (FileSystem fs = cfs.getNewInstance(mountTableConfPath.toUri(), conf)) {
+      try (FSDataOutputStream out = fs.create(mountTableConfPath)) {
+        String prefix =
+            new StringBuilder(Constants.CONFIG_VIEWFS_PREFIX).append(".")
+                .append((mountTable == null
+                    ? Constants.CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE
+                    : mountTable))
+                .append(".").toString();
+        out.writeBytes("<configuration>");
+        for (int i = 0; i < sources.length; i++) {
+          String src = sources[i];
+          String target = targets[i];
+          out.writeBytes("<property><name>");
+          if (Constants.CONFIG_VIEWFS_LINK_FALLBACK.equals(src)) {
+            out.writeBytes(prefix + Constants.CONFIG_VIEWFS_LINK_FALLBACK);
+            out.writeBytes("</name>");
+          } else if (Constants.CONFIG_VIEWFS_LINK_MERGE_SLASH.equals(src)) {
+            out.writeBytes(prefix + Constants.CONFIG_VIEWFS_LINK_MERGE_SLASH);
+            out.writeBytes("</name>");
+          } else {
+            out.writeBytes(prefix + Constants.CONFIG_VIEWFS_LINK + "." + src);
+            out.writeBytes("</name>");
+          }
+          out.writeBytes("<value>");
+          out.writeBytes(target);
+          out.writeBytes("</value></property>");
+          out.flush();
+        }
+        out.writeBytes(("</configuration>"));
+        out.flush();
+      }
+    }
+  }
+
+  /**
+   * Adds the given mount links to the configuration. Mount link mappings are
+   * in sources, targets at their respective index locations.
+   */
+  static void addMountLinksToConf(String mountTable, String[] sources,
+      String[] targets, Configuration config) throws URISyntaxException {
+    for (int i = 0; i < sources.length; i++) {
+      String src = sources[i];
+      String target = targets[i];
+      String mountTableName = mountTable == null ?
+          Constants.CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE : mountTable;
+      if (src.equals(Constants.CONFIG_VIEWFS_LINK_FALLBACK)) {
+        ConfigUtil.addLinkFallback(config, mountTableName, new URI(target));
+      } else if (src.equals(Constants.CONFIG_VIEWFS_LINK_MERGE_SLASH)) {
+        ConfigUtil.addLinkMergeSlash(config, mountTableName, new URI(target));
+      } else {
+        ConfigUtil.addLink(config, mountTableName, src, new URI(target));
+      }
+    }
+  }
+
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFSOverloadSchemeWithMountTableConfigInHDFS.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFSOverloadSchemeWithMountTableConfigInHDFS.java
new file mode 100644
index 0000000..e7afbed
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFSOverloadSchemeWithMountTableConfigInHDFS.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.viewfs;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.Before;
+
+/**
+ * Tests ViewFileSystemOverloadScheme with the mount-table file kept in HDFS.
+ */
+public class TestViewFSOverloadSchemeWithMountTableConfigInHDFS
+    extends TestViewFileSystemOverloadSchemeWithHdfsScheme {
+  private Path oldVersionMountTablePath;
+  private Path newVersionMountTablePath;
+
+  @Before
+  @Override
+  public void startCluster() throws IOException {
+    super.startCluster();
+    String mountTableDir =
+        URI.create(getConf().get(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY))
+            .toString() + "/MountTable/";
+    getConf().set(Constants.CONFIG_VIEWFS_MOUNTTABLE_PATH, mountTableDir);
+    FileSystem fs = new ViewFileSystemOverloadScheme.ChildFsGetter("hdfs")
+        .getNewInstance(new Path(mountTableDir).toUri(), getConf());
+    fs.mkdirs(new Path(mountTableDir));
+    String oldVersionMountTable = "mount-table.30.xml";
+    String newVersionMountTable = "mount-table.31.xml";
+    oldVersionMountTablePath = new Path(mountTableDir, oldVersionMountTable);
+    newVersionMountTablePath = new Path(mountTableDir, newVersionMountTable);
+    fs.createNewFile(oldVersionMountTablePath);
+    fs.createNewFile(newVersionMountTablePath);
+  }
+
+  /**
+   * This method saves the mount links in the HDFS file at
+   * newVersionMountTablePath. Since this file has the highest version, it
+   * should be loaded by ViewFileSystemOverloadScheme.
+   */
+  @Override
+  void addMountLinks(String mountTable, String[] sources, String[] targets,
+      Configuration config) throws IOException, URISyntaxException {
+    ViewFsTestSetup.addMountLinksToFile(mountTable, sources, targets,
+        newVersionMountTablePath, getConf());
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemOverloadSchemeWithHdfsScheme.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemOverloadSchemeWithHdfsScheme.java
index bf0a8fe..0681c22 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemOverloadSchemeWithHdfsScheme.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemOverloadSchemeWithHdfsScheme.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.fs.viewfs;
 import java.io.File;
 import java.io.IOException;
 import java.net.URI;
+import java.net.URISyntaxException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
@@ -54,6 +55,9 @@ public class TestViewFileSystemOverloadSchemeWithHdfsScheme {
   private static final String HDFS_USER_FOLDER = "/HDFSUser";
   private static final String LOCAL_FOLDER = "/local";
 
+  /**
+   * Sets up the configurations and starts the MiniDFSCluster.
+   */
   @Before
   public void startCluster() throws IOException {
     conf = new Configuration();
@@ -80,16 +84,13 @@ public class TestViewFileSystemOverloadSchemeWithHdfsScheme {
     }
   }
 
-  private void createLinks(boolean needFalbackLink, Path hdfsTargetPath,
-      Path localTragetPath) {
-    ConfigUtil.addLink(conf, defaultFSURI.getAuthority(), HDFS_USER_FOLDER,
-        hdfsTargetPath.toUri());
-    ConfigUtil.addLink(conf, defaultFSURI.getAuthority(), LOCAL_FOLDER,
-        localTragetPath.toUri());
-    if (needFalbackLink) {
-      ConfigUtil.addLinkFallback(conf, defaultFSURI.getAuthority(),
-          hdfsTargetPath.toUri());
-    }
+  /**
+   * Adds the given mount links to config. sources contains mount link src and
+   * the respective index location in targets contains the target uri.
+   */
+  void addMountLinks(String mountTable, String[] sources, String[] targets,
+      Configuration config) throws IOException, URISyntaxException {
+    ViewFsTestSetup.addMountLinksToConf(mountTable, sources, targets, config);
   }
 
   /**
@@ -105,7 +106,11 @@ public class TestViewFileSystemOverloadSchemeWithHdfsScheme {
     final Path hdfsTargetPath = new Path(defaultFSURI + HDFS_USER_FOLDER);
     final Path localTragetPath = new Path(localTargetDir.toURI());
 
-    createLinks(false, hdfsTargetPath, localTragetPath);
+    addMountLinks(defaultFSURI.getAuthority(),
+        new String[] {HDFS_USER_FOLDER, LOCAL_FOLDER },
+        new String[] {hdfsTargetPath.toUri().toString(),
+            localTargetDir.toURI().toString() },
+        conf);
 
     // /HDFSUser/testfile
     Path hdfsFile = new Path(HDFS_USER_FOLDER + "/testfile");
@@ -156,8 +161,8 @@ public class TestViewFileSystemOverloadSchemeWithHdfsScheme {
      * Below addLink will create following mount points
      * hdfs://localhost:xxx/User --> nonexistent://NonExistent/User/
      */
-    ConfigUtil.addLink(conf, defaultFSURI.getAuthority(), userFolder,
-        nonExistTargetPath.toUri());
+    addMountLinks(defaultFSURI.getAuthority(), new String[] {userFolder },
+        new String[] {nonExistTargetPath.toUri().toString() }, conf);
     FileSystem.get(conf);
     Assert.fail("Expected to fail with non existent link");
   }
@@ -171,9 +176,11 @@ public class TestViewFileSystemOverloadSchemeWithHdfsScheme {
   @Test(timeout = 30000)
   public void testListStatusOnRootShouldListAllMountLinks() throws Exception {
     final Path hdfsTargetPath = new Path(defaultFSURI + HDFS_USER_FOLDER);
-    final Path localTragetPath = new Path(localTargetDir.toURI());
-
-    createLinks(false, hdfsTargetPath, localTragetPath);
+    addMountLinks(defaultFSURI.getAuthority(),
+        new String[] {HDFS_USER_FOLDER, LOCAL_FOLDER },
+        new String[] {hdfsTargetPath.toUri().toString(),
+            localTargetDir.toURI().toString() },
+        conf);
 
     try (FileSystem fs = FileSystem.get(conf)) {
       FileStatus[] ls = fs.listStatus(new Path("/"));
@@ -198,9 +205,11 @@ public class TestViewFileSystemOverloadSchemeWithHdfsScheme {
   @Test(expected = IOException.class, timeout = 30000)
   public void testListStatusOnNonMountedPath() throws Exception {
     final Path hdfsTargetPath = new Path(defaultFSURI + HDFS_USER_FOLDER);
-    final Path localTragetPath = new Path(localTargetDir.toURI());
-
-    createLinks(false, hdfsTargetPath, localTragetPath);
+    addMountLinks(defaultFSURI.getAuthority(),
+        new String[] {HDFS_USER_FOLDER, LOCAL_FOLDER },
+        new String[] {hdfsTargetPath.toUri().toString(),
+            localTargetDir.toURI().toString() },
+        conf);
 
     try (FileSystem fs = FileSystem.get(conf)) {
       fs.listStatus(new Path("/nonMount"));
@@ -218,9 +227,13 @@ public class TestViewFileSystemOverloadSchemeWithHdfsScheme {
   @Test(timeout = 30000)
   public void testWithLinkFallBack() throws Exception {
     final Path hdfsTargetPath = new Path(defaultFSURI + HDFS_USER_FOLDER);
-    final Path localTragetPath = new Path(localTargetDir.toURI());
-
-    createLinks(true, hdfsTargetPath, localTragetPath);
+    addMountLinks(defaultFSURI.getAuthority(),
+        new String[] {HDFS_USER_FOLDER, LOCAL_FOLDER,
+            Constants.CONFIG_VIEWFS_LINK_FALLBACK },
+        new String[] {hdfsTargetPath.toUri().toString(),
+            localTargetDir.toURI().toString(),
+            hdfsTargetPath.toUri().toString() },
+        conf);
 
     try (FileSystem fs = FileSystem.get(conf)) {
       fs.createNewFile(new Path("/nonMount/myfile"));
@@ -243,10 +256,11 @@ public class TestViewFileSystemOverloadSchemeWithHdfsScheme {
   public void testCreateOnRootShouldFailWhenMountLinkConfigured()
       throws Exception {
     final Path hdfsTargetPath = new Path(defaultFSURI + HDFS_USER_FOLDER);
-    final Path localTragetPath = new Path(localTargetDir.toURI());
-
-    createLinks(false, hdfsTargetPath, localTragetPath);
-
+    addMountLinks(defaultFSURI.getAuthority(),
+        new String[] {HDFS_USER_FOLDER, LOCAL_FOLDER },
+        new String[] {hdfsTargetPath.toUri().toString(),
+            localTargetDir.toURI().toString() },
+        conf);
     try (FileSystem fs = FileSystem.get(conf)) {
       fs.createNewFile(new Path("/newFileOnRoot"));
       Assert.fail("It should fail as root is read only in viewFS.");
@@ -265,8 +279,13 @@ public class TestViewFileSystemOverloadSchemeWithHdfsScheme {
   public void testCreateOnRootShouldFailEvenFallBackMountLinkConfigured()
       throws Exception {
     final Path hdfsTargetPath = new Path(defaultFSURI + HDFS_USER_FOLDER);
-    final Path localTragetPath = new Path(localTargetDir.toURI());
-    createLinks(true, hdfsTargetPath, localTragetPath);
+    addMountLinks(defaultFSURI.getAuthority(),
+        new String[] {HDFS_USER_FOLDER, LOCAL_FOLDER,
+            Constants.CONFIG_VIEWFS_LINK_FALLBACK },
+        new String[] {hdfsTargetPath.toUri().toString(),
+            localTargetDir.toURI().toString(),
+            hdfsTargetPath.toUri().toString() },
+        conf);
     try (FileSystem fs = FileSystem.get(conf)) {
       fs.createNewFile(new Path("/onRootWhenFallBack"));
       Assert.fail(
@@ -290,9 +309,14 @@ public class TestViewFileSystemOverloadSchemeWithHdfsScheme {
   @Test(expected = UnsupportedFileSystemException.class, timeout = 30000)
   public void testInvalidOverloadSchemeTargetFS() throws Exception {
     final Path hdfsTargetPath = new Path(defaultFSURI + HDFS_USER_FOLDER);
-    final Path localTragetPath = new Path(localTargetDir.toURI());
     conf = new Configuration();
-    createLinks(true, hdfsTargetPath, localTragetPath);
+    addMountLinks(defaultFSURI.getAuthority(),
+        new String[] {HDFS_USER_FOLDER, LOCAL_FOLDER,
+            Constants.CONFIG_VIEWFS_LINK_FALLBACK },
+        new String[] {hdfsTargetPath.toUri().toString(),
+            localTargetDir.toURI().toString(),
+            hdfsTargetPath.toUri().toString() },
+        conf);
     conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY,
         defaultFSURI.toString());
     conf.set(String.format(FS_IMPL_PATTERN_KEY, HDFS_SCHEME),
@@ -315,8 +339,11 @@ public class TestViewFileSystemOverloadSchemeWithHdfsScheme {
   public void testViewFsOverloadSchemeWhenInnerCacheDisabled()
       throws Exception {
     final Path hdfsTargetPath = new Path(defaultFSURI + HDFS_USER_FOLDER);
-    final Path localTragetPath = new Path(localTargetDir.toURI());
-    createLinks(false, hdfsTargetPath, localTragetPath);
+    addMountLinks(defaultFSURI.getAuthority(),
+        new String[] {HDFS_USER_FOLDER, LOCAL_FOLDER },
+        new String[] {hdfsTargetPath.toUri().toString(),
+            localTargetDir.toURI().toString(), },
+        conf);
     conf.setBoolean(Constants.CONFIG_VIEWFS_ENABLE_INNER_CACHE, false);
     try (FileSystem fs = FileSystem.get(conf)) {
       Path testFile = new Path(HDFS_USER_FOLDER + "/testFile");
@@ -337,10 +364,11 @@ public class TestViewFileSystemOverloadSchemeWithHdfsScheme {
   public void testViewFsOverloadSchemeWithInnerCache()
       throws Exception {
     final Path hdfsTargetPath = new Path(defaultFSURI + HDFS_USER_FOLDER);
-    ConfigUtil.addLink(conf, defaultFSURI.getAuthority(), HDFS_USER_FOLDER + 0,
-        hdfsTargetPath.toUri());
-    ConfigUtil.addLink(conf, defaultFSURI.getAuthority(), HDFS_USER_FOLDER + 1,
-        hdfsTargetPath.toUri());
+    addMountLinks(defaultFSURI.getAuthority(),
+        new String[] {HDFS_USER_FOLDER + 0, HDFS_USER_FOLDER + 1 },
+        new String[] {hdfsTargetPath.toUri().toString(),
+            hdfsTargetPath.toUri().toString() },
+        conf);
 
     // 1. Only 1 hdfs child file system should be there with cache.
     try (ViewFileSystemOverloadScheme vfs =
@@ -368,10 +396,11 @@ public class TestViewFileSystemOverloadSchemeWithHdfsScheme {
   public void testViewFsOverloadSchemeWithNoInnerCacheAndHdfsTargets()
       throws Exception {
     final Path hdfsTargetPath = new Path(defaultFSURI + HDFS_USER_FOLDER);
-    ConfigUtil.addLink(conf, defaultFSURI.getAuthority(), HDFS_USER_FOLDER + 0,
-        hdfsTargetPath.toUri());
-    ConfigUtil.addLink(conf, defaultFSURI.getAuthority(), HDFS_USER_FOLDER + 1,
-        hdfsTargetPath.toUri());
+    addMountLinks(defaultFSURI.getAuthority(),
+        new String[] {HDFS_USER_FOLDER + 0, HDFS_USER_FOLDER + 1 },
+        new String[] {hdfsTargetPath.toUri().toString(),
+            hdfsTargetPath.toUri().toString() },
+        conf);
 
     conf.setBoolean(Constants.CONFIG_VIEWFS_ENABLE_INNER_CACHE, false);
     // Two hdfs file systems should be there if no cache.
@@ -394,10 +423,11 @@ public class TestViewFileSystemOverloadSchemeWithHdfsScheme {
   public void testViewFsOverloadSchemeWithNoInnerCacheAndLocalSchemeTargets()
       throws Exception {
     final Path localTragetPath = new Path(localTargetDir.toURI());
-    ConfigUtil.addLink(conf, defaultFSURI.getAuthority(), LOCAL_FOLDER + 0,
-        localTragetPath.toUri());
-    ConfigUtil.addLink(conf, defaultFSURI.getAuthority(), LOCAL_FOLDER + 1,
-        localTragetPath.toUri());
+    addMountLinks(defaultFSURI.getAuthority(),
+        new String[] {LOCAL_FOLDER + 0, LOCAL_FOLDER + 1 },
+        new String[] {localTragetPath.toUri().toString(),
+            localTragetPath.toUri().toString() },
+        conf);
 
     // Only one local file system should be there if no InnerCache, but fs
     // cache should work.
@@ -407,4 +437,11 @@ public class TestViewFileSystemOverloadSchemeWithHdfsScheme {
       Assert.assertEquals(1, vfs.getChildFileSystems().length);
     }
   }
+
+  /**
+   * @return configuration.
+   */
+  public Configuration getConf() {
+    return this.conf;
+  }
 }
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org