You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2020/05/12 00:03:48 UTC

[hadoop] branch branch-3.2 updated: HDFS-15305. Extend ViewFS and provide ViewFileSystemOverloadScheme implementation with scheme configurable. Contributed by Uma Maheswara Rao G.

This is an automated email from the ASF dual-hosted git repository.

virajith pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 0d0aac1  HDFS-15305. Extend ViewFS and provide ViewFileSystemOverloadScheme implementation with scheme configurable. Contributed by Uma Maheswara Rao G.
0d0aac1 is described below

commit 0d0aac1a2afe1d4ec8772adc15e9683f969502d7
Author: Uma Maheswara Rao G <um...@apache.org>
AuthorDate: Mon May 4 17:55:40 2020 -0700

    HDFS-15305. Extend ViewFS and provide ViewFileSystemOverloadScheme implementation with scheme configurable. Contributed by Uma Maheswara Rao G.
    
    (cherry picked from commit 9c8236d04dfc3d4cefe7a00b63625f60ee232cfe)
---
 .../java/org/apache/hadoop/fs/FsConstants.java     |   2 +
 .../apache/hadoop/fs/viewfs/ViewFileSystem.java    |  49 ++-
 .../fs/viewfs/ViewFileSystemOverloadScheme.java    | 204 ++++++++++
 .../hadoop/fs/FileSystemContractBaseTest.java      |   3 +-
 ...iewFileSystemOverloadSchemeLocalFileSystem.java | 149 ++++++++
 .../hadoop/fs/viewfs/ViewFileSystemBaseTest.java   |   4 +-
 ...SystemOverloadSchemeHdfsFileSystemContract.java | 126 +++++++
 ...ViewFileSystemOverloadSchemeWithHdfsScheme.java | 410 +++++++++++++++++++++
 8 files changed, 937 insertions(+), 10 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FsConstants.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FsConstants.java
index cfef1c3..07c16b2 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FsConstants.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FsConstants.java
@@ -42,4 +42,6 @@ public interface FsConstants {
    */
   public static final URI VIEWFS_URI = URI.create("viewfs:///");
   public static final String VIEWFS_SCHEME = "viewfs";
+  String FS_VIEWFS_OVERLOAD_SCHEME_TARGET_FS_IMPL_PATTERN =
+      "fs.viewfs.overload.scheme.target.%s.impl";
 }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java
index d7fcb34..f052743 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java
@@ -17,9 +17,9 @@
  */
 package org.apache.hadoop.fs.viewfs;
 
-import static org.apache.hadoop.fs.viewfs.Constants.PERMISSION_555;
 import static org.apache.hadoop.fs.viewfs.Constants.CONFIG_VIEWFS_ENABLE_INNER_CACHE;
 import static org.apache.hadoop.fs.viewfs.Constants.CONFIG_VIEWFS_ENABLE_INNER_CACHE_DEFAULT;
