You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by um...@apache.org on 2020/06/26 08:29:51 UTC

[hadoop] branch trunk updated: HDFS-15429. mkdirs should work when parent dir is an internalDir and fallback configured. Contributed by Uma Maheswara Rao G.

This is an automated email from the ASF dual-hosted git repository.

umamahesh pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new d5e1bb6  HDFS-15429. mkdirs should work when parent dir is an internalDir and fallback configured. Contributed by Uma Maheswara Rao G.
d5e1bb6 is described below

commit d5e1bb6155496cf9d82e121dd1b65d0072312197
Author: Uma Maheswara Rao G <um...@apache.org>
AuthorDate: Fri Jun 26 01:29:38 2020 -0700

    HDFS-15429. mkdirs should work when parent dir is an internalDir and fallback configured. Contributed by Uma Maheswara Rao G.
---
 .../apache/hadoop/fs/viewfs/ViewFileSystem.java    |  25 ++
 .../java/org/apache/hadoop/fs/viewfs/ViewFs.java   |  28 +-
 .../fs/viewfs/TestViewFileSystemLinkFallback.java  | 229 +++++++++++++---
 .../hadoop/fs/viewfs/TestViewFsLinkFallback.java   | 297 +++++++++++++++++++++
 4 files changed, 542 insertions(+), 37 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java
index 06052b8..56448cb 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFileSystem.java
@@ -1339,6 +1339,31 @@ public class ViewFileSystem extends FileSystem {
           dir.toString().substring(1))) {
         return true; // this is the stupid semantics of FileSystem
       }
