You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2019/02/20 22:21:36 UTC
[hadoop] 10/41: HDFS-13776. RBF: Add Storage policies related
ClientProtocol APIs. Contributed by Dibyendu Karmakar.
This is an automated email from the ASF dual-hosted git repository.
inigoiri pushed a commit to branch HDFS-13891
in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit 53b69da61041afb6807f1a79b2f9fa4bd6901c38
Author: Brahma Reddy Battula <br...@apache.org>
AuthorDate: Thu Nov 22 00:34:08 2018 +0530
HDFS-13776. RBF: Add Storage policies related ClientProtocol APIs. Contributed by Dibyendu Karmakar.
---
.../federation/router/RouterClientProtocol.java | 24 ++--
.../federation/router/RouterStoragePolicy.java | 98 ++++++++++++++
.../server/federation/MiniRouterDFSCluster.java | 13 ++
.../server/federation/router/TestRouterRpc.java | 57 ++++++++
.../TestRouterRpcStoragePolicySatisfier.java | 149 +++++++++++++++++++++
5 files changed, 325 insertions(+), 16 deletions(-)
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
index 6c44362..81717ca 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
@@ -121,6 +121,8 @@ public class RouterClientProtocol implements ClientProtocol {
private final String superGroup;
/** Erasure coding calls. */
private final ErasureCoding erasureCoding;
+ /** StoragePolicy calls. **/
+ private final RouterStoragePolicy storagePolicy;
RouterClientProtocol(Configuration conf, RouterRpcServer rpcServer) {
this.rpcServer = rpcServer;
@@ -138,6 +140,7 @@ public class RouterClientProtocol implements ClientProtocol {
DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY,
DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT);
this.erasureCoding = new ErasureCoding(rpcServer);
+ this.storagePolicy = new RouterStoragePolicy(rpcServer);
}
@Override
@@ -272,22 +275,12 @@ public class RouterClientProtocol implements ClientProtocol {
@Override
public void setStoragePolicy(String src, String policyName)
throws IOException {
- rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
-
- List<RemoteLocation> locations = rpcServer.getLocationsForPath(src, true);
- RemoteMethod method = new RemoteMethod("setStoragePolicy",
- new Class<?>[] {String.class, String.class},
- new RemoteParam(), policyName);
- rpcClient.invokeSequential(locations, method, null, null);
+ storagePolicy.setStoragePolicy(src, policyName);
}
@Override
public BlockStoragePolicy[] getStoragePolicies() throws IOException {
- rpcServer.checkOperation(NameNode.OperationCategory.READ);
-
- RemoteMethod method = new RemoteMethod("getStoragePolicies");
- String ns = subclusterResolver.getDefaultNamespace();
- return (BlockStoragePolicy[]) rpcClient.invokeSingle(ns, method);
+ return storagePolicy.getStoragePolicies();
}
@Override
@@ -1457,13 +1450,12 @@ public class RouterClientProtocol implements ClientProtocol {
@Override
public void unsetStoragePolicy(String src) throws IOException {
- rpcServer.checkOperation(NameNode.OperationCategory.WRITE, false);
+ storagePolicy.unsetStoragePolicy(src);
}
@Override
public BlockStoragePolicy getStoragePolicy(String path) throws IOException {
- rpcServer.checkOperation(NameNode.OperationCategory.READ, false);
- return null;
+ return storagePolicy.getStoragePolicy(path);
}
@Override
@@ -1551,7 +1543,7 @@ public class RouterClientProtocol implements ClientProtocol {
@Override
public void satisfyStoragePolicy(String path) throws IOException {
- rpcServer.checkOperation(NameNode.OperationCategory.WRITE, false);
+ storagePolicy.satisfyStoragePolicy(path);
}
@Override
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStoragePolicy.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStoragePolicy.java
new file mode 100644
index 0000000..7145940
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterStoragePolicy.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * Module that implements all the RPC calls in
+ * {@link org.apache.hadoop.hdfs.protocol.ClientProtocol} related to
+ * Storage Policy in the {@link RouterRpcServer}.
+ * Every call first reports its operation category to the
+ * {@link RouterRpcServer} and is then forwarded to the Namenodes through the
+ * {@link RouterRpcClient}.
+ */
+public class RouterStoragePolicy {
+
+  /** RPC server to receive client calls. */
+  private final RouterRpcServer rpcServer;
+  /** RPC clients to connect to the Namenodes. */
+  private final RouterRpcClient rpcClient;
+  /** Interface to map global name space to HDFS subcluster name spaces. */
+  private final FileSubclusterResolver subclusterResolver;
+
+  /**
+   * Create the storage policy module on top of a Router RPC server.
+   *
+   * @param server Router RPC server; its RPC client and subcluster resolver
+   *               are reused by this module.
+   */
+  public RouterStoragePolicy(RouterRpcServer server) {
+    this.rpcServer = server;
+    this.rpcClient = this.rpcServer.getRPCClient();
+    this.subclusterResolver = this.rpcServer.getSubclusterResolver();
+  }
+
+  /**
+   * Set the storage policy of a path.
+   *
+   * @param src Path as seen by the Router client.
+   * @param policyName Name of the storage policy to apply.
+   * @throws IOException If the operation check, the location lookup or the
+   *                     remote invocation fails.
+   */
+  public void setStoragePolicy(String src, String policyName)
+      throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
+
+    List<RemoteLocation> locations = rpcServer.getLocationsForPath(src, true);
+    // The path argument is passed as a RemoteParam placeholder; presumably the
+    // RPC client substitutes the path of each remote location - TODO confirm.
+    RemoteMethod method = new RemoteMethod("setStoragePolicy",
+        new Class<?>[] {String.class, String.class},
+        new RemoteParam(),
+        policyName);
+    rpcClient.invokeSequential(locations, method, null, null);
+  }
+
+  /**
+   * Get the storage policies known to the default namespace.
+   *
+   * @return Storage policies returned by the default namespace.
+   * @throws IOException If the operation check or the remote invocation
+   *                     fails.
+   */
+  public BlockStoragePolicy[] getStoragePolicies() throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.READ);
+
+    RemoteMethod method = new RemoteMethod("getStoragePolicies");
+    // Only the default namespace is queried; other subclusters are not asked.
+    String ns = subclusterResolver.getDefaultNamespace();
+    return (BlockStoragePolicy[]) rpcClient.invokeSingle(ns, method);
+  }
+
+  /**
+   * Unset the storage policy of a path.
+   *
+   * @param src Path as seen by the Router client.
+   * @throws IOException If the operation check, the location lookup or the
+   *                     remote invocation fails.
+   */
+  public void unsetStoragePolicy(String src) throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.WRITE, true);
+
+    List<RemoteLocation> locations = rpcServer.getLocationsForPath(src, true);
+    RemoteMethod method = new RemoteMethod("unsetStoragePolicy",
+        new Class<?>[] {String.class},
+        new RemoteParam());
+    rpcClient.invokeSequential(locations, method);
+  }
+
+  /**
+   * Get the storage policy of a path.
+   *
+   * @param path Path as seen by the Router client.
+   * @return Storage policy returned by the sequential invocation over the
+   *         locations of the path.
+   * @throws IOException If the operation check, the location lookup or the
+   *                     remote invocation fails.
+   */
+  public BlockStoragePolicy getStoragePolicy(String path)
+      throws IOException {
+    rpcServer.checkOperation(NameNode.OperationCategory.READ, true);
+
+    List<RemoteLocation> locations = rpcServer.getLocationsForPath(path, false);
+    RemoteMethod method = new RemoteMethod("getStoragePolicy",
+        new Class<?>[] {String.class},
+        new RemoteParam());
+    return (BlockStoragePolicy) rpcClient.invokeSequential(locations, method);
+  }
+
+  /**
+   * Ask the Namenode to satisfy the storage policy of a path.
+   *
+   * @param path Path as seen by the Router client.
+   * @throws IOException If the operation check, the location lookup or the
+   *                     remote invocation fails.
+   */
+  public void satisfyStoragePolicy(String path) throws IOException {
+    // This is a mutating call, so it is declared as WRITE like
+    // setStoragePolicy and unsetStoragePolicy above (and like the Router did
+    // for this call before it was moved into this class). Declaring it as
+    // READ would subject it to the checks of a read-only operation.
+    rpcServer.checkOperation(NameNode.OperationCategory.WRITE, true);
+
+    List<RemoteLocation> locations = rpcServer.getLocationsForPath(path, true);
+    RemoteMethod method = new RemoteMethod("satisfyStoragePolicy",
+        new Class<?>[] {String.class},
+        new RemoteParam());
+    rpcClient.invokeSequential(locations, method);
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java
index a5693a6..2df883c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/MiniRouterDFSCluster.java
@@ -67,6 +67,7 @@ import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.UnsupportedFileSystemException;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.hdfs.DFSClient;
@@ -118,6 +119,8 @@ public class MiniRouterDFSCluster {
private boolean highAvailability;
/** Number of datanodes per nameservice. */
private int numDatanodesPerNameservice = 2;
+ /** Custom storage type for each datanode. */
+ private StorageType[][] storageTypes = null;
/** Mini cluster. */
private MiniDFSCluster cluster;
@@ -615,6 +618,15 @@ public class MiniRouterDFSCluster {
}
/**
+ * Set custom storage type configuration for each datanode.
+ * If storageTypes is uninitialized or passed null then
+ * StorageType.DEFAULT is used.
+ *
+ * The value is only stored here; it is handed to the mini cluster builder
+ * through storageTypes(...) when the cluster is built, so it presumably has
+ * to be set before the cluster is started - TODO confirm.
+ *
+ * @param storageTypes Storage types to use; presumably one inner array per
+ *                     datanode listing the storage type of each of its
+ *                     volumes - verify against MiniDFSCluster.
+ */
+ public void setStorageTypes(StorageType[][] storageTypes) {
+ this.storageTypes = storageTypes;
+ }
+
+ /**
* Set the DNs to belong to only one subcluster.
*/
public void setIndependentDNs() {
@@ -767,6 +779,7 @@ public class MiniRouterDFSCluster {
.numDataNodes(numDNs)
.nnTopology(topology)
.dataNodeConfOverlays(dnConfs)
+ .storageTypes(storageTypes)
.build();
cluster.waitActive();
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
index 204366e..8632203 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
@@ -72,6 +72,7 @@ import org.apache.hadoop.hdfs.protocol.ECBlockGroupStats;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyState;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@@ -770,6 +771,62 @@ public class TestRouterRpc {
}
@Test
+  public void testProxyGetAndUnsetStoragePolicy() throws Exception {
+    // Round trip of get/set/unset storage policy through the Router. The
+    // results after set and after unset are cross-checked against the
+    // Namenode.
+    String file = "/testGetStoragePolicy";
+    String nnFilePath = cluster.getNamenodeTestDirectoryForNS(ns) + file;
+    String routerFilePath = cluster.getFederatedTestDirectoryForNS(ns) + file;
+
+    createFile(routerFS, routerFilePath, 32);
+
+    // Get storage policy via router
+    BlockStoragePolicy policy = routerProtocol.getStoragePolicy(routerFilePath);
+    // Verify default policy is HOT
+    assertEquals(HdfsConstants.HOT_STORAGE_POLICY_NAME, policy.getName());
+    assertEquals(HdfsConstants.HOT_STORAGE_POLICY_ID, policy.getId());
+
+    // Get storage policies via router
+    BlockStoragePolicy[] policies = routerProtocol.getStoragePolicies();
+    BlockStoragePolicy[] nnPolicies = namenode.getClient().getStoragePolicies();
+    // Verify policies returned by router are the same as those returned by NN
+    assertArrayEquals(nnPolicies, policies);
+
+    // Deterministically pick the first policy that can be observed by this
+    // test: a copy-on-create policy cannot be changed after file creation,
+    // and the default HOT policy would make the set and unset checks below
+    // indistinguishable from doing nothing.
+    BlockStoragePolicy newPolicy = null;
+    for (BlockStoragePolicy candidate : policies) {
+      if (!candidate.isCopyOnCreateFile()
+          && candidate.getId() != HdfsConstants.HOT_STORAGE_POLICY_ID) {
+        newPolicy = candidate;
+        break;
+      }
+    }
+    if (newPolicy == null) {
+      throw new AssertionError(
+          "No non-default storage policy that can be set after file creation");
+    }
+    routerProtocol.setStoragePolicy(routerFilePath, newPolicy.getName());
+
+    // Get storage policy via router
+    policy = routerProtocol.getStoragePolicy(routerFilePath);
+    // Verify the newly set policy is returned
+    assertEquals(newPolicy.getName(), policy.getName());
+    assertEquals(newPolicy.getId(), policy.getId());
+
+    // Verify policy via NN
+    BlockStoragePolicy nnPolicy =
+        namenode.getClient().getStoragePolicy(nnFilePath);
+    assertEquals(nnPolicy.getName(), policy.getName());
+    assertEquals(nnPolicy.getId(), policy.getId());
+
+    // Unset storage policy via router
+    routerProtocol.unsetStoragePolicy(routerFilePath);
+
+    // Get storage policy and verify it is back to the default HOT policy
+    policy = routerProtocol.getStoragePolicy(routerFilePath);
+    assertEquals(HdfsConstants.HOT_STORAGE_POLICY_NAME, policy.getName());
+    assertEquals(HdfsConstants.HOT_STORAGE_POLICY_ID, policy.getId());
+
+    // Verify policy via NN
+    nnPolicy = namenode.getClient().getStoragePolicy(nnFilePath);
+    assertEquals(nnPolicy.getName(), policy.getName());
+    assertEquals(nnPolicy.getId(), policy.getId());
+  }
+
+ @Test
public void testProxyGetPreferedBlockSize() throws Exception {
// Query via NN and Router and verify
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpcStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpcStoragePolicySatisfier.java
new file mode 100644
index 0000000..fa1079a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpcStoragePolicySatisfier.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.apache.hadoop.hdfs.server.federation.MiniRouterDFSCluster;
+import org.apache.hadoop.hdfs.server.federation.RouterConfigBuilder;
+import org.apache.hadoop.hdfs.server.federation.metrics.NamenodeBeanMetrics;
+import org.apache.hadoop.hdfs.server.namenode.sps.Context;
+import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
+import org.apache.hadoop.hdfs.server.sps.ExternalSPSContext;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Test the satisfyStoragePolicy call through the Router RPC interface, with
+ * an external StoragePolicySatisfier started by the test itself.
+ */
+public class TestRouterRpcStoragePolicySatisfier {
+
+  /** Federated HDFS cluster. */
+  private static MiniRouterDFSCluster cluster;
+
+  /** Client interface to the Router. */
+  private static ClientProtocol routerProtocol;
+
+  /** Filesystem interface to the Router. */
+  private static FileSystem routerFS;
+  /** Filesystem interface to the Namenode. */
+  private static FileSystem nnFS;
+
+  /** External satisfier started in set up; kept so tear down can stop it. */
+  private static StoragePolicySatisfier externalSps;
+
+  /**
+   * Start a one-nameservice cluster with a single datanode exposing ARCHIVE
+   * and DISK storages, the Routers, and an external storage policy satisfier.
+   *
+   * @throws Exception If any part of the cluster fails to start.
+   */
+  @BeforeClass
+  public static void globalSetUp() throws Exception {
+    cluster = new MiniRouterDFSCluster(false, 1);
+    // Set storage types for the cluster
+    StorageType[][] newtypes = new StorageType[][] {
+        {StorageType.ARCHIVE, StorageType.DISK}};
+    cluster.setStorageTypes(newtypes);
+
+    Configuration conf = cluster.getNamenodes().get(0).getConf();
+    conf.set(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
+        HdfsConstants.StoragePolicySatisfierMode.EXTERNAL.toString());
+    // Reduced refresh cycle to update latest datanodes.
+    conf.setLong(DFSConfigKeys.DFS_SPS_DATANODE_CACHE_REFRESH_INTERVAL_MS,
+        1000);
+    cluster.addNamenodeOverrides(conf);
+
+    cluster.setNumDatanodesPerNameservice(1);
+
+    // Start NNs and DNs and wait until ready
+    cluster.startCluster();
+
+    // Start routers with the metrics and RPC services
+    Configuration routerConf = new RouterConfigBuilder()
+        .metrics()
+        .rpc()
+        .build();
+    // We decrease the DN cache times to make the test faster
+    routerConf.setTimeDuration(
+        RBFConfigKeys.DN_REPORT_CACHE_EXPIRE, 1, TimeUnit.SECONDS);
+    cluster.addRouterOverrides(routerConf);
+    cluster.startRouters();
+
+    // Register and verify all NNs with all routers
+    cluster.registerNamenodes();
+    cluster.waitNamenodeRegistration();
+
+    // Create mock locations
+    cluster.installMockLocations();
+
+    // Random router for this test
+    MiniRouterDFSCluster.RouterContext rndRouter = cluster.getRandomRouter();
+
+    routerProtocol = rndRouter.getClient().getNamenode();
+    routerFS = rndRouter.getFileSystem();
+    nnFS = cluster.getNamenodes().get(0).getFileSystem();
+
+    NameNodeConnector nnc = DFSTestUtil.getNameNodeConnector(conf,
+        HdfsServerConstants.MOVER_ID_PATH, 1, false);
+
+    // The satisfier is stored in a static field (not a local) so that
+    // tearDown can stop it.
+    externalSps = new StoragePolicySatisfier(conf);
+    Context externalCtxt = new ExternalSPSContext(externalSps, nnc);
+
+    externalSps.init(externalCtxt);
+    externalSps.start(HdfsConstants.StoragePolicySatisfierMode.EXTERNAL);
+  }
+
+  /**
+   * Stop the external satisfier and shut down the cluster. Both are guarded
+   * so a failure part-way through set up does not hide the original error
+   * behind a NullPointerException.
+   */
+  @AfterClass
+  public static void tearDown() {
+    // The satisfier is created by this test rather than by the mini cluster,
+    // so it has to be stopped explicitly.
+    if (externalSps != null) {
+      externalSps.stopGracefully();
+      externalSps = null;
+    }
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster = null;
+    }
+  }
+
+  /**
+   * Create a file on DISK, switch it to the COLD policy through the Router,
+   * invoke satisfyStoragePolicy and wait for the block to show up on ARCHIVE
+   * through both the Router and the Namenode.
+   *
+   * @throws Exception If the file operations or the waits fail.
+   */
+  @Test
+  public void testStoragePolicySatisfier() throws Exception {
+    final String file = "/testStoragePolicySatisfierCommand";
+    short repl = 1;
+    int size = 32;
+    DFSTestUtil.createFile(routerFS, new Path(file), size, repl, 0);
+    // Verify storage type is DISK
+    DFSTestUtil.waitExpectedStorageType(file, StorageType.DISK, 1, 20000,
+        (DistributedFileSystem) routerFS);
+    // Set storage policy as COLD
+    routerProtocol
+        .setStoragePolicy(file, HdfsConstants.COLD_STORAGE_POLICY_NAME);
+    // Verify storage policy is set properly
+    BlockStoragePolicy storagePolicy = routerProtocol.getStoragePolicy(file);
+    assertEquals(HdfsConstants.COLD_STORAGE_POLICY_NAME,
+        storagePolicy.getName());
+    // Invoke satisfy storage policy
+    routerProtocol.satisfyStoragePolicy(file);
+    // Verify storage type is ARCHIVE
+    DFSTestUtil.waitExpectedStorageType(file, StorageType.ARCHIVE, 1, 20000,
+        (DistributedFileSystem) routerFS);
+
+    // Verify storage type via NN
+    DFSTestUtil.waitExpectedStorageType(file, StorageType.ARCHIVE, 1, 20000,
+        (DistributedFileSystem) nnFS);
+  }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org