You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by gi...@apache.org on 2019/02/15 18:47:56 UTC
[hadoop] branch HDFS-13891 updated: HDFS-14268. RBF: Fix the location of the DNs in getDatanodeReport(). Contributed by Inigo Goiri.
This is an automated email from the ASF dual-hosted git repository.
gifuma pushed a commit to branch HDFS-13891
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/HDFS-13891 by this push:
new f94b6e3 HDFS-14268. RBF: Fix the location of the DNs in getDatanodeReport(). Contributed by Inigo Goiri.
f94b6e3 is described below
commit f94b6e339f7dbc957ee6ad5ba814af137ded2ff0
Author: Giovanni Matteo Fumarola <gi...@apache.org>
AuthorDate: Fri Feb 15 10:47:17 2019 -0800
HDFS-14268. RBF: Fix the location of the DNs in getDatanodeReport(). Contributed by Inigo Goiri.
---
.../hadoop/hdfs/protocol/ECBlockGroupStats.java | 71 ++++++++++++++++++++++
.../server/federation/router/ErasureCoding.java | 29 +--------
.../server/federation/router/RouterRpcClient.java | 19 ++----
.../server/federation/router/TestRouterRpc.java | 48 +++++++++++----
4 files changed, 114 insertions(+), 53 deletions(-)
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ECBlockGroupStats.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ECBlockGroupStats.java
index 3dde604..1ead5c1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ECBlockGroupStats.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ECBlockGroupStats.java
@@ -17,6 +17,10 @@
*/
package org.apache.hadoop.hdfs.protocol;
+import java.util.Collection;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@@ -103,4 +107,71 @@ public final class ECBlockGroupStats {
statsBuilder.append("]");
return statsBuilder.toString();
}
+
+ @Override
+ public int hashCode() {
+ return new HashCodeBuilder()
+ .append(lowRedundancyBlockGroups)
+ .append(corruptBlockGroups)
+ .append(missingBlockGroups)
+ .append(bytesInFutureBlockGroups)
+ .append(pendingDeletionBlocks)
+ .append(highestPriorityLowRedundancyBlocks)
+ .toHashCode();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ECBlockGroupStats other = (ECBlockGroupStats)o;
+ return new EqualsBuilder()
+ .append(lowRedundancyBlockGroups, other.lowRedundancyBlockGroups)
+ .append(corruptBlockGroups, other.corruptBlockGroups)
+ .append(missingBlockGroups, other.missingBlockGroups)
+ .append(bytesInFutureBlockGroups, other.bytesInFutureBlockGroups)
+ .append(pendingDeletionBlocks, other.pendingDeletionBlocks)
+ .append(highestPriorityLowRedundancyBlocks,
+ other.highestPriorityLowRedundancyBlocks)
+ .isEquals();
+ }
+
+ /**
+ * Merge the multiple ECBlockGroupStats.
+ * @param stats Collection of stats to merge.
+ * @return A new ECBlockGroupStats merging all the input ones
+ */
+ public static ECBlockGroupStats merge(Collection<ECBlockGroupStats> stats) {
+ long lowRedundancyBlockGroups = 0;
+ long corruptBlockGroups = 0;
+ long missingBlockGroups = 0;
+ long bytesInFutureBlockGroups = 0;
+ long pendingDeletionBlocks = 0;
+ long highestPriorityLowRedundancyBlocks = 0;
+ boolean hasHighestPriorityLowRedundancyBlocks = false;
+
+ for (ECBlockGroupStats stat : stats) {
+ lowRedundancyBlockGroups += stat.getLowRedundancyBlockGroups();
+ corruptBlockGroups += stat.getCorruptBlockGroups();
+ missingBlockGroups += stat.getMissingBlockGroups();
+ bytesInFutureBlockGroups += stat.getBytesInFutureBlockGroups();
+ pendingDeletionBlocks += stat.getPendingDeletionBlocks();
+ if (stat.hasHighestPriorityLowRedundancyBlocks()) {
+ hasHighestPriorityLowRedundancyBlocks = true;
+ highestPriorityLowRedundancyBlocks +=
+ stat.getHighestPriorityLowRedundancyBlocks();
+ }
+ }
+ if (hasHighestPriorityLowRedundancyBlocks) {
+ return new ECBlockGroupStats(lowRedundancyBlockGroups, corruptBlockGroups,
+ missingBlockGroups, bytesInFutureBlockGroups, pendingDeletionBlocks,
+ highestPriorityLowRedundancyBlocks);
+ }
+ return new ECBlockGroupStats(lowRedundancyBlockGroups, corruptBlockGroups,
+ missingBlockGroups, bytesInFutureBlockGroups, pendingDeletionBlocks);
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ErasureCoding.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ErasureCoding.java
index f4584b1..97c5f6a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ErasureCoding.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ErasureCoding.java
@@ -187,33 +187,6 @@ public class ErasureCoding {
rpcClient.invokeConcurrent(
nss, method, true, false, ECBlockGroupStats.class);
- // Merge the stats from all the namespaces
- long lowRedundancyBlockGroups = 0;
- long corruptBlockGroups = 0;
- long missingBlockGroups = 0;
- long bytesInFutureBlockGroups = 0;
- long pendingDeletionBlocks = 0;
- long highestPriorityLowRedundancyBlocks = 0;
- boolean hasHighestPriorityLowRedundancyBlocks = false;
-
- for (ECBlockGroupStats stats : allStats.values()) {
- lowRedundancyBlockGroups += stats.getLowRedundancyBlockGroups();
- corruptBlockGroups += stats.getCorruptBlockGroups();
- missingBlockGroups += stats.getMissingBlockGroups();
- bytesInFutureBlockGroups += stats.getBytesInFutureBlockGroups();
- pendingDeletionBlocks += stats.getPendingDeletionBlocks();
- if (stats.hasHighestPriorityLowRedundancyBlocks()) {
- hasHighestPriorityLowRedundancyBlocks = true;
- highestPriorityLowRedundancyBlocks +=
- stats.getHighestPriorityLowRedundancyBlocks();
- }
- }
- if (hasHighestPriorityLowRedundancyBlocks) {
- return new ECBlockGroupStats(lowRedundancyBlockGroups, corruptBlockGroups,
- missingBlockGroups, bytesInFutureBlockGroups, pendingDeletionBlocks,
- highestPriorityLowRedundancyBlocks);
- }
- return new ECBlockGroupStats(lowRedundancyBlockGroups, corruptBlockGroups,
- missingBlockGroups, bytesInFutureBlockGroups, pendingDeletionBlocks);
+ return ECBlockGroupStats.merge(allStats.values());
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
index d21bde3..3d80c41 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
@@ -24,16 +24,15 @@ import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.InetSocketAddress;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
-import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
@@ -1061,8 +1060,8 @@ public class RouterRpcClient {
}
}
- List<T> orderedLocations = new LinkedList<>();
- Set<Callable<Object>> callables = new HashSet<>();
+ List<T> orderedLocations = new ArrayList<>();
+ List<Callable<Object>> callables = new ArrayList<>();
for (final T location : locations) {
String nsId = location.getNameserviceId();
final List<? extends FederationNamenodeContext> namenodes =
@@ -1080,20 +1079,12 @@ public class RouterRpcClient {
nnLocation = (T)new RemoteLocation(nsId, nnId, location.getDest());
}
orderedLocations.add(nnLocation);
- callables.add(new Callable<Object>() {
- public Object call() throws Exception {
- return invokeMethod(ugi, nnList, proto, m, paramList);
- }
- });
+ callables.add(() -> invokeMethod(ugi, nnList, proto, m, paramList));
}
} else {
// Call the objectGetter in order of nameservices in the NS list
orderedLocations.add(location);
- callables.add(new Callable<Object>() {
- public Object call() throws Exception {
- return invokeMethod(ugi, namenodes, proto, m, paramList);
- }
- });
+ callables.add(() -> invokeMethod(ugi, namenodes, proto, m, paramList));
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
index 7af156f..95c04d6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
@@ -37,6 +37,7 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.lang.reflect.Method;
import java.net.URISyntaxException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.EnumSet;
@@ -47,6 +48,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Random;
import java.util.Set;
+import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
@@ -120,6 +122,11 @@ public class TestRouterRpc {
private static final Logger LOG =
LoggerFactory.getLogger(TestRouterRpc.class);
+ private static final int NUM_SUBCLUSTERS = 2;
+ // We need at least 6 DNs to test Erasure Coding with RS-6-3-64k
+ private static final int NUM_DNS = 6;
+
+
private static final Comparator<ErasureCodingPolicyInfo> EC_POLICY_CMP =
new Comparator<ErasureCodingPolicyInfo>() {
public int compare(
@@ -165,9 +172,9 @@ public class TestRouterRpc {
@BeforeClass
public static void globalSetUp() throws Exception {
- cluster = new MiniRouterDFSCluster(false, 2);
- // We need 6 DNs to test Erasure Coding with RS-6-3-64k
- cluster.setNumDatanodesPerNameservice(6);
+ cluster = new MiniRouterDFSCluster(false, NUM_SUBCLUSTERS);
+ cluster.setNumDatanodesPerNameservice(NUM_DNS);
+ cluster.setIndependentDNs();
// Start NNs and DNs and wait until ready
cluster.startCluster();
@@ -586,8 +593,13 @@ public class TestRouterRpc {
DatanodeInfo[] combinedData =
routerProtocol.getDatanodeReport(DatanodeReportType.ALL);
+ final Map<Integer, String> routerDNMap = new TreeMap<>();
+ for (DatanodeInfo dn : combinedData) {
+ String subcluster = dn.getNetworkLocation().split("/")[1];
+ routerDNMap.put(dn.getXferPort(), subcluster);
+ }
- Set<Integer> individualData = new HashSet<Integer>();
+ final Map<Integer, String> nnDNMap = new TreeMap<>();
for (String nameservice : cluster.getNameservices()) {
NamenodeContext n = cluster.getNamenode(nameservice, null);
DFSClient client = n.getClient();
@@ -597,10 +609,10 @@ public class TestRouterRpc {
for (int i = 0; i < data.length; i++) {
// Collect unique DNs based on their xfer port
DatanodeInfo info = data[i];
- individualData.add(info.getXferPort());
+ nnDNMap.put(info.getXferPort(), nameservice);
}
}
- assertEquals(combinedData.length, individualData.size());
+ assertEquals(nnDNMap, routerDNMap);
}
@Test
@@ -1234,7 +1246,7 @@ public class TestRouterRpc {
}
@Test
- public void testErasureCoding() throws IOException {
+ public void testErasureCoding() throws Exception {
LOG.info("List the available erasurce coding policies");
ErasureCodingPolicyInfo[] policies = checkErasureCodingPolicies();
@@ -1340,8 +1352,22 @@ public class TestRouterRpc {
LOG.info("Check the stats");
ECBlockGroupStats statsRouter = routerProtocol.getECBlockGroupStats();
- ECBlockGroupStats statsNamenode = nnProtocol.getECBlockGroupStats();
- assertEquals(statsNamenode.toString(), statsRouter.toString());
+ ECBlockGroupStats statsNamenode = getNamenodeECBlockGroupStats();
+ assertEquals(statsNamenode, statsRouter);
+ }
+
+ /**
+ * Get the EC stats from all namenodes and aggregate them.
+ * @return Aggregated EC stats from all namenodes.
+ * @throws Exception If we cannot get the stats.
+ */
+ private ECBlockGroupStats getNamenodeECBlockGroupStats() throws Exception {
+ List<ECBlockGroupStats> nnStats = new ArrayList<>();
+ for (NamenodeContext nnContext : cluster.getNamenodes()) {
+ ClientProtocol cp = nnContext.getClient().getNamenode();
+ nnStats.add(cp.getECBlockGroupStats());
+ }
+ return ECBlockGroupStats.merge(nnStats);
}
@Test
@@ -1375,9 +1401,9 @@ public class TestRouterRpc {
router.getRouter().getNamenodeMetrics();
final String jsonString0 = metrics.getLiveNodes();
- // We should have 12 nodes in total
+ // We should have the nodes in all the subclusters
JSONObject jsonObject = new JSONObject(jsonString0);
- assertEquals(12, jsonObject.names().length());
+ assertEquals(NUM_SUBCLUSTERS * NUM_DNS, jsonObject.names().length());
// We should be caching this information
String jsonString1 = metrics.getLiveNodes();
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org