+
+      if (this.fsState.getRootFallbackLink() != null) {
+        FileSystem linkedFallbackFs =
+            this.fsState.getRootFallbackLink().getTargetFileSystem();
+        Path parent = Path.getPathWithoutSchemeAndAuthority(
+            new Path(theInternalDir.fullPath));
+        String leafChild = (InodeTree.SlashPath.equals(dir)) ?
+            InodeTree.SlashPath.toString() :
+            dir.getName();
+        Path dirToCreate = new Path(parent, leafChild);
+
+        try {
+          return linkedFallbackFs.mkdirs(dirToCreate, permission);
+        } catch (IOException e) {
+          if (LOG.isDebugEnabled()) {
+            StringBuilder msg =
+                new StringBuilder("Failed to create ").append(dirToCreate)
+                    .append(" at fallback : ")
+                    .append(linkedFallbackFs.getUri());
+            LOG.debug(msg.toString(), e);
+          }
+          return false;
+        }
+      }
+
       throw readOnlyMountTable("mkdirs",  dir);
     }
 
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFs.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFs.java
index d18233a..c769003 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFs.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/viewfs/ViewFs.java
@@ -1134,11 +1134,35 @@ public class ViewFs extends AbstractFileSystem {
 
     @Override
     public void mkdir(final Path dir, final FsPermission permission,
-        final boolean createParent) throws AccessControlException,
-        FileAlreadyExistsException {
+        final boolean createParent) throws IOException {
       if (theInternalDir.isRoot() && dir == null) {
         throw new FileAlreadyExistsException("/ already exits");
       }
+
+      if (this.fsState.getRootFallbackLink() != null) {
+        AbstractFileSystem linkedFallbackFs =
+            this.fsState.getRootFallbackLink().getTargetFileSystem();
+        Path parent = Path.getPathWithoutSchemeAndAuthority(
+            new Path(theInternalDir.fullPath));
+        String leafChild = (InodeTree.SlashPath.equals(dir)) ?
+            InodeTree.SlashPath.toString() :
+            dir.getName();
+        Path dirToCreate = new Path(parent, leafChild);
+        try {
+          // We are here because, the parent dir already exist in the mount
+          // table internal tree. So, let's create parent always in fallback.
+          linkedFallbackFs.mkdir(dirToCreate, permission, true);
+          return;
+        } catch (IOException e) {
+          if (LOG.isDebugEnabled()) {
+            StringBuilder msg = new StringBuilder("Failed to create {}")
+                .append(" at fallback fs : {}");
+            LOG.debug(msg.toString(), dirToCreate, linkedFallbackFs.getUri());
+          }
+          throw e;
+        }
+      }
+
       throw readOnlyMountTable("mkdir", dir);
     }
 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemLinkFallback.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemLinkFallback.java
index f7f5453..bec261c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemLinkFallback.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFileSystemLinkFallback.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.fs.viewfs;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -69,7 +70,7 @@ public class TestViewFileSystemLinkFallback extends ViewFileSystemBaseTest {
       "/tmp/TestViewFileSystemLinkFallback";
   private final static Logger LOG = LoggerFactory.getLogger(
       TestViewFileSystemLinkFallback.class);
-
+  private static URI viewFsDefaultClusterUri;
 
   @Override
   protected FileSystemTestHelper createFileSystemHelper() {
@@ -93,6 +94,8 @@ public class TestViewFileSystemLinkFallback extends ViewFileSystemBaseTest {
       FS_HDFS[i] = cluster.getFileSystem(i);
     }
     fsDefault = FS_HDFS[FS_INDEX_DEFAULT];
+    viewFsDefaultClusterUri = new URI(FsConstants.VIEWFS_SCHEME,
+        Constants.CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE, "/", null, null);
   }
 
   @AfterClass
@@ -327,21 +330,20 @@ public class TestViewFileSystemLinkFallback extends ViewFileSystemBaseTest {
     fsTarget.mkdirs(dir1);
     fsTarget.mkdirs(dir2);
     String clusterName = Constants.CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE;
-    URI viewFsUri = new URI(FsConstants.VIEWFS_SCHEME, clusterName,
-        "/", null, null);
-
     HashSet<Path> beforeFallback = new HashSet<>();
-    try(FileSystem vfs = FileSystem.get(viewFsUri, conf)) {
-      for (FileStatus stat : vfs.listStatus(new Path(viewFsUri.toString()))) {
+    try (FileSystem vfs = FileSystem.get(viewFsDefaultClusterUri, conf)) {
+      for (FileStatus stat : vfs
+          .listStatus(new Path(viewFsDefaultClusterUri.toString()))) {
         beforeFallback.add(stat.getPath());
       }
     }
     ConfigUtil.addLinkFallback(conf, clusterName,
         new Path(targetTestRoot, "fallbackDir").toUri());
 
-    try (FileSystem vfs = FileSystem.get(viewFsUri, conf)) {
+    try (FileSystem vfs = FileSystem.get(viewFsDefaultClusterUri, conf)) {
       HashSet<Path> afterFallback = new HashSet<>();
-      for (FileStatus stat : vfs.listStatus(new Path(viewFsUri.toString()))) {
+      for (FileStatus stat : vfs
+          .listStatus(new Path(viewFsDefaultClusterUri.toString()))) {
         afterFallback.add(stat.getPath());
       }
       afterFallback.removeAll(beforeFallback);
@@ -349,7 +351,7 @@ public class TestViewFileSystemLinkFallback extends ViewFileSystemBaseTest {
           1, afterFallback.size());
       Path[] fallbackArray = new Path[afterFallback.size()];
       // Only user1 should be listed as fallback link
-      Path expected = new Path(viewFsUri.toString(), "user1");
+      Path expected = new Path(viewFsDefaultClusterUri.toString(), "user1");
       assertEquals("Path did not match",
           expected, afterFallback.toArray(fallbackArray)[0]);
 
@@ -401,23 +403,21 @@ public class TestViewFileSystemLinkFallback extends ViewFileSystemBaseTest {
     fsTarget.mkdirs(dir2);
     fsTarget.setPermission(new Path(targetTestRoot, "fallbackDir/user1/hive/"),
         FsPermission.valueOf("-rwxr--r--"));
-    URI viewFsUri = new URI(FsConstants.VIEWFS_SCHEME,
-        Constants.CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE, "/", null, null);
 
     HashSet<Path> beforeFallback = new HashSet<>();
-    try (FileSystem vfs = FileSystem.get(viewFsUri, conf)) {
-      for (FileStatus stat : vfs
-          .listStatus(new Path(viewFsUri.toString(), "/user1/hive/"))) {
+    try (FileSystem vfs = FileSystem.get(viewFsDefaultClusterUri, conf)) {
+      for (FileStatus stat : vfs.listStatus(
+          new Path(viewFsDefaultClusterUri.toString(), "/user1/hive/"))) {
         beforeFallback.add(stat.getPath());
       }
     }
     ConfigUtil
         .addLinkFallback(conf, new Path(targetTestRoot, "fallbackDir").toUri());
 
-    try (FileSystem vfs = FileSystem.get(viewFsUri, conf)) {
+    try (FileSystem vfs = FileSystem.get(viewFsDefaultClusterUri, conf)) {
       HashSet<Path> afterFallback = new HashSet<>();
-      for (FileStatus stat : vfs
-          .listStatus(new Path(viewFsUri.toString(), "/user1/hive/"))) {
+      for (FileStatus stat : vfs.listStatus(
+          new Path(viewFsDefaultClusterUri.toString(), "/user1/hive/"))) {
         afterFallback.add(stat.getPath());
         if (dir1.getName().equals(stat.getPath().getName())) {
           // make sure fallback dir listed out with correct permissions, but not
@@ -426,7 +426,6 @@ public class TestViewFileSystemLinkFallback extends ViewFileSystemBaseTest {
               stat.getPermission());
         }
       }
-      //
       //viewfs://default/user1/hive/warehouse
       afterFallback.removeAll(beforeFallback);
       assertEquals("The same directory name in fallback link should be shaded",
@@ -475,23 +474,23 @@ public class TestViewFileSystemLinkFallback extends ViewFileSystemBaseTest {
             "fallbackDir/user1/hive/warehouse/partition-0"),
         FsPermission.valueOf("-rwxr--r--"));
     fsTarget.setPermission(targetTestRoot, FsPermission.valueOf("-rwxr--rw-"));
-    URI viewFsUri = new URI(FsConstants.VIEWFS_SCHEME,
-        Constants.CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE, "/", null, null);
 
     HashSet<Path> beforeFallback = new HashSet<>();
-    try (FileSystem vfs = FileSystem.get(viewFsUri, conf)) {
+    try (FileSystem vfs = FileSystem.get(viewFsDefaultClusterUri, conf)) {
       for (FileStatus stat : vfs.listStatus(
-          new Path(viewFsUri.toString(), "/user1/hive/warehouse/"))) {
+          new Path(viewFsDefaultClusterUri.toString(),
+              "/user1/hive/warehouse/"))) {
         beforeFallback.add(stat.getPath());
       }
     }
     ConfigUtil
         .addLinkFallback(conf, new Path(targetTestRoot, "fallbackDir").toUri());
 
-    try (FileSystem vfs = FileSystem.get(viewFsUri, conf)) {
+    try (FileSystem vfs = FileSystem.get(viewFsDefaultClusterUri, conf)) {
       HashSet<Path> afterFallback = new HashSet<>();
       for (FileStatus stat : vfs.listStatus(
-          new Path(viewFsUri.toString(), "/user1/hive/warehouse/"))) {
+          new Path(viewFsDefaultClusterUri.toString(),
+              "/user1/hive/warehouse/"))) {
         afterFallback.add(stat.getPath());
         if (dir1.getName().equals(stat.getPath().getName())) {
           // make sure fallback dir listed out with correct permissions, but not
@@ -508,7 +507,7 @@ public class TestViewFileSystemLinkFallback extends ViewFileSystemBaseTest {
 
   /**
    * Tests ListStatus on root with fallback configured.
-   * =============================Example.=======================================
+   * =============================Example.======================================
    * ===== Fallback path tree =============== Mount Path Tree ==================
    * ===========================================================================
    * *          /       /          *****               /                     ***
@@ -536,23 +535,21 @@ public class TestViewFileSystemLinkFallback extends ViewFileSystemBaseTest {
     fsTarget.mkdirs(dir1);
     fsTarget.mkdirs(dir2, FsPermission.valueOf("-rwxr--r--"));
     fsTarget.setPermission(targetTestRoot, FsPermission.valueOf("-rwxr--rw-"));
-    URI viewFsUri = new URI(FsConstants.VIEWFS_SCHEME,
-        Constants.CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE, "/", null, null);
 
     HashSet<Path> beforeFallback = new HashSet<>();
-    try (FileSystem vfs = FileSystem.get(viewFsUri, conf)) {
+    try (FileSystem vfs = FileSystem.get(viewFsDefaultClusterUri, conf)) {
       for (FileStatus stat : vfs
-          .listStatus(new Path(viewFsUri.toString(), "/"))) {
+          .listStatus(new Path(viewFsDefaultClusterUri.toString(), "/"))) {
         beforeFallback.add(stat.getPath());
       }
     }
     ConfigUtil
         .addLinkFallback(conf, new Path(targetTestRoot, "fallbackDir").toUri());
 
-    try (FileSystem vfs = FileSystem.get(viewFsUri, conf)) {
+    try (FileSystem vfs = FileSystem.get(viewFsDefaultClusterUri, conf)) {
       HashSet<Path> afterFallback = new HashSet<>();
       for (FileStatus stat : vfs
-          .listStatus(new Path(viewFsUri.toString(), "/"))) {
+          .listStatus(new Path(viewFsDefaultClusterUri.toString(), "/"))) {
         afterFallback.add(stat.getPath());
         if (dir1.getName().equals(stat.getPath().getName())) {
           // make sure fallback dir listed out with correct permissions, but not
@@ -584,15 +581,14 @@ public class TestViewFileSystemLinkFallback extends ViewFileSystemBaseTest {
     fsTarget.createNewFile(file1);
     Path dir2 = new Path(targetTestRoot, "fallbackDir/user1/hive/warehouse1");
     fsTarget.mkdirs(dir2);
-    URI viewFsUri = new URI(FsConstants.VIEWFS_SCHEME,
-        Constants.CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE, "/", null, null);
 
     ConfigUtil
         .addLinkFallback(conf, new Path(targetTestRoot, "fallbackDir").toUri());
 
-    try (FileSystem vfs = FileSystem.get(viewFsUri, conf)) {
+    try (FileSystem vfs = FileSystem.get(viewFsDefaultClusterUri, conf)) {
       for (FileStatus stat : vfs.listStatus(
-          new Path(viewFsUri.toString(), "/user1/hive/warehouse/"))) {
+          new Path(viewFsDefaultClusterUri.toString(),
+              "/user1/hive/warehouse/"))) {
         if (file1.getName().equals(stat.getPath().getName())) {
           // Link represents as symlink.
           assertFalse(stat.isFile());
@@ -606,4 +602,167 @@ public class TestViewFileSystemLinkFallback extends ViewFileSystemBaseTest {
       }
     }
   }
+
+  /**
+   * Tests that directory making should be successful when the parent directory
+   * is same as the existent fallback directory. The new dir should be created
+   * in fallback instead failing.
+   */
+  @Test
+  public void testMkdirsOfLinkParentWithFallbackLinkWithSameMountDirectoryTree()
+      throws Exception {
+    Configuration conf = new Configuration();
+    conf.setBoolean(Constants.CONFIG_VIEWFS_MOUNT_LINKS_AS_SYMLINKS, false);
+    ConfigUtil.addLink(conf, "/user1/hive/warehouse/partition-0",
+        new Path(targetTestRoot.toString()).toUri());
+    Path dir1 = new Path(targetTestRoot,
+        "fallbackDir/user1/hive/warehouse/partition-0");
+    fsTarget.mkdirs(dir1);
+    Path fallbackTarget = new Path(targetTestRoot, "fallbackDir");
+    ConfigUtil.addLinkFallback(conf, fallbackTarget.toUri());
+
+    try (FileSystem vfs = FileSystem.get(viewFsDefaultClusterUri, conf)) {
+      Path p = new Path("/user1/hive/warehouse/test");
+      Path test = Path.mergePaths(fallbackTarget, p);
+      assertFalse(fsTarget.exists(test));
+      assertTrue(vfs.mkdirs(p));
+      assertTrue(fsTarget.exists(test));
+    }
+  }
+
+  /**
+   * Tests that directory making should be successful when attempting to create
+   * the root directory as it's already exist.
+   */
+  @Test
+  public void testMkdirsOfRootWithFallbackLinkAndMountWithSameDirTree()
+      throws Exception {
+    Configuration conf = new Configuration();
+    conf.setBoolean(Constants.CONFIG_VIEWFS_MOUNT_LINKS_AS_SYMLINKS, false);
+    ConfigUtil
+        .addLink(conf, "/user1", new Path(targetTestRoot.toString()).toUri());
+    Path dir1 = new Path(targetTestRoot, "fallbackDir/user1");
+    fsTarget.mkdirs(dir1);
+    Path fallbackTarget = new Path(targetTestRoot, "fallbackDir");
+    ConfigUtil.addLinkFallback(conf, fallbackTarget.toUri());
+    try (FileSystem vfs = FileSystem.get(viewFsDefaultClusterUri, conf)) {
+      Path p = new Path("/");
+      Path test = Path.mergePaths(fallbackTarget, p);
+      assertTrue(fsTarget.exists(test));
+      assertTrue(vfs.mkdirs(p));
+      assertTrue(fsTarget.exists(test));
+    }
+  }
+
+  /**
+   * Tests the making of a new directory which is not matching to any of
+   * internal directory under the root.
+   */
+  @Test
+  public void testMkdirsOfNewDirWithOutMatchingToMountOrFallbackDirTree()
+      throws Exception {
+    Configuration conf = new Configuration();
+    conf.setBoolean(Constants.CONFIG_VIEWFS_MOUNT_LINKS_AS_SYMLINKS, false);
+    ConfigUtil.addLink(conf, "/user1/hive/warehouse/partition-0",
+        new Path(targetTestRoot.toString()).toUri());
+    Path fallbackTarget = new Path(targetTestRoot, "fallbackDir");
+    fsTarget.mkdirs(fallbackTarget);
+    ConfigUtil.addLinkFallback(conf, fallbackTarget.toUri());
+
+    try (FileSystem vfs = FileSystem.get(viewFsDefaultClusterUri, conf)) {
+      // user2 does not exist in fallback
+      Path p = new Path("/user2");
+      Path test = Path.mergePaths(fallbackTarget, p);
+      assertFalse(fsTarget.exists(test));
+      assertTrue(vfs.mkdirs(p));
+      assertTrue(fsTarget.exists(test));
+    }
+  }
+
+  /**
+   * Tests that when the parent dirs does not exist in fallback but the parent
+   * dir is same as mount internal directory, then we create parent structure
+   * (mount internal directory tree structure) in fallback.
+   */
+  @Test
+  public void testMkdirsWithFallbackLinkWithMountPathMatchingDirExist()
+      throws Exception {
+    Configuration conf = new Configuration();
+    conf.setBoolean(Constants.CONFIG_VIEWFS_MOUNT_LINKS_AS_SYMLINKS, false);
+    ConfigUtil.addLink(conf, "/user1/hive",
+        new Path(targetTestRoot.toString()).toUri());
+    Path fallbackTarget = new Path(targetTestRoot, "fallbackDir");
+    fsTarget.mkdirs(fallbackTarget);
+    ConfigUtil.addLinkFallback(conf, fallbackTarget.toUri());
+
+    try (FileSystem vfs = FileSystem.get(viewFsDefaultClusterUri, conf)) {
+      //user1 does not exist in fallback
+      Path immediateLevelToInternalDir = new Path("/user1/test");
+      Path test = Path.mergePaths(fallbackTarget, immediateLevelToInternalDir);
+      assertFalse(fsTarget.exists(test));
+      assertTrue(vfs.mkdirs(immediateLevelToInternalDir));
+      assertTrue(fsTarget.exists(test));
+    }
+  }
+
+  /**
+   * Tests that when the parent dirs does not exist in fallback but the
+   * immediate parent dir is not same as mount internal directory, then we
+   * create parent structure (mount internal directory tree structure) in
+   * fallback.
+   */
+  @Test
+  public void testMkdirsOfDeepTreeWithFallbackLinkAndMountPathMatchingDirExist()
+      throws Exception {
+    Configuration conf = new Configuration();
+    conf.setBoolean(Constants.CONFIG_VIEWFS_MOUNT_LINKS_AS_SYMLINKS, false);
+    ConfigUtil.addLink(conf, "/user1/hive",
+        new Path(targetTestRoot.toString()).toUri());
+    Path fallbackTarget = new Path(targetTestRoot, "fallbackDir");
+    fsTarget.mkdirs(fallbackTarget);
+    ConfigUtil.addLinkFallback(conf, fallbackTarget.toUri());
+
+    try (FileSystem vfs = FileSystem.get(viewFsDefaultClusterUri, conf)) {
+      //user1 does not exist in fallback
+      Path multipleLevelToInternalDir = new Path("/user1/test/test");
+      Path test = Path.mergePaths(fallbackTarget, multipleLevelToInternalDir);
+      assertFalse(fsTarget.exists(test));
+      assertTrue(vfs.mkdirs(multipleLevelToInternalDir));
+      assertTrue(fsTarget.exists(test));
+    }
+  }
+
+  /**
+   * Tests that mkdirs should return false when there is a problem with
+   * fallbackfs.
+   */
+  @Test
+  public void testMkdirsShouldReturnFalseWhenFallbackFSNotAvailable()
+      throws Exception {
+    Configuration conf = new Configuration();
+    conf.setBoolean(Constants.CONFIG_VIEWFS_MOUNT_LINKS_AS_SYMLINKS, false);
+    ConfigUtil.addLink(conf, "/user1/test",
+        new Path(targetTestRoot.toString()).toUri());
+    Path fallbackTarget = new Path(targetTestRoot, "fallbackDir");
+    fsTarget.mkdirs(fallbackTarget);
+    ConfigUtil.addLinkFallback(conf, fallbackTarget.toUri());
+    try (FileSystem vfs = FileSystem.get(viewFsDefaultClusterUri, conf)) {
+      //user1/test1 does not exist in fallback
+      Path nextLevelToInternalDir = new Path("/user1/test1");
+      Path test = Path.mergePaths(fallbackTarget, nextLevelToInternalDir);
+      assertFalse(fsTarget.exists(test));
+      // user1 exists in viewFS mount.
+      assertNotNull(vfs.getFileStatus(new Path("/user1")));
+      // user1 does not exists in fallback.
+      assertFalse(fsTarget.exists(test.getParent()));
+      cluster.shutdownNameNodes(); // Stopping fallback server
+      // /user1/test1 does not exist in mount internal dir tree, it would
+      // attempt to create in fallback.
+      assertFalse(vfs.mkdirs(nextLevelToInternalDir));
+      cluster.restartNameNodes();
+      // should return true succeed when fallback fs is back to normal.
+      assertTrue(vfs.mkdirs(nextLevelToInternalDir));
+      assertTrue(fsTarget.exists(test));
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsLinkFallback.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsLinkFallback.java
new file mode 100644
index 0000000..49c0957
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/fs/viewfs/TestViewFsLinkFallback.java
@@ -0,0 +1,297 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.fs.viewfs;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.net.URI;
+import java.net.URISyntaxException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.AbstractFileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FsConstants;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Test for viewfs with LinkFallback mount table entries.
+ */
+public class TestViewFsLinkFallback {
+  private static FileSystem fsDefault;
+  private FileSystem fsTarget;
+  private static MiniDFSCluster cluster;
+  private static URI viewFsDefaultClusterUri;
+  private Path targetTestRoot;
+
+  @BeforeClass
+  public static void clusterSetupAtBeginning()
+      throws IOException, URISyntaxException {
+    int nameSpacesCount = 3;
+    int dataNodesCount = 3;
+    int fsIndexDefault = 0;
+    Configuration conf = new Configuration();
+    FileSystem[] fsHdfs = new FileSystem[nameSpacesCount];
+    conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_KEY,
+        true);
+    cluster = new MiniDFSCluster.Builder(conf)
+        .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(
+            nameSpacesCount))
+        .numDataNodes(dataNodesCount)
+        .build();
+    cluster.waitClusterUp();
+
+    for (int i = 0; i < nameSpacesCount; i++) {
+      fsHdfs[i] = cluster.getFileSystem(i);
+    }
+    fsDefault = fsHdfs[fsIndexDefault];
+    viewFsDefaultClusterUri = new URI(FsConstants.VIEWFS_SCHEME,
+        Constants.CONFIG_VIEWFS_DEFAULT_MOUNT_TABLE, "/", null, null);
+
+  }
+
+  @AfterClass
+  public static void clusterShutdownAtEnd() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    fsTarget = fsDefault;
+    initializeTargetTestRoot();
+  }
+
+  private void initializeTargetTestRoot() throws IOException {
+    targetTestRoot = fsDefault.makeQualified(new Path("/"));
+    for (FileStatus status : fsDefault.listStatus(targetTestRoot)) {
+      fsDefault.delete(status.getPath(), true);
+    }
+  }
+
+  /**
+   * Tests that directory making should be successful when the parent directory
+   * is same as the existent fallback directory. The new dir should be created
+   * in fallback instead failing.
+   */
+  @Test
+  public void testMkdirOfLinkParentWithFallbackLinkWithSameMountDirectoryTree()
+      throws Exception {
+    Configuration conf = new Configuration();
+    conf.setBoolean(Constants.CONFIG_VIEWFS_MOUNT_LINKS_AS_SYMLINKS, false);
+    ConfigUtil.addLink(conf, "/user1/hive/warehouse/partition-0",
+        new Path(targetTestRoot.toString()).toUri());
+    Path dir1 = new Path(targetTestRoot,
+        "fallbackDir/user1/hive/warehouse/partition-0");
+    fsTarget.mkdirs(dir1);
+    Path fallbackTarget = new Path(targetTestRoot, "fallbackDir");
+    ConfigUtil.addLinkFallback(conf, fallbackTarget.toUri());
+    AbstractFileSystem vfs =
+        AbstractFileSystem.get(viewFsDefaultClusterUri, conf);
+    Path p = new Path("/user1/hive/warehouse/test");
+    Path test = Path.mergePaths(fallbackTarget, p);
+    assertFalse(fsTarget.exists(test));
+    vfs.mkdir(p, null, true);
+    assertTrue(fsTarget.exists(test));
+  }
+
+  /**
+   * Tests that directory making should be successful when attempting to create
+   * the root directory as it's already exist.
+   */
+  @Test
+  public void testMkdirOfRootWithFallbackLinkAndMountWithSameDirTree()
+      throws Exception {
+    Configuration conf = new Configuration();
+    conf.setBoolean(Constants.CONFIG_VIEWFS_MOUNT_LINKS_AS_SYMLINKS, false);
+    ConfigUtil
+        .addLink(conf, "/user1", new Path(targetTestRoot.toString()).toUri());
+    Path dir1 = new Path(targetTestRoot, "fallbackDir/user1");
+    fsTarget.mkdirs(dir1);
+    Path fallbackTarget = new Path(targetTestRoot, "fallbackDir");
+    ConfigUtil.addLinkFallback(conf, fallbackTarget.toUri());
+    AbstractFileSystem vfs =
+        AbstractFileSystem.get(viewFsDefaultClusterUri, conf);
+    Path p = new Path("/");
+    Path test = Path.mergePaths(fallbackTarget, p);
+    assertTrue(fsTarget.exists(test));
+    vfs.mkdir(p, null, true);
+    assertTrue(fsTarget.exists(test));
+  }
+
+  /**
+   * Tests the making of a new directory which is not matching to any of
+   * internal directory under the root.
+   */
+  @Test
+  public void testMkdirOfNewDirWithOutMatchingToMountOrFallbackDirTree()
+      throws Exception {
+    Configuration conf = new Configuration();
+    conf.setBoolean(Constants.CONFIG_VIEWFS_MOUNT_LINKS_AS_SYMLINKS, false);
+    ConfigUtil.addLink(conf, "/user1/hive/warehouse/partition-0",
+        new Path(targetTestRoot.toString()).toUri());
+    Path fallbackTarget = new Path(targetTestRoot, "fallbackDir");
+    fsTarget.mkdirs(fallbackTarget);
+    ConfigUtil.addLinkFallback(conf, fallbackTarget.toUri());
+    AbstractFileSystem vfs =
+        AbstractFileSystem.get(viewFsDefaultClusterUri, conf);
+    // user2 does not exist in fallback
+    Path p = new Path("/user2");
+    Path test = Path.mergePaths(fallbackTarget, p);
+    assertFalse(fsTarget.exists(test));
+    vfs.mkdir(p, null, true);
+    assertTrue(fsTarget.exists(test));
+  }
+
+  /**
+   * Tests that when the parent dirs does not exist in fallback but the parent
+   * dir is same as mount internal directory, then we create parent structure
+   * (mount internal directory tree structure) in fallback.
+   */
+  @Test
+  public void testMkdirWithFallbackLinkWithMountPathMatchingDirExist()
+      throws Exception {
+    Configuration conf = new Configuration();
+    conf.setBoolean(Constants.CONFIG_VIEWFS_MOUNT_LINKS_AS_SYMLINKS, false);
+    ConfigUtil.addLink(conf, "/user1/hive",
+        new Path(targetTestRoot.toString()).toUri());
+    Path fallbackTarget = new Path(targetTestRoot, "fallbackDir");
+    fsTarget.mkdirs(fallbackTarget);
+    ConfigUtil.addLinkFallback(conf, fallbackTarget.toUri());
+
+    AbstractFileSystem vfs =
+        AbstractFileSystem.get(viewFsDefaultClusterUri, conf);
+    //user1 does not exist in fallback
+    Path immediateLevelToInternalDir = new Path("/user1/test");
+    Path test = Path.mergePaths(fallbackTarget, immediateLevelToInternalDir);
+    assertFalse(fsTarget.exists(test));
+    vfs.mkdir(immediateLevelToInternalDir, null, true);
+    assertTrue(fsTarget.exists(test));
+  }
+
+  /**
+   * Tests that when the parent dirs does not exist in fallback but the
+   * immediate parent dir is not same as mount internal directory, then we
+   * create parent structure (mount internal directory tree structure) in
+   * fallback.
+   */
+  @Test
+  public void testMkdirOfDeepTreeWithFallbackLinkAndMountPathMatchingDirExist()
+      throws Exception {
+    Configuration conf = new Configuration();
+    conf.setBoolean(Constants.CONFIG_VIEWFS_MOUNT_LINKS_AS_SYMLINKS, false);
+    ConfigUtil.addLink(conf, "/user1/hive",
+        new Path(targetTestRoot.toString()).toUri());
+    Path fallbackTarget = new Path(targetTestRoot, "fallbackDir");
+    fsTarget.mkdirs(fallbackTarget);
+    ConfigUtil.addLinkFallback(conf, fallbackTarget.toUri());
+
+    AbstractFileSystem vfs =
+        AbstractFileSystem.get(viewFsDefaultClusterUri, conf);
+    //user1 does not exist in fallback
+    Path multipleLevelToInternalDir = new Path("/user1/test/test");
+    Path test = Path.mergePaths(fallbackTarget, multipleLevelToInternalDir);
+    assertFalse(fsTarget.exists(test));
+    vfs.mkdir(multipleLevelToInternalDir, null, true);
+    assertTrue(fsTarget.exists(test));
+  }
+
+  /**
+   * Tests that mkdir with createParent false should still create parent in
+   * fallback when the same mount dir exist.
+   */
+  @Test
+  public void testMkdirShouldCreateParentDirInFallbackWhenMountDirExist()
+      throws Exception {
+    Configuration conf = new Configuration();
+    conf.setBoolean(Constants.CONFIG_VIEWFS_MOUNT_LINKS_AS_SYMLINKS, false);
+    ConfigUtil.addLink(conf, "/user1/hive/test",
+        new Path(targetTestRoot.toString()).toUri());
+    Path fallbackTarget = new Path(targetTestRoot, "fallbackDir");
+    fsTarget.mkdirs(fallbackTarget);
+    ConfigUtil.addLinkFallback(conf, fallbackTarget.toUri());
+    AbstractFileSystem vfs = AbstractFileSystem.get(viewFsDefaultClusterUri,
+        conf);
+    //user1/hive/test1 does not exist in fallback
+    Path multipleLevelToInternalDir = new Path("/user1/hive/test1");
+    Path test = Path.mergePaths(fallbackTarget, multipleLevelToInternalDir);
+    assertFalse(fsTarget.exists(test));
+    // user1/hive exist in viewFS.
+    assertNotNull(vfs.getFileStatus(new Path("/user1/hive")));
+    // user1/hive does not exists in fallback.
+    assertFalse(fsTarget.exists(test.getParent()));
+
+    vfs.mkdir(multipleLevelToInternalDir, FsPermission.getDirDefault(), false);
+    assertTrue(fsTarget.exists(test));
+
+  }
+
+  /**
+   * Tests that mkdir should fail with IOE when there is a problem with
+   * fallbackfs.
+   */
+  @Test
+  public void testMkdirShouldFailWhenFallbackFSNotAvailable()
+      throws Exception {
+    Configuration conf = new Configuration();
+    conf.setBoolean(Constants.CONFIG_VIEWFS_MOUNT_LINKS_AS_SYMLINKS, false);
+    ConfigUtil.addLink(conf, "/user1/test",
+        new Path(targetTestRoot.toString()).toUri());
+    Path fallbackTarget = new Path(targetTestRoot, "fallbackDir");
+    fsTarget.mkdirs(fallbackTarget);
+    ConfigUtil.addLinkFallback(conf, fallbackTarget.toUri());
+    AbstractFileSystem vfs = AbstractFileSystem.get(viewFsDefaultClusterUri,
+        conf);
+    //user1/test1 does not exist in fallback
+    Path nextLevelToInternalDir = new Path("/user1/test1");
+    Path test = Path.mergePaths(fallbackTarget, nextLevelToInternalDir);
+    assertFalse(fsTarget.exists(test));
+    // user1 exists in viewFS mount.
+    assertNotNull(vfs.getFileStatus(new Path("/user1")));
+    // user1 does not exists in fallback.
+    assertFalse(fsTarget.exists(test.getParent()));
+    cluster.shutdownNameNodes();
+    try {
+      // /user1/test1 does not exist in mount internal dir tree, it would
+      // attempt to create in fallback.
+      vfs.mkdir(nextLevelToInternalDir, FsPermission.getDirDefault(),
+          false);
+      Assert.fail("It should throw IOE when fallback fs not available.");
+    } catch (IOException e) {
+      cluster.restartNameNodes();
+      // should succeed when fallback fs is back to normal.
+      vfs.mkdir(nextLevelToInternalDir, FsPermission.getDirDefault(),
+          false);
+    }
+    assertTrue(fsTarget.exists(test));
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org