+import static org.apache.hadoop.fs.viewfs.Constants.PERMISSION_555;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -34,9 +34,9 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Set;
-import java.util.Map.Entry;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
@@ -95,15 +95,48 @@ public class ViewFileSystem extends FileSystem {
   }
 
   /**
+   * File system instance getter.
+   */
+  static class FsGetter {
+
+    /**
+     * Gets new file system instance of given uri.
+     */
+    public FileSystem getNewInstance(URI uri, Configuration conf)
+        throws IOException {
+      return FileSystem.newInstance(uri, conf);
+    }
+
+    /**
+     * Gets file system instance of given uri.
+     */
+    public FileSystem get(URI uri, Configuration conf) throws IOException {
+      return FileSystem.get(uri, conf);
+    }
+  }
+
+  /**
+   * Gets file system creator instance.
+   */
+  protected FsGetter fsGetter() {
+    return new FsGetter();
+  }
+
+  /**
    * Caching children filesystems. HADOOP-15565.
    */
   static class InnerCache {
     private Map<Key, FileSystem> map = new HashMap<>();
+    private FsGetter fsCreator;
+
+    InnerCache(FsGetter fsCreator) {
+      this.fsCreator = fsCreator;
+    }
 
     FileSystem get(URI uri, Configuration config) throws IOException {
       Key key = new Key(uri);
       if (map.get(key) == null) {
-        FileSystem fs = FileSystem.newInstance(uri, config);
+        FileSystem fs = fsCreator.getNewInstance(uri, config);
         map.put(key, fs);
         return fs;
       } else {
@@ -191,7 +224,7 @@ public class ViewFileSystem extends FileSystem {
 
   final long creationTime; // of the the mount table
   final UserGroupInformation ugi; // the user/group of user who created mtable
-  URI myUri;
+  private URI myUri;
   private Path workingDir;
   Configuration config;
   InodeTree<FileSystem> fsState;  // the fs state; ie the mount table
@@ -253,13 +286,13 @@ public class ViewFileSystem extends FileSystem {
     config = conf;
     enableInnerCache = config.getBoolean(CONFIG_VIEWFS_ENABLE_INNER_CACHE,
         CONFIG_VIEWFS_ENABLE_INNER_CACHE_DEFAULT);
-    final InnerCache innerCache = new InnerCache();
+    FsGetter fsGetter = fsGetter();
+    final InnerCache innerCache = new InnerCache(fsGetter);
     // Now build  client side view (i.e. client side mount table) from config.
     final String authority = theUri.getAuthority();
     try {
-      myUri = new URI(FsConstants.VIEWFS_SCHEME, authority, "/", null, null);
+      myUri = new URI(getScheme(), authority, "/", null, null);
       fsState = new InodeTree<FileSystem>(conf, authority) {
-
         @Override
         protected FileSystem getTargetFileSystem(final URI uri)
           throws URISyntaxException, IOException {
@@ -267,7 +300,7 @@ public class ViewFileSystem extends FileSystem {
             if (enableInnerCache) {
               fs = innerCache.get(uri, config);
             } else {
-              fs = FileSystem.get(uri, config);
+              fs = fsGetter.get(uri, config);
             }
             return new ChRootedFileSystem(fs, uri);
         }
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystemOverloadScheme.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystemOverloadScheme.java
new file mode 100644
index 0000000..2dda540
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystemOverloadScheme.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.viewfs;
+
+import java.io.IOException;
+import java.lang.reflect.Constructor;
+import java.lang.reflect.InvocationTargetException;
+import java.net.URI;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsConstants;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+
+/******************************************************************************
+ * This class is extended from the ViewFileSystem for the overloaded scheme
+ * file system. Mount link configurations and in-memory mount table
+ * building behaviors are inherited from ViewFileSystem. Unlike ViewFileSystem
+ * scheme (viewfs://), the users would be able to use any scheme.
+ *
+ * To use this class, the following configurations need to be added in
+ * core-site.xml file.
+ * 1) fs.<scheme>.impl
+ *    = org.apache.hadoop.fs.viewfs.ViewFileSystemOverloadScheme
+ * 2) fs.viewfs.overload.scheme.target.<scheme>.impl
+ *    = <hadoop compatible file system implementation class name for the
+ *    <scheme>>
+ *
+ * Here <scheme> can be any scheme, but with that scheme there should be a
+ * hadoop compatible file system available. Second configuration value should
+ * be the respective scheme's file system implementation class.
+ * Example: if scheme is configured with "hdfs", then the 2nd configuration
+ * class name will be org.apache.hadoop.hdfs.DistributedFileSystem.
+ * if scheme is configured with "s3a", then the 2nd configuration class name
+ * will be org.apache.hadoop.fs.s3a.S3AFileSystem.
+ *
+ * Use Case 1:
+ * ===========
+ * If users want some of their existing cluster (hdfs://Cluster)
+ * data to mount with other hdfs and object store clusters (hdfs://NN1,
+ * o3fs://bucket1.volume1/, s3a://bucket1/)
+ *
+ * fs.viewfs.mounttable.Cluster./user = hdfs://NN1/user
+ * fs.viewfs.mounttable.Cluster./data = o3fs://bucket1.volume1/data
+ * fs.viewfs.mounttable.Cluster./backup = s3a://bucket1/backup/
+ *
+ * Op1: Create file hdfs://Cluster/user/fileA will go to hdfs://NN1/user/fileA
+ * Op2: Create file hdfs://Cluster/data/datafile will go to
+ *      o3fs://bucket1.volume1/data/datafile
+ * Op3: Create file hdfs://Cluster/backup/data.zip will go to
+ *      s3a://bucket1/backup/data.zip
+ *
+ * Use Case 2:
+ * ===========
+ * If users want some of their existing cluster (s3a://bucketA/)
+ * data to mount with other hdfs and object store clusters
+ * (hdfs://NN1, o3fs://bucket1.volume1/)
+ *
+ * fs.viewfs.mounttable.bucketA./user = hdfs://NN1/user
+ * fs.viewfs.mounttable.bucketA./data = o3fs://bucket1.volume1/data
+ * fs.viewfs.mounttable.bucketA./salesDB = s3a://bucketA/salesDB/
+ *
+ * Op1: Create file s3a://bucketA/user/fileA will go to hdfs://NN1/user/fileA
+ * Op2: Create file s3a://bucketA/data/datafile will go to
+ *      o3fs://bucket1.volume1/data/datafile
+ * Op3: Create file s3a://bucketA/salesDB/dbfile will go to
+ *      s3a://bucketA/salesDB/dbfile
+ *****************************************************************************/
+@InterfaceAudience.LimitedPrivate({ "MapReduce", "HBase", "Hive" })
+@InterfaceStability.Evolving
+public class ViewFileSystemOverloadScheme extends ViewFileSystem {
+  private URI myUri;
+  public ViewFileSystemOverloadScheme() throws IOException {
+    super();
+  }
+
+  @Override
+  public String getScheme() {
+    return myUri.getScheme();
+  }
+
+  @Override
+  public void initialize(URI theUri, Configuration conf) throws IOException {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Initializing the ViewFileSystemOverloadScheme with the uri: "
+          + theUri);
+    }
+    this.myUri = theUri;
+    super.initialize(theUri, conf);
+  }
+
+  /**
+   * This method is overridden because in ViewFileSystemOverloadScheme if
+   * overloaded scheme matches with mounted target fs scheme, file system
+   * should be created without going into fs.<scheme>.impl based resolution.
+   * Otherwise it will end up in an infinite loop as the target will be
+   * resolved again to ViewFileSystemOverloadScheme as fs.<scheme>.impl points
+   * to ViewFileSystemOverloadScheme. So, below method will initialize the
+   * fs.viewfs.overload.scheme.target.<scheme>.impl. Other schemes can
+   * follow FileSystem.newInstance or FileSystem.get.
+   */
+  @Override
+  protected FsGetter fsGetter() {
+
+    return new FsGetter() {
+      @Override
+      public FileSystem getNewInstance(URI uri, Configuration conf)
+          throws IOException {
+        if (uri.getScheme().equals(getScheme())) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(
+                "The file system initialized uri scheme is matching with the "
+                    + "given target uri scheme. The target uri is: " + uri);
+          }
+          /*
+           * Avoid looping when target fs scheme is matching to overloaded
+           * scheme.
+           */
+          return createFileSystem(uri, conf);
+        } else {
+          return FileSystem.newInstance(uri, conf);
+        }
+      }
+
+      /**
+       * When ViewFileSystemOverloadScheme scheme and target uri scheme are
+       * matching, it will not take advantage of FileSystem cache as it will
+       * create instance directly. For caching needs please set
+       * "fs.viewfs.enable.inner.cache" to true.
+       */
+      @Override
+      public FileSystem get(URI uri, Configuration conf) throws IOException {
+        if (uri.getScheme().equals(getScheme())) {
+          // Avoid looping when target fs scheme is matching to overloaded
+          // scheme.
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(
+                "The file system initialized uri scheme is matching with the "
+                    + "given target uri scheme. So, the target file system "
+                    + "instances will not be cached. To cache fs instances, "
+                    + "please set fs.viewfs.enable.inner.cache to true. "
+                    + "The target uri is: " + uri);
+          }
+          return createFileSystem(uri, conf);
+        } else {
+          return FileSystem.get(uri, conf);
+        }
+      }
+
+      private FileSystem createFileSystem(URI uri, Configuration conf)
+          throws IOException {
+        final String fsImplConf = String.format(
+            FsConstants.FS_VIEWFS_OVERLOAD_SCHEME_TARGET_FS_IMPL_PATTERN,
+            uri.getScheme());
+        Class<?> clazz = conf.getClass(fsImplConf, null);
+        if (clazz == null) {
+          throw new UnsupportedFileSystemException(
+              String.format("%s=null: %s: %s", fsImplConf,
+                  "No overload scheme fs configured", uri.getScheme()));
+        }
+        FileSystem fs = (FileSystem) newInstance(clazz, uri, conf);
+        fs.initialize(uri, conf);
+        return fs;
+      }
+
+      private <T> T newInstance(Class<T> theClass, URI uri,
+          Configuration conf) {
+        T result;
+        try {
+          Constructor<T> meth = theClass.getConstructor();
+          meth.setAccessible(true);
+          result = meth.newInstance();
+        } catch (InvocationTargetException e) {
+          Throwable cause = e.getCause();
+          if (cause instanceof RuntimeException) {
+            throw (RuntimeException) cause;
+          } else {
+            throw new RuntimeException(cause);
+          }
+        } catch (Exception e) {
+          throw new RuntimeException(e);
+        }
+        return result;
+      }
+    };
+  }
+}
\ No newline at end of file
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileSystemContractBaseTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileSystemContractBaseTest.java
index a4ccee3..8065b3f 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileSystemContractBaseTest.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileSystemContractBaseTest.java
@@ -862,7 +862,8 @@ public abstract class FileSystemContractBaseTest {
                found);
   }
 
-  private void assertListStatusFinds(Path dir, Path subdir) throws IOException {
+  protected void assertListStatusFinds(Path dir, Path subdir)
+      throws IOException {
     FileStatus[] stats = fs.listStatus(dir);
     boolean found = false;
     StringBuilder builder = new StringBuilder();
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemOverloadSchemeLocalFileSystem.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemOverloadSchemeLocalFileSystem.java
new file mode 100644
index 0000000..7e38903
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemOverloadSchemeLocalFileSystem.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.viewfs;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystemTestHelper;
+import org.apache.hadoop.fs.FsConstants;
+import org.apache.hadoop.fs.LocalFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ *
+ * Test the ViewFileSystemOverloadScheme using a file with authority:
+ * file://mountTableName/ i.e, the authority is used to load a mount table.
+ */
+public class TestViewFileSystemOverloadSchemeLocalFileSystem {
+  private static final String FILE = "file";
+  private static final Log LOG =
+      LogFactory.getLog(TestViewFileSystemOverloadSchemeLocalFileSystem.class);
+  private FileSystem fsTarget;
+  private Configuration conf;
+  private Path targetTestRoot;
+  private FileSystemTestHelper fileSystemTestHelper =
+      new FileSystemTestHelper();
+
+  @Before
+  public void setUp() throws Exception {
+    conf = new Configuration();
+    conf.set(String.format("fs.%s.impl", FILE),
+        ViewFileSystemOverloadScheme.class.getName());
+    conf.set(String.format(
+        FsConstants.FS_VIEWFS_OVERLOAD_SCHEME_TARGET_FS_IMPL_PATTERN, FILE),
+        LocalFileSystem.class.getName());
+    fsTarget = new LocalFileSystem();
+    fsTarget.initialize(new URI("file:///"), conf);
+    // create the test root on local_fs
+    targetTestRoot = fileSystemTestHelper.getAbsoluteTestRootPath(fsTarget);
+    fsTarget.delete(targetTestRoot, true);
+    fsTarget.mkdirs(targetTestRoot);
+  }
+
+  /**
+   * Tests write file and read file with ViewFileSystemOverloadScheme.
+   */
+  @Test
+  public void testLocalTargetLinkWriteSimple() throws IOException {
+    LOG.info("Starting testLocalTargetLinkWriteSimple");
+    final String testString = "Hello Local!...";
+    final Path lfsRoot = new Path("/lfsRoot");
+    ConfigUtil.addLink(conf, lfsRoot.toString(),
+        URI.create(targetTestRoot + "/local"));
+    try (FileSystem lViewFs = FileSystem.get(URI.create("file:///"), conf)) {
+      final Path testPath = new Path(lfsRoot, "test.txt");
+      try (FSDataOutputStream fsDos = lViewFs.create(testPath)) {
+        fsDos.writeUTF(testString);
+      }
+
+      try (FSDataInputStream lViewIs = lViewFs.open(testPath)) {
+        Assert.assertEquals(testString, lViewIs.readUTF());
+      }
+    }
+  }
+
+  /**
+   * Tests create file and delete file with ViewFileSystemOverloadScheme.
+   */
+  @Test
+  public void testLocalFsCreateAndDelete() throws Exception {
+    LOG.info("Starting testLocalFsCreateAndDelete");
+    ConfigUtil.addLink(conf, "mt", "/lfsroot",
+        URI.create(targetTestRoot + "/wd2"));
+    final URI mountURI = URI.create("file://mt/");
+    try (FileSystem lViewFS = FileSystem.get(mountURI, conf)) {
+      Path testPath = new Path(mountURI.toString() + "/lfsroot/test");
+      lViewFS.createNewFile(testPath);
+      Assert.assertTrue(lViewFS.exists(testPath));
+      lViewFS.delete(testPath, true);
+      Assert.assertFalse(lViewFS.exists(testPath));
+    }
+  }
+
+  /**
+   * Tests root level file with linkMergeSlash with
+   * ViewFileSystemOverloadScheme.
+   */
+  @Test
+  public void testLocalFsLinkSlashMerge() throws Exception {
+    LOG.info("Starting testLocalFsLinkSlashMerge");
+    ConfigUtil.addLinkMergeSlash(conf, "mt",
+        URI.create(targetTestRoot + "/wd2"));
+    final URI mountURI = URI.create("file://mt/");
+    try (FileSystem lViewFS = FileSystem.get(mountURI, conf)) {
+      Path fileOnRoot = new Path(mountURI.toString() + "/NewFile");
+      lViewFS.createNewFile(fileOnRoot);
+      Assert.assertTrue(lViewFS.exists(fileOnRoot));
+    }
+  }
+
+  /**
+   * Tests with linkMergeSlash and other mounts in
+   * ViewFileSystemOverloadScheme.
+   */
+  @Test(expected = IOException.class)
+  public void testLocalFsLinkSlashMergeWithOtherMountLinks() throws Exception {
+    LOG.info("Starting testLocalFsLinkSlashMergeWithOtherMountLinks");
+    ConfigUtil.addLink(conf, "mt", "/lfsroot",
+        URI.create(targetTestRoot + "/wd2"));
+    ConfigUtil.addLinkMergeSlash(conf, "mt",
+        URI.create(targetTestRoot + "/wd2"));
+    final URI mountURI = URI.create("file://mt/");
+    FileSystem.get(mountURI, conf);
+    Assert.fail("A merge slash cannot be configured with other mount links.");
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (null != fsTarget) {
+      fsTarget.delete(fileSystemTestHelper.getTestRootPath(fsTarget), true);
+      fsTarget.close();
+    }
+  }
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFileSystemBaseTest.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFileSystemBaseTest.java
index 510faf7..dcbd7f5 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFileSystemBaseTest.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/viewfs/ViewFileSystemBaseTest.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.fs.permission.AclStatus;
 import org.apache.hadoop.fs.permission.AclUtil;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.viewfs.ViewFileSystem.FsGetter;
 import org.apache.hadoop.fs.viewfs.ViewFileSystem.MountPoint;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.Credentials;
@@ -1274,7 +1275,8 @@ abstract public class ViewFileSystemBaseTest {
 
   @Test
   public void testViewFileSystemInnerCache() throws Exception {
-    ViewFileSystem.InnerCache cache = new ViewFileSystem.InnerCache();
+    ViewFileSystem.InnerCache cache =
+        new ViewFileSystem.InnerCache(new FsGetter());
     FileSystem fs = cache.get(fsTarget.getUri(), conf);
 
     // InnerCache caches filesystem.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemOverloadSchemeHdfsFileSystemContract.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemOverloadSchemeHdfsFileSystemContract.java
new file mode 100644
index 0000000..03c29c9
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemOverloadSchemeHdfsFileSystemContract.java
@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.viewfs;
+
+import static org.junit.Assume.assumeTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystemContractBaseTest;
+import org.apache.hadoop.fs.FsConstants;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.AppendTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.TestHDFSFileSystemContract;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Tests ViewFileSystemOverloadScheme with file system contract tests.
+ */
+public class TestViewFileSystemOverloadSchemeHdfsFileSystemContract
+    extends TestHDFSFileSystemContract {
+
+  private static MiniDFSCluster cluster;
+  private static String defaultWorkingDirectory;
+  private static Configuration conf = new HdfsConfiguration();
+
+  @BeforeClass
+  public static void init() throws IOException {
+    final File basedir = GenericTestUtils.getRandomizedTestDir();
+    conf.set(CommonConfigurationKeys.FS_PERMISSIONS_UMASK_KEY,
+        FileSystemContractBaseTest.TEST_UMASK);
+    cluster = new MiniDFSCluster.Builder(conf, basedir)
+        .numDataNodes(2)
+        .build();
+    defaultWorkingDirectory =
+        "/user/" + UserGroupInformation.getCurrentUser().getShortUserName();
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    conf.set(String.format("fs.%s.impl", "hdfs"),
+        ViewFileSystemOverloadScheme.class.getName());
+    conf.set(String.format(
+        FsConstants.FS_VIEWFS_OVERLOAD_SCHEME_TARGET_FS_IMPL_PATTERN,
+        "hdfs"),
+        DistributedFileSystem.class.getName());
+    URI defaultFSURI =
+        URI.create(conf.get(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY));
+    ConfigUtil.addLink(conf, defaultFSURI.getAuthority(), "/user",
+        defaultFSURI);
+    ConfigUtil.addLink(conf, defaultFSURI.getAuthority(), "/append",
+        defaultFSURI);
+    ConfigUtil.addLink(conf, defaultFSURI.getAuthority(),
+        "/FileSystemContractBaseTest/",
+        new URI(defaultFSURI.toString() + "/FileSystemContractBaseTest/"));
+    fs = FileSystem.get(conf);
+  }
+
+  @AfterClass
+  public static void tearDownAfter() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  @Override
+  protected String getDefaultWorkingDirectory() {
+    return defaultWorkingDirectory;
+  }
+
+  @Override
+  @Test
+  public void testAppend() throws IOException {
+    AppendTestUtil.testAppend(fs, new Path("/append/f"));
+  }
+
+  @Override
+  @Test(expected = AccessControlException.class)
+  public void testRenameRootDirForbidden() throws Exception {
+    super.testRenameRootDirForbidden();
+  }
+
+  @Override
+  @Test
+  public void testListStatusRootDir() throws Throwable {
+    assumeTrue(rootDirTestEnabled());
+    Path dir = path("/");
+    Path child = path("/FileSystemContractBaseTest");
+    assertListStatusFinds(dir, child);
+  }
+
+  @Override
+  @Ignore // This test is the same as above in this case.
+  public void testLSRootDir() throws Throwable {
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemOverloadSchemeWithHdfsScheme.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemOverloadSchemeWithHdfsScheme.java
new file mode 100644
index 0000000..bf0a8fe
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemOverloadSchemeWithHdfsScheme.java
@@ -0,0 +1,410 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.viewfs;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsConstants;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.test.PathUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests ViewFileSystemOverloadScheme with configured mount links.
+ */
+public class TestViewFileSystemOverloadSchemeWithHdfsScheme {
+  private static final String FS_IMPL_PATTERN_KEY = "fs.%s.impl";
+  private static final String HDFS_SCHEME = "hdfs";
+  private Configuration conf = null;
+  private MiniDFSCluster cluster = null;
+  private URI defaultFSURI;
+  private File localTargetDir;
+  private static final String TEST_ROOT_DIR = PathUtils
+      .getTestDirName(TestViewFileSystemOverloadSchemeWithHdfsScheme.class);
+  private static final String HDFS_USER_FOLDER = "/HDFSUser";
+  private static final String LOCAL_FOLDER = "/local";
+
+  /**
+   * Builds the configuration and starts a mini cluster with two datanodes.
+   * The hdfs scheme is bound to ViewFileSystemOverloadScheme, and
+   * DistributedFileSystem is set as the overload scheme target fs for hdfs.
+   * Also records the default fs URI and the local directory that the tests
+   * use as the local mount target.
+   */
+  @Before
+  public void startCluster() throws IOException {
+    final String hdfsImplKey =
+        String.format(FS_IMPL_PATTERN_KEY, HDFS_SCHEME);
+    final String hdfsTargetImplKey = String.format(
+        FsConstants.FS_VIEWFS_OVERLOAD_SCHEME_TARGET_FS_IMPL_PATTERN,
+        HDFS_SCHEME);
+
+    conf = new Configuration();
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY,
+        true);
+    conf.set(hdfsImplKey, ViewFileSystemOverloadScheme.class.getName());
+    conf.set(hdfsTargetImplKey, DistributedFileSystem.class.getName());
+
+    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
+    cluster.waitClusterUp();
+
+    final String defaultFs =
+        conf.get(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY);
+    defaultFSURI = URI.create(defaultFs);
+    localTargetDir = new File(TEST_ROOT_DIR, "/root/");
+    // The default fs must use the hdfs scheme.
+    Assert.assertEquals(HDFS_SCHEME, defaultFSURI.getScheme());
+  }
+
+  /**
+   * Closes all cached file systems and shuts the mini cluster down.
+   * The shutdown runs in a finally block so that a failure while closing the
+   * file systems cannot leave the cluster running for the following tests.
+   *
+   * @throws IOException if closing the cached file systems fails.
+   */
+  @After
+  public void tearDown() throws IOException {
+    if (cluster != null) {
+      try {
+        FileSystem.closeAll();
+      } finally {
+        cluster.shutdown();
+        cluster = null;
+      }
+    }
+  }
+
+  private void createLinks(boolean needFalbackLink, Path hdfsTargetPath,
+      Path localTragetPath) {
+    ConfigUtil.addLink(conf, defaultFSURI.getAuthority(), HDFS_USER_FOLDER,
+        hdfsTargetPath.toUri());
+    ConfigUtil.addLink(conf, defaultFSURI.getAuthority(), LOCAL_FOLDER,
+        localTragetPath.toUri());
+    if (needFalbackLink) {
+      ConfigUtil.addLinkFallback(conf, defaultFSURI.getAuthority(),
+          hdfsTargetPath.toUri());
+    }
+  }
+
+  /**
+   * Create mount links as follows.
+   * hdfs://localhost:xxx/HDFSUser --> hdfs://localhost:xxx/HDFSUser/
+   * hdfs://localhost:xxx/local --> file://TEST_ROOT_DIR/root/
+   *
+   * Creating the file /HDFSUser/testfile should create it in hdfs only.
+   * Creating the directory /local/test should create it in local fs only.
+   */
+  @Test(timeout = 30000)
+  public void testMountLinkWithLocalAndHDFS() throws Exception {
+    final Path hdfsTarget = new Path(defaultFSURI + HDFS_USER_FOLDER);
+    final Path localTarget = new Path(localTargetDir.toURI());
+    createLinks(false, hdfsTarget, localTarget);
+
+    // Paths as addressed through the mount links:
+    // /HDFSUser/testfile and /local/test
+    final Path mountedHdfsFile = new Path(HDFS_USER_FOLDER + "/testfile");
+    final Path mountedLocalDir = new Path(LOCAL_FOLDER + "/test");
+
+    // The same entries below their link targets, without scheme and
+    // authority, so that each file system can be asked for them directly.
+    final Path fileBelowHdfsTarget = new Path(
+        Path.getPathWithoutSchemeAndAuthority(hdfsTarget),
+        mountedHdfsFile.getName());
+    final Path dirBelowLocalTarget = new Path(
+        Path.getPathWithoutSchemeAndAuthority(localTarget),
+        mountedLocalDir.getName());
+
+    try (ViewFileSystemOverloadScheme viewFs =
+        (ViewFileSystemOverloadScheme) FileSystem.get(conf)) {
+      Assert.assertEquals(2, viewFs.getMountPoints().length);
+      viewFs.createNewFile(mountedHdfsFile);
+      viewFs.mkdirs(mountedLocalDir);
+    }
+
+    // Ask HDFS directly: only the file should exist there.
+    try (DistributedFileSystem dfs = new DistributedFileSystem()) {
+      dfs.initialize(defaultFSURI, conf);
+      Assert.assertTrue(dfs.exists(fileBelowHdfsTarget));
+      Assert.assertFalse(dfs.exists(dirBelowLocalTarget));
+    }
+
+    // Ask the local fs directly: only the directory should exist there.
+    try (RawLocalFileSystem lfs = new RawLocalFileSystem()) {
+      lfs.initialize(localTarget.toUri(), conf);
+      Assert.assertFalse(lfs.exists(fileBelowHdfsTarget));
+      Assert.assertTrue(lfs.exists(dirBelowLocalTarget));
+    }
+  }
+
+  /**
+   * Create mount links as follows.
+   * hdfs://localhost:xxx/User --> nonexistent://NonExistent/User/
+   * Getting the file system with a link to a non existent fs is expected to
+   * fail with an IOException.
+   */
+  @Test(expected = IOException.class, timeout = 30000)
+  public void testMountLinkWithNonExistentLink() throws Exception {
+    final String mountPoint = "/User";
+    final URI nonExistentTarget =
+        new Path("nonexistent://NonExistent" + mountPoint).toUri();
+
+    // Adds hdfs://localhost:xxx/User --> nonexistent://NonExistent/User/
+    ConfigUtil.addLink(conf, defaultFSURI.getAuthority(), mountPoint,
+        nonExistentTarget);
+    FileSystem.get(conf);
+    Assert.fail("Expected to fail with non existent link");
+  }
+
+  /**
+   * Create mount links as follows.
+   * hdfs://localhost:xxx/HDFSUser --> hdfs://localhost:xxx/HDFSUser/
+   * hdfs://localhost:xxx/local --> file://TEST_ROOT_DIR/root/
+   * ListStatus on / should return two entries, each being one of the mount
+   * links.
+   */
+  @Test(timeout = 30000)
+  public void testListStatusOnRootShouldListAllMountLinks() throws Exception {
+    final Path hdfsTarget = new Path(defaultFSURI + HDFS_USER_FOLDER);
+    final Path localTarget = new Path(localTargetDir.toURI());
+    createLinks(false, hdfsTarget, localTarget);
+
+    try (FileSystem viewFs = FileSystem.get(conf)) {
+      final FileStatus[] rootEntries = viewFs.listStatus(new Path("/"));
+      Assert.assertEquals(2, rootEntries.length);
+      for (FileStatus entry : rootEntries) {
+        final String entryPath = Path
+            .getPathWithoutSchemeAndAuthority(entry.getPath()).toString();
+        Assert.assertTrue(HDFS_USER_FOLDER.equals(entryPath)
+            || LOCAL_FOLDER.equals(entryPath));
+      }
+    }
+  }
+
+  /**
+   * Create mount links as follows
+   * hdfs://localhost:xxx/HDFSUser --> hdfs://localhost:xxx/HDFSUser/
+   * hdfs://localhost:xxx/local --> file://TEST_ROOT_DIR/root/
+   * ListStatus on a path that is under no mount link should fail with an
+   * IOException.
+   */
+  @Test(expected = IOException.class, timeout = 30000)
+  public void testListStatusOnNonMountedPath() throws Exception {
+    final Path hdfsTarget = new Path(defaultFSURI + HDFS_USER_FOLDER);
+    final Path localTarget = new Path(localTargetDir.toURI());
+    createLinks(false, hdfsTarget, localTarget);
+
+    final Path notMounted = new Path("/nonMount");
+    try (FileSystem viewFs = FileSystem.get(conf)) {
+      viewFs.listStatus(notMounted);
+      Assert.fail("It should fail as no mount link with /nonMount");
+    }
+  }
+
+  /**
+   * Create mount links as follows
+   * hdfs://localhost:xxx/HDFSUser --> hdfs://localhost:xxx/HDFSUser/
+   * hdfs://localhost:xxx/local --> file://TEST_ROOT_DIR/root/
+   * fallback --> hdfs://localhost:xxx/HDFSUser/
+   *
+   * Creating file or directory at non root level should succeed with fallback
+   * links.
+   */
+  @Test(timeout = 30000)
+  public void testWithLinkFallBack() throws Exception {
+    final Path hdfsTargetPath = new Path(defaultFSURI + HDFS_USER_FOLDER);
+    final Path localTargetPath = new Path(localTargetDir.toURI());
+
+    createLinks(true, hdfsTargetPath, localTargetPath);
+
+    try (FileSystem fs = FileSystem.get(conf)) {
+      fs.createNewFile(new Path("/nonMount/myfile"));
+      FileStatus[] ls = fs.listStatus(new Path("/nonMount"));
+      Assert.assertEquals(1, ls.length);
+      // assertEquals takes (expected, actual); keeping that order makes a
+      // failure message report the two values the right way round.
+      Assert.assertEquals("myfile",
+          Path.getPathWithoutSchemeAndAuthority(ls[0].getPath()).getName());
+    }
+  }
+
+  /**
+   * Create mount links as follows
+   * hdfs://localhost:xxx/HDFSUser --> hdfs://localhost:xxx/HDFSUser/
+   * hdfs://localhost:xxx/local --> file://TEST_ROOT_DIR/root/
+   *
+   * Creating a file directly on root cannot find any mount link, as ViewFS
+   * expects a mount point from root, so a NotInMountpointException is
+   * expected.
+   */
+  @Test(expected = NotInMountpointException.class, timeout = 30000)
+  public void testCreateOnRootShouldFailWhenMountLinkConfigured()
+      throws Exception {
+    final Path hdfsTarget = new Path(defaultFSURI + HDFS_USER_FOLDER);
+    final Path localTarget = new Path(localTargetDir.toURI());
+    createLinks(false, hdfsTarget, localTarget);
+
+    final Path fileOnRoot = new Path("/newFileOnRoot");
+    try (FileSystem viewFs = FileSystem.get(conf)) {
+      viewFs.createNewFile(fileOnRoot);
+      Assert.fail("It should fail as root is read only in viewFS.");
+    }
+  }
+
+  /**
+   * Create mount links as follows
+   * hdfs://localhost:xxx/HDFSUser --> hdfs://localhost:xxx/HDFSUser/
+   * hdfs://localhost:xxx/local --> file://TEST_ROOT_DIR/root/
+   * fallback --> hdfs://localhost:xxx/HDFSUser/
+   *
+   * The fallback link will be found, but root is read only and not
+   * accessible, so an AccessControlException is expected.
+   */
+  @Test(expected = AccessControlException.class, timeout = 30000)
+  public void testCreateOnRootShouldFailEvenFallBackMountLinkConfigured()
+      throws Exception {
+    final Path hdfsTarget = new Path(defaultFSURI + HDFS_USER_FOLDER);
+    final Path localTarget = new Path(localTargetDir.toURI());
+    createLinks(true, hdfsTarget, localTarget);
+
+    final Path fileOnRoot = new Path("/onRootWhenFallBack");
+    try (FileSystem viewFs = FileSystem.get(conf)) {
+      viewFs.createNewFile(fileOnRoot);
+      Assert.fail(
+          "It should fail as root is read only in viewFS, even when configured"
+              + " with fallback.");
+    }
+  }
+
+  /**
+   * Create mount links as follows
+   * hdfs://localhost:xxx/HDFSUser --> hdfs://localhost:xxx/HDFSUser/
+   * hdfs://localhost:xxx/local --> file://TEST_ROOT_DIR/root/
+   * fallback --> hdfs://localhost:xxx/HDFSUser/
+   *
+   * Note: The links above are created only to let fs initialization
+   * proceed; it would not proceed with no mount links.
+   *
+   * The fs.viewfs.overload.scheme.target.hdfs.impl property is left unset
+   * in a fresh configuration, so the OverloadScheme target fs
+   * initialization is expected to fail.
+   */
+  @Test(expected = UnsupportedFileSystemException.class, timeout = 30000)
+  public void testInvalidOverloadSchemeTargetFS() throws Exception {
+    final Path hdfsTarget = new Path(defaultFSURI + HDFS_USER_FOLDER);
+    final Path localTarget = new Path(localTargetDir.toURI());
+    final String hdfsImplKey =
+        String.format(FS_IMPL_PATTERN_KEY, HDFS_SCHEME);
+
+    // Start over from an empty configuration without the target fs impl.
+    conf = new Configuration();
+    createLinks(true, hdfsTarget, localTarget);
+    conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY,
+        defaultFSURI.toString());
+    conf.set(hdfsImplKey, ViewFileSystemOverloadScheme.class.getName());
+
+    final Path fileOnRoot = new Path("/onRootWhenFallBack");
+    try (FileSystem viewFs = FileSystem.get(conf)) {
+      viewFs.createNewFile(fileOnRoot);
+      Assert.fail("OverloadScheme target fs should be valid.");
+    }
+  }
+
+  /**
+   * Create mount links as follows
+   * hdfs://localhost:xxx/HDFSUser --> hdfs://localhost:xxx/HDFSUser/
+   * hdfs://localhost:xxx/local --> file://TEST_ROOT_DIR/root/
+   *
+   * With the inner cache disabled, it should still be able to create a file
+   * using ViewFileSystemOverloadScheme.
+   */
+  @Test(timeout = 30000)
+  public void testViewFsOverloadSchemeWhenInnerCacheDisabled()
+      throws Exception {
+    final Path hdfsTarget = new Path(defaultFSURI + HDFS_USER_FOLDER);
+    final Path localTarget = new Path(localTargetDir.toURI());
+    createLinks(false, hdfsTarget, localTarget);
+    conf.setBoolean(Constants.CONFIG_VIEWFS_ENABLE_INNER_CACHE, false);
+
+    final Path newFile = new Path(HDFS_USER_FOLDER + "/testFile");
+    try (FileSystem viewFs = FileSystem.get(conf)) {
+      viewFs.createNewFile(newFile);
+      Assert.assertTrue(viewFs.exists(newFile));
+    }
+  }
+
+  /**
+   * Create mount links as follows
+   * hdfs://localhost:xxx/HDFSUser0 --> hdfs://localhost:xxx/HDFSUser/
+   * hdfs://localhost:xxx/HDFSUser1 --> hdfs://localhost:xxx/HDFSUser/
+   *
+   * 1. With cache, only one hdfs child file system instance should be there.
+   * 2. Without cache, there should be 2 hdfs instances.
+   */
+  @Test(timeout = 30000)
+  public void testViewFsOverloadSchemeWithInnerCache()
+      throws Exception {
+    final URI hdfsTarget =
+        new Path(defaultFSURI + HDFS_USER_FOLDER).toUri();
+    final String mountTable = defaultFSURI.getAuthority();
+    ConfigUtil.addLink(conf, mountTable, HDFS_USER_FOLDER + 0, hdfsTarget);
+    ConfigUtil.addLink(conf, mountTable, HDFS_USER_FOLDER + 1, hdfsTarget);
+
+    // 1. With the cache, both links should share one hdfs child file system.
+    try (ViewFileSystemOverloadScheme cachedVfs =
+        (ViewFileSystemOverloadScheme) FileSystem.get(conf)) {
+      Assert.assertEquals(1, cachedVfs.getChildFileSystems().length);
+    }
+
+    // 2. Without the cache, each link should get its own hdfs file system.
+    conf.setBoolean(Constants.CONFIG_VIEWFS_ENABLE_INNER_CACHE, false);
+    try (ViewFileSystemOverloadScheme uncachedVfs =
+        (ViewFileSystemOverloadScheme) FileSystem.get(conf)) {
+      Assert.assertEquals(2, uncachedVfs.getChildFileSystems().length);
+    }
+  }
+
+  /**
+   * Create mount links as follows
+   * hdfs://localhost:xxx/HDFSUser0 --> hdfs://localhost:xxx/HDFSUser/
+   * hdfs://localhost:xxx/HDFSUser1 --> hdfs://localhost:xxx/HDFSUser/
+   *
+   * When InnerCache disabled, all matching ViewFileSystemOverloadScheme
+   * initialized scheme file systems would not use FileSystem cache.
+   */
+  // Same 30 second budget as the other tests in this class; the previous
+  // value of 3000 ms was ten times tighter than any of them.
+  @Test(timeout = 30000)
+  public void testViewFsOverloadSchemeWithNoInnerCacheAndHdfsTargets()
+      throws Exception {
+    final Path hdfsTargetPath = new Path(defaultFSURI + HDFS_USER_FOLDER);
+    ConfigUtil.addLink(conf, defaultFSURI.getAuthority(), HDFS_USER_FOLDER + 0,
+        hdfsTargetPath.toUri());
+    ConfigUtil.addLink(conf, defaultFSURI.getAuthority(), HDFS_USER_FOLDER + 1,
+        hdfsTargetPath.toUri());
+
+    conf.setBoolean(Constants.CONFIG_VIEWFS_ENABLE_INNER_CACHE, false);
+    // Two hdfs file systems should be there if no cache.
+    try (ViewFileSystemOverloadScheme vfs =
+        (ViewFileSystemOverloadScheme) FileSystem.get(conf)) {
+      Assert.assertEquals(2, vfs.getChildFileSystems().length);
+    }
+  }
+
+  /**
+   * Create mount links as follows
+   * hdfs://localhost:xxx/local0 --> file://localPath/
+   * hdfs://localhost:xxx/local1 --> file://localPath/
+   *
+   * When InnerCache disabled, all non matching ViewFileSystemOverloadScheme
+   * initialized scheme file systems should continue to take advantage of
+   * FileSystem cache.
+   */
+  // Same 30 second budget as the other tests in this class; the previous
+  // value of 3000 ms was ten times tighter than any of them.
+  @Test(timeout = 30000)
+  public void testViewFsOverloadSchemeWithNoInnerCacheAndLocalSchemeTargets()
+      throws Exception {
+    final Path localTargetPath = new Path(localTargetDir.toURI());
+    ConfigUtil.addLink(conf, defaultFSURI.getAuthority(), LOCAL_FOLDER + 0,
+        localTargetPath.toUri());
+    ConfigUtil.addLink(conf, defaultFSURI.getAuthority(), LOCAL_FOLDER + 1,
+        localTargetPath.toUri());
+
+    // Only one local file system should be there if no InnerCache, but fs
+    // cache should work.
+    conf.setBoolean(Constants.CONFIG_VIEWFS_ENABLE_INNER_CACHE, false);
+    try (ViewFileSystemOverloadScheme vfs =
+        (ViewFileSystemOverloadScheme) FileSystem.get(conf)) {
+      Assert.assertEquals(1, vfs.getChildFileSystems().length);
+    }
+  }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org