Posted to common-commits@hadoop.apache.org by in...@apache.org on 2019/02/20 22:21:36 UTC

[hadoop] 10/41: HDFS-13776. RBF: Add Storage policies related ClientProtocol APIs. Contributed by Dibyendu Karmakar.

This is an automated email from the ASF dual-hosted git repository.

inigoiri pushed a commit to branch HDFS-13891
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit 53b69da61041afb6807f1a79b2f9fa4bd6901c38
Author: Brahma Reddy Battula <br...@apache.org>
AuthorDate: Thu Nov 22 00:34:08 2018 +0530

    HDFS-13776. RBF: Add Storage policies related ClientProtocol APIs. Contributed by Dibyendu Karmakar.
---
 .../federation/router/RouterClientProtocol.java    |  24 ++--
 .../federation/router/RouterStoragePolicy.java     |  98 ++++++++++++++
 .../server/federation/MiniRouterDFSCluster.java    |  13 ++
 .../server/federation/router/TestRouterRpc.java    |  57 ++++++++
 .../TestRouterRpcStoragePolicySatisfier.java       | 149 +++++++++++++++++++++
 5 files changed, 325 insertions(+), 16 deletions(-)
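
For context, here is a rough sketch of how a client could exercise these APIs once the Router proxies them. This is not part of the commit; the class name, the path, and the assumption that fs.defaultFS points at a Router RPC endpoint are all illustrative.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DistributedFileSystem;
    import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
    import org.apache.hadoop.hdfs.protocol.HdfsConstants;

    public class RouterStoragePolicyExample {
      public static void main(String[] args) throws Exception {
        // Assumes fs.defaultFS points at a Router RPC endpoint.
        Configuration conf = new Configuration();
        DistributedFileSystem dfs =
            (DistributedFileSystem) FileSystem.get(conf);
        Path file = new Path("/data/file1"); // illustrative path

        // Set a policy and read it back through the Router.
        dfs.setStoragePolicy(file, HdfsConstants.COLD_STORAGE_POLICY_NAME);
        BlockStoragePolicy policy = dfs.getStoragePolicy(file);
        System.out.println("Policy via router: " + policy.getName());

        // Ask the (external) satisfier to move existing blocks, then
        // revert the file to the inherited policy.
        dfs.satisfyStoragePolicy(file);
        dfs.unsetStoragePolicy(file);
      }
    }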

diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
index 6c44362..81717ca 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
@@ -121,6 +121,8 @@ public class RouterClientProtocol implements ClientProtocol {
   private final String superGroup;
   /** Erasure coding calls. */
   private final ErasureCoding erasureCoding;
+  /** StoragePolicy calls. */
+  private final RouterStoragePolicy storagePolicy;
 
   RouterClientProtocol(Configuration conf, RouterRpcServer rpcServer) {
     this.rpcServer = rpcServer;
@@ -138,6 +140,7 @@ public class RouterClientProtocol implements ClientProtocol {
         DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY,
         DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT);
     this.erasureCoding = new ErasureCoding(rpcServer);
+    this.storagePolicy = new RouterStoragePolicy(rpcServer);
   }
 
   @Override
@@ -272,22 +275,12 @@ public class RouterClientProtocol implements ClientProtocol {
   @Override
   public void setStoragePolicy(String src, String policyName)
       throws IOException {
-    rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
-
-    List<RemoteLocation> locations = rpcServer.getLocationsForPath(src, true);
-    RemoteMethod method = new RemoteMethod("setStoragePolicy",
-        new Class<?>[] {String.class, String.class},
-        new RemoteParam(), policyName);
-    rpcClient.invokeSequential(locations, method, null, null);
+    storagePolicy.setStoragePolicy(src, policyName);
   }
 
   @Override
   public BlockStoragePolicy[] getStoragePolicies() throws IOException {
-    rpcServer.checkOperation(NameNode.OperationCategory.READ);
-
-    RemoteMethod method = new RemoteMethod("getStoragePolicies");
-    String ns = subclusterResolver.getDefaultNamespace();
-    return (BlockStoragePolicy[]) rpcClient.invokeSingle(ns, method);
+    return storagePolicy.getStoragePolicies();
   }
 
   @Override
@@ -1457,13 +1450,12 @@ public class RouterClientProtocol implements ClientProtocol {
 
   @Override
   public void unsetStoragePolicy(String src) throws IOException {
-    rpcServer.checkOperation(NameNode.OperationCategory.WRITE, false);
+    storagePolicy.unsetStoragePolicy(src);
   }
 
   @Override
   public BlockStoragePolicy getStoragePolicy(String path) throws IOException {
-    rpcServer.checkOperation(NameNode.OperationCategory.READ, false);
-    return null;
+    return storagePolicy.getStoragePolicy(path);
   }
 
   @Override
@@ -1551,7 +1543,7 @@ public class RouterClientProtocol implements ClientProtocol {
 
   @Override
   public void satisfyStoragePolicy(String path) throws IOException {
-    rpcServer.checkOperation(NameNode.OperationCategory.WRITE, false);
+    storagePolicy.satisfyStoragePolicy(path);
   }
 
   @Override
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStoragePolicy.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStoragePolicy.java
new file mode 100644
index 0000000..7145940
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStoragePolicy.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Module that implements all the RPC calls in
+ * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol} related to
+ * Storage Policy in the {@link RouterRpcServer}.
+ */
+public class RouterStoragePolicy {
+
+  /** RPC server to receive client calls. */
+  private final RouterRpcServer rpcServer;
+  /** RPC clients to connect to the Namenodes. */
+  private final RouterRpcClient rpcClient;
+  /** Interface to map global name space to HDFS subcluster name spaces. */
+  private final FileSubclusterResolver subclusterResolver;
+
+  public RouterStoragePolicy(RouterRpcServer server) {
+    this.rpcServer = server;
+    this.rpcClient = this.rpcServer.getRPCClient();
+    this.subclusterResolver = this.rpcServer.getSubclusterResolver();
+  }
+
+  public void setStoragePolicy(String src, String policyName)
+      throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
+
+    List<RemoteLocation> locations = rpcServer.getLocationsForPath(src, true);
+    RemoteMethod method = new RemoteMethod("setStoragePolicy",
+        new Class<?>[] {String.class, String.class},
+        new RemoteParam(),
+        policyName);
+    rpcClient.invokeSequential(locations, method, null, null);
+  }
+
+  public BlockStoragePolicy[] getStoragePolicies() throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.READ);
+
+    RemoteMethod method = new RemoteMethod("getStoragePolicies");
+    String ns = subclusterResolver.getDefaultNamespace();
+    return (BlockStoragePolicy[]) rpcClient.invokeSingle(ns, method);
+  }
+
+  public void unsetStoragePolicy(String src) throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.WRITE, true);
+
+    List<RemoteLocation> locations = rpcServer.getLocationsForPath(src, true);
+    RemoteMethod method = new RemoteMethod("unsetStoragePolicy",
+        new Class<?>[] {String.class},
+        new RemoteParam());
+    rpcClient.invokeSequential(locations, method);
+  }
+
+  public BlockStoragePolicy getStoragePolicy(String path)
+      throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.READ, true);
+
+    List<RemoteLocation> locations = rpcServer.getLocationsForPath(path, false);
+    RemoteMethod method = new RemoteMethod("getStoragePolicy",
+        new Class<?>[] {String.class},
+        new RemoteParam());
+    return (BlockStoragePolicy) rpcClient.invokeSequential(locations, method);
+  }
+
+  public void satisfyStoragePolicy(String path) throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.READ, true);
+
+    List<RemoteLocation> locations = rpcServer.getLocationsForPath(path, true);
+    RemoteMethod method = new RemoteMethod("satisfyStoragePolicy",
+        new Class<?>[] {String.class},
+        new RemoteParam());
+    rpcClient.invokeSequential(locations, method);
+  }
+}
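
A note on the pattern above, since every method in this module follows it: RemoteParam() marks the argument that the RPC client rewrites to each subcluster's local path before proxying, and invokeSequential() tries the resolved locations in order. Sketched with a hypothetical method name (not part of the commit):

    public void someStoragePolicyCall(String src) throws IOException {
      rpcServer.checkOperation(NameNode.OperationCategory.WRITE);

      // Resolve the global path into per-subcluster locations; the
      // RemoteParam placeholder becomes each location's local path.
      List<RemoteLocation> locations =
          rpcServer.getLocationsForPath(src, true);
      RemoteMethod method = new RemoteMethod("someStoragePolicyCall",
          new Class<?>[] {String.class},
          new RemoteParam());
      rpcClient.invokeSequential(locations, method);
    }
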
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java
index a5693a6..2df883c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java
@@ -67,6 +67,7 @@ import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StorageType;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
 import org.apache.hadoop.hdfs.DFSClient;
@@ -118,6 +119,8 @@ public class MiniRouterDFSCluster {
   private boolean highAvailability;
   /** Number of datanodes per nameservice. */
   private int numDatanodesPerNameservice = 2;
+  /** Custom storage type for each datanode. */
+  private StorageType[][] storageTypes = null;
 
   /** Mini cluster. */
   private MiniDFSCluster cluster;
@@ -615,6 +618,15 @@ public class MiniRouterDFSCluster {
   }
 
   /**
+   * Set custom storage type configuration for each datanode.
+   * If storageTypes is left unset or passed null, then
+   * StorageType.DEFAULT is used.
+   */
+  public void setStorageTypes(StorageType[][] storageTypes) {
+    this.storageTypes = storageTypes;
+  }
+
+  /**
    * Set the DNs to belong to only one subcluster.
    */
   public void setIndependentDNs() {
@@ -767,6 +779,7 @@ public class MiniRouterDFSCluster {
           .numDataNodes(numDNs)
           .nnTopology(topology)
           .dataNodeConfOverlays(dnConfs)
+          .storageTypes(storageTypes)
           .build();
       cluster.waitActive();
 
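A quick sketch of how the new setter might be used from a test; the layout and types here are illustrative (one inner array per datanode, matching what MiniDFSCluster.Builder#storageTypes expects):

    // Two datanodes in one nameservice, each with two storage directories.
    MiniRouterDFSCluster cluster = new MiniRouterDFSCluster(false, 1);
    cluster.setNumDatanodesPerNameservice(2);
    cluster.setStorageTypes(new StorageType[][] {
        {StorageType.DISK, StorageType.ARCHIVE},    // DN 0
        {StorageType.ARCHIVE, StorageType.ARCHIVE}  // DN 1
    });
    cluster.startCluster(); // null storageTypes falls back to DEFAULT
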
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
index 204366e..8632203 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
@@ -72,6 +72,7 @@ import org.apache.hadoop.hdfs.protocol.ECBlockGroupStats;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyState;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@@ -770,6 +771,62 @@ public class TestRouterRpc {
   }
 
   @Test
+  public void testProxyGetAndUnsetStoragePolicy() throws Exception {
+    String file = "/testGetStoragePolicy";
+    String nnFilePath = cluster.getNamenodeTestDirectoryForNS(ns) + file;
+    String routerFilePath = cluster.getFederatedTestDirectoryForNS(ns) + file;
+
+    createFile(routerFS, routerFilePath, 32);
+
+    // Get storage policy via router
+    BlockStoragePolicy policy = routerProtocol.getStoragePolicy(routerFilePath);
+    // Verify default policy is HOT
+    assertEquals(HdfsConstants.HOT_STORAGE_POLICY_NAME, policy.getName());
+    assertEquals(HdfsConstants.HOT_STORAGE_POLICY_ID, policy.getId());
+
+    // Get storage policies via router
+    BlockStoragePolicy[] policies = routerProtocol.getStoragePolicies();
+    BlockStoragePolicy[] nnPolicies = namenode.getClient().getStoragePolicies();
+    // Verify the policies returned by the router match those from the NN
+    assertArrayEquals(nnPolicies, policies);
+
+    BlockStoragePolicy newPolicy = policies[0];
+    while (newPolicy.isCopyOnCreateFile()) {
+      // Pick a policy without copyOnCreateFile, because if it is set
+      // the policy cannot be changed after file creation.
+      Random rand = new Random();
+      int randIndex = rand.nextInt(policies.length);
+      newPolicy = policies[randIndex];
+    }
+    routerProtocol.setStoragePolicy(routerFilePath, newPolicy.getName());
+
+    // Get storage policy via router
+    policy = routerProtocol.getStoragePolicy(routerFilePath);
+    // Verify the new policy took effect
+    assertEquals(newPolicy.getName(), policy.getName());
+    assertEquals(newPolicy.getId(), policy.getId());
+
+    // Verify policy via NN
+    BlockStoragePolicy nnPolicy =
+        namenode.getClient().getStoragePolicy(nnFilePath);
+    assertEquals(nnPolicy.getName(), policy.getName());
+    assertEquals(nnPolicy.getId(), policy.getId());
+
+    // Unset storage policy via router
+    routerProtocol.unsetStoragePolicy(routerFilePath);
+
+    // Get storage policy
+    policy = routerProtocol.getStoragePolicy(routerFilePath);
+    assertEquals(HdfsConstants.HOT_STORAGE_POLICY_NAME, policy.getName());
+    assertEquals(HdfsConstants.HOT_STORAGE_POLICY_ID, policy.getId());
+
+    // Verify policy via NN
+    nnPolicy = namenode.getClient().getStoragePolicy(nnFilePath);
+    assertEquals(nnPolicy.getName(), policy.getName());
+    assertEquals(nnPolicy.getId(), policy.getId());
+  }
+
+  @Test
   public void testProxyGetPreferedBlockSize() throws Exception {
 
     // Query via NN and Router and verify
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpcStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpcStoragePolicySatisfier.java
new file mode 100644
index 0000000..fa1079a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpcStoragePolicySatisfier.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.metrics.NamenodeBeanMetrics;
+import org.apache.hadoop.hdfs.server.namenode.sps.Context;
+import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
+import org.apache.hadoop.hdfs.server.sps.ExternalSPSContext;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test StoragePolicySatisfy through router rpc calls.
+ */
+public class TestRouterRpcStoragePolicySatisfier {
+
+  /** Federated HDFS cluster. */
+  private static MiniRouterDFSCluster cluster;
+
+  /** Client interface to the Router. */
+  private static ClientProtocol routerProtocol;
+
+  /** Filesystem interface to the Router. */
+  private static FileSystem routerFS;
+  /** Filesystem interface to the Namenode. */
+  private static FileSystem nnFS;
+
+  @BeforeClass
+  public static void globalSetUp() throws Exception {
+    cluster = new MiniRouterDFSCluster(false, 1);
+    // Set storage types for the cluster
+    StorageType[][] newtypes = new StorageType[][] {
+        {StorageType.ARCHIVE, StorageType.DISK}};
+    cluster.setStorageTypes(newtypes);
+
+    Configuration conf = cluster.getNamenodes().get(0).getConf();
+    conf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
+        HdfsConstants.StoragePolicySatisfierMode.EXTERNAL.toString());
+    // Reduce the refresh cycle to pick up the latest datanodes.
+    conf.setLong(DFSConfigKeys.DFS_SPS_DATANODE_CACHE_REFRESH_INTERVAL_MS,
+        1000);
+    cluster.addNamenodeOverrides(conf);
+
+    cluster.setNumDatanodesPerNameservice(1);
+
+    // Start NNs and DNs and wait until ready
+    cluster.startCluster();
+
+    // Start routers with only an RPC service
+    Configuration routerConf = new RouterConfigBuilder()
+        .metrics()
+        .rpc()
+        .build();
+    // We decrease the DN cache times to make the test faster
+    routerConf.setTimeDuration(
+        RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);
+    cluster.addRouterOverrides(routerConf);
+    cluster.startRouters();
+
+    // Register and verify all NNs with all routers
+    cluster.registerNamenodes();
+    cluster.waitNamenodeRegistration();
+
+    // Create mock locations
+    cluster.installMockLocations();
+
+    // Random router for this test
+    MiniRouterDFSCluster.RouterContext rndRouter = cluster.getRandomRouter();
+
+    routerProtocol = rndRouter.getClient().getNamenode();
+    routerFS = rndRouter.getFileSystem();
+    nnFS = cluster.getNamenodes().get(0).getFileSystem();
+
+    NameNodeConnector nnc = DFSTestUtil.getNameNodeConnector(conf,
+        HdfsServerConstants.MOVER_ID_PATH, 1, false);
+
+    StoragePolicySatisfier externalSps = new StoragePolicySatisfier(conf);
+    Context externalCtxt = new ExternalSPSContext(externalSps, nnc);
+
+    externalSps.init(externalCtxt);
+    externalSps.start(HdfsConstants.StoragePolicySatisfierMode.EXTERNAL);
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    cluster.shutdown();
+  }
+
+  @Test
+  public void testStoragePolicySatisfier() throws Exception {
+    final String file = "/testStoragePolicySatisfierCommand";
+    short repl = 1;
+    int size = 32;
+    DFSTestUtil.createFile(routerFS, new Path(file), size, repl, 0);
+    // Verify storage type is DISK
+    DFSTestUtil.waitExpectedStorageType(file, StorageType.DISK, 1, 20000,
+        (DistributedFileSystem) routerFS);
+    // Set storage policy as COLD
+    routerProtocol
+        .setStoragePolicy(file, HdfsConstants.COLD_STORAGE_POLICY_NAME);
+    // Verify storage policy is set properly
+    BlockStoragePolicy storagePolicy = routerProtocol.getStoragePolicy(file);
+    assertEquals(HdfsConstants.COLD_STORAGE_POLICY_NAME,
+        storagePolicy.getName());
+    // Invoke satisfy storage policy
+    routerProtocol.satisfyStoragePolicy(file);
+    // Verify storage type is ARCHIVE
+    DFSTestUtil.waitExpectedStorageType(file, StorageType.ARCHIVE, 1, 20000,
+        (DistributedFileSystem) routerFS);
+
+    // Verify storage type via NN
+    DFSTestUtil.waitExpectedStorageType(file, StorageType.ARCHIVE, 1, 20000,
+        (DistributedFileSystem) nnFS);
+  }
+}
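
For completeness, a rough client-side way to check where replicas actually landed after a satisfy call, without the test utilities; the path is the illustrative one from the sketch near the top of this mail, and BlockLocation#getStorageTypes reports the per-replica storage types:

    // Inspect replica storage types through the router; after the SPS has
    // run, the types for a COLD file should all be ARCHIVE.
    BlockLocation[] blocks = dfs.getFileBlockLocations(
        new Path("/data/file1"), 0, Long.MAX_VALUE);
    for (BlockLocation block : blocks) {
      for (StorageType type : block.getStorageTypes()) {
        System.out.println(block + " -> " + type);
      }
    }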

