You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2019/03/03 18:37:18 UTC

[hadoop] 39/45: HDFS-14226. RBF: Setting attributes should set on all subclusters' directories. Contributed by Ayush Saxena.

This is an automated email from the ASF dual-hosted git repository.

inigoiri pushed a commit to branch HDFS-13891
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 9b1c43ec42cbbccbbd1e4b2f765ccd2bdb53baac
Author: Inigo Goiri <in...@apache.org>
AuthorDate: Fri Feb 15 09:25:09 2019 -0800

    HDFS-14226. RBF: Setting attributes should set on all subclusters' directories. Contributed by Ayush Saxena.
---
 .../server/federation/router/ErasureCoding.java    |  12 +-
 .../federation/router/RouterClientProtocol.java    |  55 ++-
 .../server/federation/router/RouterRpcServer.java  |  46 ++-
 .../federation/router/RouterStoragePolicy.java     |  12 +-
 ...erRPCMultipleDestinationMountTableResolver.java | 394 +++++++++++++++++++++
 5 files changed, 482 insertions(+), 37 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ErasureCoding.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ErasureCoding.java
index 480b232..f4584b1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ErasureCoding.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ErasureCoding.java
@@ -157,7 +157,11 @@ public class ErasureCoding {
     RemoteMethod remoteMethod = new RemoteMethod("setErasureCodingPolicy",
         new Class<?>[] {String.class, String.class},
         new RemoteParam(), ecPolicyName);
-    rpcClient.invokeSequential(locations, remoteMethod, null, null);
+    if (rpcServer.isInvokeConcurrent(src)) {
+      rpcClient.invokeConcurrent(locations, remoteMethod);
+    } else {
+      rpcClient.invokeSequential(locations, remoteMethod);
+    }
   }
 
   public void unsetErasureCodingPolicy(String src) throws IOException {
@@ -167,7 +171,11 @@ public class ErasureCoding {
         rpcServer.getLocationsForPath(src, true);
     RemoteMethod remoteMethod = new RemoteMethod("unsetErasureCodingPolicy",
         new Class<?>[] {String.class}, new RemoteParam());
-    rpcClient.invokeSequential(locations, remoteMethod, null, null);
+    if (rpcServer.isInvokeConcurrent(src)) {
+      rpcClient.invokeConcurrent(locations, remoteMethod);
+    } else {
+      rpcClient.invokeSequential(locations, remoteMethod);
+    }
   }
 
   public ECBlockGroupStats getECBlockGroupStats() throws IOException {
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
index 5383a7d..6cc12ca 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
@@ -213,7 +213,7 @@ public class RouterClientProtocol implements ClientProtocol {
       throws IOException {
     rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
 
-    if (createParent && isPathAll(src)) {
+    if (createParent && rpcServer.isPathAll(src)) {
       int index = src.lastIndexOf(Path.SEPARATOR);
       String parent = src.substring(0, index);
       LOG.debug("Creating {} requires creating parent {}", src, parent);
@@ -273,9 +273,13 @@ public class RouterClientProtocol implements ClientProtocol {
     RemoteMethod method = new RemoteMethod("setReplication",
         new Class<?>[] {String.class, short.class}, new RemoteParam(),
         replication);
-    Object result = rpcClient.invokeSequential(
-        locations, method, Boolean.class, Boolean.TRUE);
-    return (boolean) result;
+    if (rpcServer.isInvokeConcurrent(src)) {
+      return !rpcClient.invokeConcurrent(locations, method, Boolean.class)
+          .containsValue(false);
+    } else {
+      return rpcClient.invokeSequential(locations, method, Boolean.class,
+          Boolean.TRUE);
+    }
   }
 
   @Override
@@ -299,7 +303,7 @@ public class RouterClientProtocol implements ClientProtocol {
     RemoteMethod method = new RemoteMethod("setPermission",
         new Class<?>[] {String.class, FsPermission.class},
         new RemoteParam(), permissions);
-    if (isPathAll(src)) {
+    if (rpcServer.isInvokeConcurrent(src)) {
       rpcClient.invokeConcurrent(locations, method);
     } else {
       rpcClient.invokeSequential(locations, method);
@@ -316,7 +320,7 @@ public class RouterClientProtocol implements ClientProtocol {
     RemoteMethod method = new RemoteMethod("setOwner",
         new Class<?>[] {String.class, String.class, String.class},
         new RemoteParam(), username, groupname);
-    if (isPathAll(src)) {
+    if (rpcServer.isInvokeConcurrent(src)) {
       rpcClient.invokeConcurrent(locations, method);
     } else {
       rpcClient.invokeSequential(locations, method);
@@ -549,7 +553,7 @@ public class RouterClientProtocol implements ClientProtocol {
     RemoteMethod method = new RemoteMethod("delete",
         new Class<?>[] {String.class, boolean.class}, new RemoteParam(),
         recursive);
-    if (isPathAll(src)) {
+    if (rpcServer.isPathAll(src)) {
       return rpcClient.invokeAll(locations, method);
     } else {
       return rpcClient.invokeSequential(locations, method,
@@ -569,7 +573,7 @@ public class RouterClientProtocol implements ClientProtocol {
         new RemoteParam(), masked, createParent);
 
     // Create in all locations
-    if (isPathAll(src)) {
+    if (rpcServer.isPathAll(src)) {
       return rpcClient.invokeAll(locations, method);
     }
 
@@ -707,7 +711,7 @@ public class RouterClientProtocol implements ClientProtocol {
 
     HdfsFileStatus ret = null;
     // If it's a directory, we check in all locations
-    if (isPathAll(src)) {
+    if (rpcServer.isPathAll(src)) {
       ret = getFileInfoAll(locations, method);
     } else {
       // Check for file information sequentially
@@ -1309,7 +1313,11 @@ public class RouterClientProtocol implements ClientProtocol {
     RemoteMethod method = new RemoteMethod("setXAttr",
         new Class<?>[] {String.class, XAttr.class, EnumSet.class},
         new RemoteParam(), xAttr, flag);
-    rpcClient.invokeSequential(locations, method);
+    if (rpcServer.isInvokeConcurrent(src)) {
+      rpcClient.invokeConcurrent(locations, method);
+    } else {
+      rpcClient.invokeSequential(locations, method);
+    }
   }
 
   @SuppressWarnings("unchecked")
@@ -1350,7 +1358,11 @@ public class RouterClientProtocol implements ClientProtocol {
         rpcServer.getLocationsForPath(src, true);
     RemoteMethod method = new RemoteMethod("removeXAttr",
         new Class<?>[] {String.class, XAttr.class}, new RemoteParam(), xAttr);
-    rpcClient.invokeSequential(locations, method);
+    if (rpcServer.isInvokeConcurrent(src)) {
+      rpcClient.invokeConcurrent(locations, method);
+    } else {
+      rpcClient.invokeSequential(locations, method);
+    }
   }
 
   @Override
@@ -1713,27 +1725,6 @@ public class RouterClientProtocol implements ClientProtocol {
   }
 
   /**
-   * Check if a path should be in all subclusters.
-   *
-   * @param path Path to check.
-   * @return If a path should be in all subclusters.
-   */
-  private boolean isPathAll(final String path) {
-    if (subclusterResolver instanceof MountTableResolver) {
-      try {
-        MountTableResolver mountTable = (MountTableResolver)subclusterResolver;
-        MountTable entry = mountTable.getMountPoint(path);
-        if (entry != null) {
-          return entry.isAll();
-        }
-      } catch (IOException e) {
-        LOG.error("Cannot get mount point", e);
-      }
-    }
-    return false;
-  }
-
-  /**
    * Create a new file status for a mount point.
    *
    * @param name Name of the mount point.
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
index a312d4b..e4ea58b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
@@ -1541,4 +1541,48 @@ public class RouterRpcServer extends AbstractService
   public FederationRPCMetrics getRPCMetrics() {
     return this.rpcMonitor.getRPCMetrics();
   }
-}
+
+  /**
+   * Check if a path should be in all subclusters.
+   *
+   * @param path Path to check.
+   * @return If a path should be in all subclusters.
+   */
+  boolean isPathAll(final String path) {
+    if (subclusterResolver instanceof MountTableResolver) {
+      try {
+        MountTableResolver mountTable = (MountTableResolver) subclusterResolver;
+        MountTable entry = mountTable.getMountPoint(path);
+        if (entry != null) {
+          return entry.isAll();
+        }
+      } catch (IOException e) {
+        LOG.error("Cannot get mount point", e);
+      }
+    }
+    return false;
+  }
+
+  /**
+   * Check if call needs to be invoked to all the locations. The call is
+   * supposed to be invoked in all the locations in case the order of the mount
+   * entry is amongst HASH_ALL, RANDOM or SPACE or if the source is itself a
+   * mount entry.
+   * @param path The path on which the operation need to be invoked.
+   * @return true if the call is supposed to be invoked on all locations.
+   * @throws IOException
+   */
+  boolean isInvokeConcurrent(final String path) throws IOException {
+    if (subclusterResolver instanceof MountTableResolver) {
+      MountTableResolver mountTableResolver =
+          (MountTableResolver) subclusterResolver;
+      List<String> mountPoints = mountTableResolver.getMountPoints(path);
+      // If this is a mount point, we need to invoke everywhere.
+      if (mountPoints != null) {
+        return true;
+      }
+      return isPathAll(path);
+    }
+    return false;
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStoragePolicy.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStoragePolicy.java
index 8a55b9a..a4538b0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStoragePolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStoragePolicy.java
@@ -50,7 +50,11 @@ public class RouterStoragePolicy {
         new Class<?>[] {String.class, String.class},
         new RemoteParam(),
         policyName);
-    rpcClient.invokeSequential(locations, method, null, null);
+    if (rpcServer.isInvokeConcurrent(src)) {
+      rpcClient.invokeConcurrent(locations, method);
+    } else {
+      rpcClient.invokeSequential(locations, method);
+    }
   }
 
   public BlockStoragePolicy[] getStoragePolicies() throws IOException {
@@ -67,7 +71,11 @@ public class RouterStoragePolicy {
     RemoteMethod method = new RemoteMethod("unsetStoragePolicy",
         new Class<?>[] {String.class},
         new RemoteParam());
-    rpcClient.invokeSequential(locations, method);
+    if (rpcServer.isInvokeConcurrent(src)) {
+      rpcClient.invokeConcurrent(locations, method);
+    } else {
+      rpcClient.invokeSequential(locations, method);
+    }
   }
 
   public BlockStoragePolicy getStoragePolicy(String path)
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCMultipleDestinationMountTableResolver.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCMultipleDestinationMountTableResolver.java
new file mode 100644
index 0000000..8c15151
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRPCMultipleDestinationMountTableResolver.java
@@ -0,0 +1,394 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster.RouterContext;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.StateStoreDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.resolver.MountTableManager;
+import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.MultipleDestinationMountTableResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.order.DestinationOrder;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.AddMountTableEntryResponse;
+import org.apache.hadoop.hdfs.server.federation.store.protocol.RemoveMountTableEntryRequest;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Tests router rpc with multiple destination mount table resolver.
+ */
+public class TestRouterRPCMultipleDestinationMountTableResolver {
+  private static StateStoreDFSCluster cluster;
+  private static RouterContext routerContext;
+  private static MountTableResolver resolver;
+  private static DistributedFileSystem nnFs0;
+  private static DistributedFileSystem nnFs1;
+  private static DistributedFileSystem routerFs;
+  private static RouterRpcServer rpcServer;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+
+    // Build and start a federated cluster
+    cluster = new StateStoreDFSCluster(false, 2,
+        MultipleDestinationMountTableResolver.class);
+    Configuration routerConf =
+        new RouterConfigBuilder().stateStore().admin().quota().rpc().build();
+
+    Configuration hdfsConf = new Configuration(false);
+
+    cluster.addRouterOverrides(routerConf);
+    cluster.addNamenodeOverrides(hdfsConf);
+    cluster.startCluster();
+    cluster.startRouters();
+    cluster.waitClusterUp();
+
+    routerContext = cluster.getRandomRouter();
+    resolver =
+        (MountTableResolver) routerContext.getRouter().getSubclusterResolver();
+    nnFs0 = (DistributedFileSystem) cluster
+        .getNamenode(cluster.getNameservices().get(0), null).getFileSystem();
+    nnFs1 = (DistributedFileSystem) cluster
+        .getNamenode(cluster.getNameservices().get(1), null).getFileSystem();
+    routerFs = (DistributedFileSystem) routerContext.getFileSystem();
+    rpcServer =routerContext.getRouter().getRpcServer();
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    if (cluster != null) {
+      cluster.stopRouter(routerContext);
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  /**
+   * Set up the mount entry, directories and file to verify invocation.
+   * @param order The order that the mount entry needs to follow.
+   * @throws Exception On account of any exception encountered during setting up
+   *           the environment.
+   */
+  public void setupOrderMountPath(DestinationOrder order) throws Exception {
+    Map<String, String> destMap = new HashMap<>();
+    destMap.put("ns0", "/tmp");
+    destMap.put("ns1", "/tmp");
+    nnFs0.mkdirs(new Path("/tmp"));
+    nnFs1.mkdirs(new Path("/tmp"));
+    MountTable addEntry = MountTable.newInstance("/mount", destMap);
+    addEntry.setDestOrder(order);
+    assertTrue(addMountTable(addEntry));
+    routerFs.mkdirs(new Path("/mount/dir/dir"));
+    DFSTestUtil.createFile(routerFs, new Path("/mount/dir/file"), 100L, (short) 1,
+        1024L);
+    DFSTestUtil.createFile(routerFs, new Path("/mount/file"), 100L, (short) 1,
+        1024L);
+  }
+
+  @After
+  public void resetTestEnvironment() throws IOException {
+    RouterClient client = routerContext.getAdminClient();
+    MountTableManager mountTableManager = client.getMountTableManager();
+    RemoveMountTableEntryRequest req2 =
+        RemoveMountTableEntryRequest.newInstance("/mount");
+    mountTableManager.removeMountTableEntry(req2);
+    nnFs0.delete(new Path("/tmp"), true);
+    nnFs1.delete(new Path("/tmp"), true);
+
+  }
+
+  @Test
+  public void testInvocationSpaceOrder() throws Exception {
+    setupOrderMountPath(DestinationOrder.SPACE);
+    boolean isDirAll = rpcServer.isPathAll("/mount/dir");
+    assertTrue(isDirAll);
+    testInvocation(isDirAll);
+  }
+
+  @Test
+  public void testInvocationHashAllOrder() throws Exception {
+    setupOrderMountPath(DestinationOrder.HASH_ALL);
+    boolean isDirAll = rpcServer.isPathAll("/mount/dir");
+    assertTrue(isDirAll);
+    testInvocation(isDirAll);
+  }
+
+  @Test
+  public void testInvocationRandomOrder() throws Exception {
+    setupOrderMountPath(DestinationOrder.RANDOM);
+    boolean isDirAll = rpcServer.isPathAll("/mount/dir");
+    assertTrue(isDirAll);
+    testInvocation(isDirAll);
+  }
+
+  @Test
+  public void testInvocationHashOrder() throws Exception {
+    setupOrderMountPath(DestinationOrder.HASH);
+    boolean isDirAll = rpcServer.isPathAll("/mount/dir");
+    assertFalse(isDirAll);
+    testInvocation(isDirAll);
+  }
+
+  @Test
+  public void testInvocationLocalOrder() throws Exception {
+    setupOrderMountPath(DestinationOrder.LOCAL);
+    boolean isDirAll = rpcServer.isPathAll("/mount/dir");
+    assertFalse(isDirAll);
+    testInvocation(isDirAll);
+  }
+
+  /**
+   * Verifies the invocation of APIs at the directory level, file level and at
+   * mount level.
+   * @param dirAll if true assumes that the mount entry creates directory on all
+   *          locations.
+   * @throws IOException
+   */
+  private void testInvocation(boolean dirAll) throws IOException {
+    // Verify invocation on nested directory and file.
+    Path mountDir = new Path("/mount/dir/dir");
+    Path nameSpaceFile = new Path("/tmp/dir/file");
+    Path mountFile = new Path("/mount/dir/file");
+    Path mountEntry = new Path("/mount");
+    Path mountDest = new Path("/tmp");
+    Path nameSpaceDir = new Path("/tmp/dir/dir");
+    final String name = "user.a1";
+    final byte[] value = {0x31, 0x32, 0x33};
+    testDirectoryAndFileLevelInvocation(dirAll, mountDir, nameSpaceFile,
+        mountFile, nameSpaceDir, name, value);
+
+    // Verify invocation on non nested directory and file.
+    mountDir = new Path("/mount/dir");
+    nameSpaceFile = new Path("/tmp/file");
+    mountFile = new Path("/mount/file");
+    nameSpaceDir = new Path("/tmp/dir");
+    testDirectoryAndFileLevelInvocation(dirAll, mountDir, nameSpaceFile,
+        mountFile, nameSpaceDir, name, value);
+
+    // Check invocation directly for a mount point.
+    // Verify owner and permissions.
+    routerFs.setOwner(mountEntry, "testuser", "testgroup");
+    routerFs.setPermission(mountEntry,
+        FsPermission.createImmutable((short) 777));
+    assertEquals("testuser", routerFs.getFileStatus(mountEntry).getOwner());
+    assertEquals("testuser", nnFs0.getFileStatus(mountDest).getOwner());
+    assertEquals("testuser", nnFs1.getFileStatus(mountDest).getOwner());
+    assertEquals((short) 777,
+        routerFs.getFileStatus(mountEntry).getPermission().toShort());
+    assertEquals((short) 777,
+        nnFs0.getFileStatus(mountDest).getPermission().toShort());
+    assertEquals((short) 777,
+        nnFs1.getFileStatus(mountDest).getPermission().toShort());
+
+    //Verify storage policy.
+    routerFs.setStoragePolicy(mountEntry, "COLD");
+    assertEquals("COLD", routerFs.getStoragePolicy(mountEntry).getName());
+    assertEquals("COLD", nnFs0.getStoragePolicy(mountDest).getName());
+    assertEquals("COLD", nnFs1.getStoragePolicy(mountDest).getName());
+    routerFs.unsetStoragePolicy(mountEntry);
+    assertEquals("HOT", routerFs.getStoragePolicy(mountDest).getName());
+    assertEquals("HOT", nnFs0.getStoragePolicy(mountDest).getName());
+    assertEquals("HOT", nnFs1.getStoragePolicy(mountDest).getName());
+
+    //Verify erasure coding policy.
+    routerFs.setErasureCodingPolicy(mountEntry, "RS-6-3-1024k");
+    assertEquals("RS-6-3-1024k",
+        routerFs.getErasureCodingPolicy(mountEntry).getName());
+    assertEquals("RS-6-3-1024k",
+        nnFs0.getErasureCodingPolicy(mountDest).getName());
+    assertEquals("RS-6-3-1024k",
+        nnFs1.getErasureCodingPolicy(mountDest).getName());
+    routerFs.unsetErasureCodingPolicy(mountEntry);
+    assertNull(routerFs.getErasureCodingPolicy(mountDest));
+    assertNull(nnFs0.getErasureCodingPolicy(mountDest));
+    assertNull(nnFs1.getErasureCodingPolicy(mountDest));
+
+    //Verify xAttr.
+    routerFs.setXAttr(mountEntry, name, value);
+    assertArrayEquals(value, routerFs.getXAttr(mountEntry, name));
+    assertArrayEquals(value, nnFs0.getXAttr(mountDest, name));
+    assertArrayEquals(value, nnFs1.getXAttr(mountDest, name));
+    routerFs.removeXAttr(mountEntry, name);
+    assertEquals(0, routerFs.getXAttrs(mountEntry).size());
+    assertEquals(0, nnFs0.getXAttrs(mountDest).size());
+    assertEquals(0, nnFs1.getXAttrs(mountDest).size());
+  }
+
+  /**
+   * Set up to verify invocations on directories and files.
+   */
+  private void testDirectoryAndFileLevelInvocation(boolean dirAll,
+      Path mountDir, Path nameSpaceFile, Path mountFile, Path nameSpaceDir,
+      final String name, final byte[] value) throws IOException {
+    // Check invocation for a directory.
+    routerFs.setOwner(mountDir, "testuser", "testgroup");
+    routerFs.setPermission(mountDir, FsPermission.createImmutable((short) 777));
+    routerFs.setStoragePolicy(mountDir, "COLD");
+    routerFs.setErasureCodingPolicy(mountDir, "RS-6-3-1024k");
+    routerFs.setXAttr(mountDir, name, value);
+
+    // Verify the directory level invocations were checked in case of mounts not
+    // creating directories in all subclusters.
+    boolean checkedDir1 = verifyDirectoryLevelInvocations(dirAll, nameSpaceDir,
+        nnFs0, name, value);
+    boolean checkedDir2 = verifyDirectoryLevelInvocations(dirAll, nameSpaceDir,
+        nnFs1, name, value);
+    assertTrue("The file didn't existed in either of the subclusters.",
+        checkedDir1 || checkedDir2);
+    routerFs.unsetStoragePolicy(mountDir);
+    routerFs.removeXAttr(mountDir, name);
+    routerFs.unsetErasureCodingPolicy(mountDir);
+
+    checkedDir1 =
+        verifyDirectoryLevelUnsetInvocations(dirAll, nnFs0, nameSpaceDir);
+    checkedDir2 =
+        verifyDirectoryLevelUnsetInvocations(dirAll, nnFs1, nameSpaceDir);
+    assertTrue("The file didn't existed in either of the subclusters.",
+        checkedDir1 || checkedDir2);
+
+    // Check invocation for a file.
+    routerFs.setOwner(mountFile, "testuser", "testgroup");
+    routerFs.setPermission(mountFile,
+        FsPermission.createImmutable((short) 777));
+    routerFs.setStoragePolicy(mountFile, "COLD");
+    routerFs.setReplication(mountFile, (short) 2);
+    routerFs.setXAttr(mountFile, name, value);
+    verifyFileLevelInvocations(nameSpaceFile, nnFs0, mountFile, name, value);
+    verifyFileLevelInvocations(nameSpaceFile, nnFs1, mountFile, name, value);
+  }
+
+  /**
+   * Verify invocations of APIs unsetting values at the directory level.
+   * @param dirAll true if the mount entry order creates directory in all
+   *          locations.
+   * @param nameSpaceDir path of the directory in the namespace.
+   * @param nnFs file system where the directory level invocation needs to be
+   *          tested.
+   * @throws IOException
+   */
+  private boolean verifyDirectoryLevelUnsetInvocations(boolean dirAll,
+      DistributedFileSystem nnFs, Path nameSpaceDir) throws IOException {
+    boolean checked = false;
+    if (dirAll || nnFs.exists(nameSpaceDir)) {
+      checked = true;
+      assertEquals("HOT", nnFs.getStoragePolicy(nameSpaceDir).getName());
+      assertNull(nnFs.getErasureCodingPolicy(nameSpaceDir));
+      assertEquals(0, nnFs.getXAttrs(nameSpaceDir).size());
+    }
+    return checked;
+  }
+
+  /**
+   * Verify file level invocations.
+   * @param nameSpaceFile path of the file in the namespace.
+   * @param nnFs the file system where the file invocation needs to checked.
+   * @param mountFile path of the file w.r.t. mount table.
+   * @param name name of Xattr.
+   * @param value value of Xattr.
+   * @throws IOException
+   */
+  private void verifyFileLevelInvocations(Path nameSpaceFile,
+      DistributedFileSystem nnFs, Path mountFile, final String name,
+      final byte[] value) throws IOException {
+    if (nnFs.exists(nameSpaceFile)) {
+      assertEquals("testuser", nnFs.getFileStatus(nameSpaceFile).getOwner());
+      assertEquals((short) 777,
+          nnFs.getFileStatus(nameSpaceFile).getPermission().toShort());
+      assertEquals("COLD", nnFs.getStoragePolicy(nameSpaceFile).getName());
+      assertEquals((short) 2,
+          nnFs.getFileStatus(nameSpaceFile).getReplication());
+      assertArrayEquals(value, nnFs.getXAttr(nameSpaceFile, name));
+
+      routerFs.unsetStoragePolicy(mountFile);
+      routerFs.removeXAttr(mountFile, name);
+      assertEquals(0, nnFs.getXAttrs(nameSpaceFile).size());
+
+      assertEquals("HOT", nnFs.getStoragePolicy(nameSpaceFile).getName());
+
+    }
+  }
+
+  /**
+   * Verify invocations at the directory level.
+   * @param dirAll true if the mount entry order creates directory in all
+   *          locations.
+   * @param nameSpaceDir path of the directory in the namespace.
+   * @param nnFs file system where the directory level invocation needs to be
+   *          tested.
+   * @param name name for the Xattr.
+   * @param value value for the Xattr.
+   * @return true, if directory existed and successful verification of
+   *         invocations.
+   * @throws IOException
+   */
+  private boolean verifyDirectoryLevelInvocations(boolean dirAll,
+      Path nameSpaceDir, DistributedFileSystem nnFs, final String name,
+      final byte[] value) throws IOException {
+    boolean checked = false;
+    if (dirAll || nnFs.exists(nameSpaceDir)) {
+      checked = true;
+      assertEquals("testuser", nnFs.getFileStatus(nameSpaceDir).getOwner());
+      assertEquals("COLD", nnFs.getStoragePolicy(nameSpaceDir).getName());
+      assertEquals("RS-6-3-1024k",
+          nnFs.getErasureCodingPolicy(nameSpaceDir).getName());
+      assertArrayEquals(value, nnFs.getXAttr(nameSpaceDir, name));
+      assertEquals((short) 777,
+          nnFs.getFileStatus(nameSpaceDir).getPermission().toShort());
+    }
+    return checked;
+  }
+
+  /**
+   * Add a mount table entry to the mount table through the admin API.
+   * @param entry Mount table entry to add.
+   * @return If it was successfully added.
+   * @throws IOException If there are problems adding the entry.
+   */
+  private boolean addMountTable(final MountTable entry) throws IOException {
+    RouterClient client = routerContext.getAdminClient();
+    MountTableManager mountTableManager = client.getMountTableManager();
+    AddMountTableEntryRequest addRequest =
+        AddMountTableEntryRequest.newInstance(entry);
+    AddMountTableEntryResponse addResponse =
+        mountTableManager.addMountTableEntry(addRequest);
+
+    // Reload the Router cache
+    resolver.loadCache(true);
+
+    return addResponse.getStatus();
+  }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org