Posted to common-commits@hadoop.apache.org by gi...@apache.org on 2019/02/15 18:47:56 UTC

[hadoop] branch HDFS-13891 updated: HDFS-14268. RBF: Fix the location of the DNs in getDatanodeReport(). Contributed by Inigo Goiri.

This is an automated email from the ASF dual-hosted git repository.

gifuma pushed a commit to branch HDFS-13891
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/HDFS-13891 by this push:
     new f94b6e3  HDFS-14268. RBF: Fix the location of the DNs in getDatanodeReport(). Contributed by Inigo Goiri.
f94b6e3 is described below

commit f94b6e339f7dbc957ee6ad5ba814af137ded2ff0
Author: Giovanni Matteo Fumarola <gi...@apache.org>
AuthorDate: Fri Feb 15 10:47:17 2019 -0800

    HDFS-14268. RBF: Fix the location of the DNs in getDatanodeReport(). Contributed by Inigo Goiri.
---
 .../hadoop/hdfs/protocol/ECBlockGroupStats.java    | 71 ++++++++++++++++++++++
 .../server/federation/router/ErasureCoding.java    | 29 +--------
 .../server/federation/router/RouterRpcClient.java  | 19 ++----
 .../server/federation/router/TestRouterRpc.java    | 48 +++++++++++----
 4 files changed, 114 insertions(+), 53 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ECBlockGroupStats.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ECBlockGroupStats.java
index 3dde604..1ead5c1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ECBlockGroupStats.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ECBlockGroupStats.java
@@ -17,6 +17,10 @@
  */
 package org.apache.hadoop.hdfs.protocol;
 
+import java.util.Collection;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
@@ -103,4 +107,71 @@ public final class ECBlockGroupStats {
     statsBuilder.append("]");
     return statsBuilder.toString();
   }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder()
+        .append(lowRedundancyBlockGroups)
+        .append(corruptBlockGroups)
+        .append(missingBlockGroups)
+        .append(bytesInFutureBlockGroups)
+        .append(pendingDeletionBlocks)
+        .append(highestPriorityLowRedundancyBlocks)
+        .toHashCode();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    ECBlockGroupStats other = (ECBlockGroupStats)o;
+    return new EqualsBuilder()
+        .append(lowRedundancyBlockGroups, other.lowRedundancyBlockGroups)
+        .append(corruptBlockGroups, other.corruptBlockGroups)
+        .append(missingBlockGroups, other.missingBlockGroups)
+        .append(bytesInFutureBlockGroups, other.bytesInFutureBlockGroups)
+        .append(pendingDeletionBlocks, other.pendingDeletionBlocks)
+        .append(highestPriorityLowRedundancyBlocks,
+            other.highestPriorityLowRedundancyBlocks)
+        .isEquals();
+  }
+
+  /**
+   * Merge the multiple ECBlockGroupStats.
+   * @param stats Collection of stats to merge.
+   * @return A new ECBlockGroupStats merging all the input ones
+   */
+  public static ECBlockGroupStats merge(Collection<ECBlockGroupStats> stats) {
+    long lowRedundancyBlockGroups = 0;
+    long corruptBlockGroups = 0;
+    long missingBlockGroups = 0;
+    long bytesInFutureBlockGroups = 0;
+    long pendingDeletionBlocks = 0;
+    long highestPriorityLowRedundancyBlocks = 0;
+    boolean hasHighestPriorityLowRedundancyBlocks = false;
+
+    for (ECBlockGroupStats stat : stats) {
+      lowRedundancyBlockGroups += stat.getLowRedundancyBlockGroups();
+      corruptBlockGroups += stat.getCorruptBlockGroups();
+      missingBlockGroups += stat.getMissingBlockGroups();
+      bytesInFutureBlockGroups += stat.getBytesInFutureBlockGroups();
+      pendingDeletionBlocks += stat.getPendingDeletionBlocks();
+      if (stat.hasHighestPriorityLowRedundancyBlocks()) {
+        hasHighestPriorityLowRedundancyBlocks = true;
+        highestPriorityLowRedundancyBlocks +=
+            stat.getHighestPriorityLowRedundancyBlocks();
+      }
+    }
+    if (hasHighestPriorityLowRedundancyBlocks) {
+      return new ECBlockGroupStats(lowRedundancyBlockGroups, corruptBlockGroups,
+          missingBlockGroups, bytesInFutureBlockGroups, pendingDeletionBlocks,
+          highestPriorityLowRedundancyBlocks);
+    }
+    return new ECBlockGroupStats(lowRedundancyBlockGroups, corruptBlockGroups,
+        missingBlockGroups, bytesInFutureBlockGroups, pendingDeletionBlocks);
+  }
 }
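
For a rough sense of the new ECBlockGroupStats.merge() helper, here is a
minimal sketch (the counter values are made up; the two public constructors
and the getters are the ones shown in the hunk above). Every counter is
summed across the inputs, and highestPriorityLowRedundancyBlocks is carried
into the result only when at least one input reports it.

    // Sketch only: hypothetical per-namespace stats, one per namenode.
    static ECBlockGroupStats mergeExample() {
      // 6-arg form: lowRedundancy, corrupt, missing, bytesInFuture,
      // pendingDeletion, highestPriorityLowRedundancy.
      ECBlockGroupStats ns0 = new ECBlockGroupStats(2L, 0L, 1L, 0L, 3L, 4L);
      // 5-arg form: this namespace does not report the last counter.
      ECBlockGroupStats ns1 = new ECBlockGroupStats(1L, 1L, 0L, 0L, 2L);
      // Result: 3, 1, 1, 0, 5, plus highestPriorityLowRedundancy=4,
      // which is present only because ns0 carries it.
      return ECBlockGroupStats.merge(java.util.Arrays.asList(ns0, ns1));
    }
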
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ErasureCoding.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ErasureCoding.java
index f4584b1..97c5f6a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ErasureCoding.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/ErasureCoding.java
@@ -187,33 +187,6 @@ public class ErasureCoding {
         rpcClient.invokeConcurrent(
             nss, method, true, false, ECBlockGroupStats.class);
 
-    // Merge the stats from all the namespaces
-    long lowRedundancyBlockGroups = 0;
-    long corruptBlockGroups = 0;
-    long missingBlockGroups = 0;
-    long bytesInFutureBlockGroups = 0;
-    long pendingDeletionBlocks = 0;
-    long highestPriorityLowRedundancyBlocks = 0;
-    boolean hasHighestPriorityLowRedundancyBlocks = false;
-
-    for (ECBlockGroupStats stats : allStats.values()) {
-      lowRedundancyBlockGroups += stats.getLowRedundancyBlockGroups();
-      corruptBlockGroups += stats.getCorruptBlockGroups();
-      missingBlockGroups += stats.getMissingBlockGroups();
-      bytesInFutureBlockGroups += stats.getBytesInFutureBlockGroups();
-      pendingDeletionBlocks += stats.getPendingDeletionBlocks();
-      if (stats.hasHighestPriorityLowRedundancyBlocks()) {
-        hasHighestPriorityLowRedundancyBlocks = true;
-        highestPriorityLowRedundancyBlocks +=
-            stats.getHighestPriorityLowRedundancyBlocks();
-      }
-    }
-    if (hasHighestPriorityLowRedundancyBlocks) {
-      return new ECBlockGroupStats(lowRedundancyBlockGroups, corruptBlockGroups,
-          missingBlockGroups, bytesInFutureBlockGroups, pendingDeletionBlocks,
-          highestPriorityLowRedundancyBlocks);
-    }
-    return new ECBlockGroupStats(lowRedundancyBlockGroups, corruptBlockGroups,
-        missingBlockGroups, bytesInFutureBlockGroups, pendingDeletionBlocks);
+    return ECBlockGroupStats.merge(allStats.values());
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
index d21bde3..3d80c41 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcClient.java
@@ -24,16 +24,15 @@ import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
 import java.net.InetSocketAddress;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
-import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
@@ -1061,8 +1060,8 @@ public class RouterRpcClient {
       }
     }
 
-    List<T> orderedLocations = new LinkedList<>();
-    Set<Callable<Object>> callables = new HashSet<>();
+    List<T> orderedLocations = new ArrayList<>();
+    List<Callable<Object>> callables = new ArrayList<>();
     for (final T location : locations) {
       String nsId = location.getNameserviceId();
       final List<? extends FederationNamenodeContext> namenodes =
@@ -1080,20 +1079,12 @@ public class RouterRpcClient {
             nnLocation = (T)new RemoteLocation(nsId, nnId, location.getDest());
           }
           orderedLocations.add(nnLocation);
-          callables.add(new Callable<Object>() {
-            public Object call() throws Exception {
-              return invokeMethod(ugi, nnList, proto, m, paramList);
-            }
-          });
+          callables.add(() -> invokeMethod(ugi, nnList, proto, m, paramList));
         }
       } else {
         // Call the objectGetter in order of nameservices in the NS list
         orderedLocations.add(location);
-        callables.add(new Callable<Object>() {
-          public Object call() throws Exception {
-            return invokeMethod(ugi, namenodes, proto, m, paramList);
-          }
-        });
+        callables.add(() -> invokeMethod(ugi, namenodes, proto, m, paramList));
       }
     }
 
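The HashSet-to-ArrayList switch above is the heart of this fix on the client
side: results are matched back to orderedLocations by position, and a HashSet
does not preserve insertion order, so a datanode report could end up
attributed to the wrong subcluster. Below is a small stand-alone sketch
(plain JDK, not Router code) of the ordering guarantee that this kind of
positional matching relies on: ExecutorService.invokeAll() returns its
futures in the same sequential order as the submitted task list.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class InvokeAllOrdering {
      public static void main(String[] args) throws Exception {
        List<String> locations = Arrays.asList("ns0", "ns1", "ns2");
        List<Callable<Object>> callables = new ArrayList<>();
        for (String ns : locations) {
          // Stand-in for invokeMethod() against one namespace.
          callables.add(() -> "report-from-" + ns);
        }
        ExecutorService pool = Executors.newFixedThreadPool(locations.size());
        try {
          // invokeAll() returns futures in the same order as the task list,
          // so futures.get(i) belongs to locations.get(i). A HashSet of
          // callables gives no such positional mapping.
          List<Future<Object>> futures = pool.invokeAll(callables);
          for (int i = 0; i < futures.size(); i++) {
            System.out.println(locations.get(i) + " -> " + futures.get(i).get());
          }
        } finally {
          pool.shutdown();
        }
      }
    }
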
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
index 7af156f..95c04d6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/test/java/org/apache/hadoop/hdfs/server/federation/router/TestRouterRpc.java
@@ -37,6 +37,7 @@ import static org.junit.Assert.fail;
 import java.io.IOException;
 import java.lang.reflect.Method;
 import java.net.URISyntaxException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
 import java.util.EnumSet;
@@ -47,6 +48,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Random;
 import java.util.Set;
+import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
 
@@ -120,6 +122,11 @@ public class TestRouterRpc {
   private static final Logger LOG =
       LoggerFactory.getLogger(TestRouterRpc.class);
 
+  private static final int NUM_SUBCLUSTERS = 2;
+  // We need at least 6 DNs to test Erasure Coding with RS-6-3-64k
+  private static final int NUM_DNS = 6;
+
+
   private static final Comparator<ErasureCodingPolicyInfo> EC_POLICY_CMP =
       new Comparator<ErasureCodingPolicyInfo>() {
         public int compare(
@@ -165,9 +172,9 @@ public class TestRouterRpc {
 
   @BeforeClass
   public static void globalSetUp() throws Exception {
-    cluster = new MiniRouterDFSCluster(false, 2);
-    // We need 6 DNs to test Erasure Coding with RS-6-3-64k
-    cluster.setNumDatanodesPerNameservice(6);
+    cluster = new MiniRouterDFSCluster(false, NUM_SUBCLUSTERS);
+    cluster.setNumDatanodesPerNameservice(NUM_DNS);
+    cluster.setIndependentDNs();
 
     // Start NNs and DNs and wait until ready
     cluster.startCluster();
@@ -586,8 +593,13 @@ public class TestRouterRpc {
 
     DatanodeInfo[] combinedData =
         routerProtocol.getDatanodeReport(DatanodeReportType.ALL);
+    final Map<Integer, String> routerDNMap = new TreeMap<>();
+    for (DatanodeInfo dn : combinedData) {
+      String subcluster = dn.getNetworkLocation().split("/")[1];
+      routerDNMap.put(dn.getXferPort(), subcluster);
+    }
 
-    Set<Integer> individualData = new HashSet<Integer>();
+    final Map<Integer, String> nnDNMap = new TreeMap<>();
     for (String nameservice : cluster.getNameservices()) {
       NamenodeContext n = cluster.getNamenode(nameservice, null);
       DFSClient client = n.getClient();
@@ -597,10 +609,10 @@ public class TestRouterRpc {
       for (int i = 0; i < data.length; i++) {
         // Collect unique DNs based on their xfer port
         DatanodeInfo info = data[i];
-        individualData.add(info.getXferPort());
+        nnDNMap.put(info.getXferPort(), nameservice);
       }
     }
-    assertEquals(combinedData.length, individualData.size());
+    assertEquals(nnDNMap, routerDNMap);
   }
 
   @Test
@@ -1234,7 +1246,7 @@ public class TestRouterRpc {
   }
 
   @Test
-  public void testErasureCoding() throws IOException {
+  public void testErasureCoding() throws Exception {
 
     LOG.info("List the available erasure coding policies");
     ErasureCodingPolicyInfo[] policies = checkErasureCodingPolicies();
@@ -1340,8 +1352,22 @@ public class TestRouterRpc {
 
     LOG.info("Check the stats");
     ECBlockGroupStats statsRouter = routerProtocol.getECBlockGroupStats();
-    ECBlockGroupStats statsNamenode = nnProtocol.getECBlockGroupStats();
-    assertEquals(statsNamenode.toString(), statsRouter.toString());
+    ECBlockGroupStats statsNamenode = getNamenodeECBlockGroupStats();
+    assertEquals(statsNamenode, statsRouter);
+  }
+
+  /**
+   * Get the EC stats from all namenodes and aggregate them.
+   * @return Aggregated EC stats from all namenodes.
+   * @throws Exception If we cannot get the stats.
+   */
+  private ECBlockGroupStats getNamenodeECBlockGroupStats() throws Exception {
+    List<ECBlockGroupStats> nnStats = new ArrayList<>();
+    for (NamenodeContext nnContext : cluster.getNamenodes()) {
+      ClientProtocol cp = nnContext.getClient().getNamenode();
+      nnStats.add(cp.getECBlockGroupStats());
+    }
+    return ECBlockGroupStats.merge(nnStats);
   }
 
   @Test
@@ -1375,9 +1401,9 @@ public class TestRouterRpc {
         router.getRouter().getNamenodeMetrics();
     final String jsonString0 = metrics.getLiveNodes();
 
-    // We should have 12 nodes in total
+    // We should have the nodes in all the subclusters
     JSONObject jsonObject = new JSONObject(jsonString0);
-    assertEquals(12, jsonObject.names().length());
+    assertEquals(NUM_SUBCLUSTERS * NUM_DNS, jsonObject.names().length());
 
     // We should be caching this information
     String jsonString1 = metrics.getLiveNodes();
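
Tying the getDatanodeReport() test change earlier in this diff back to the
subject line: the old assertion only compared the number of distinct transfer
ports, which still passes when the router reports a DN under the wrong
subcluster. The new TreeMap comparison checks the port-to-subcluster mapping
itself. A tiny made-up example (ports and nameservice IDs are hypothetical)
of what the stronger assertion catches:

    // Sketch only: what the namenodes report vs. what the router reports.
    java.util.Map<Integer, String> fromNamenodes = new java.util.TreeMap<>();
    fromNamenodes.put(9866, "ns0");
    fromNamenodes.put(9867, "ns1");

    java.util.Map<Integer, String> fromRouter = new java.util.TreeMap<>();
    fromRouter.put(9866, "ns0");
    fromRouter.put(9867, "ns0");   // same port count, wrong subcluster

    // The old size-based check passes: both sides see two distinct ports.
    // The new check fails as intended: assertEquals(fromNamenodes, fromRouter)
    // reports the mismatch on port 9867.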

