You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2019/06/14 01:29:13 UTC

[hadoop] branch branch-3.2 updated: HDFS-14560. Allow block replication parameters to be refreshable. Contributed by Stephen O'Donnell.

This is an automated email from the ASF dual-hosted git repository.

inigoiri pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 335aebb  HDFS-14560. Allow block replication parameters to be refreshable. Contributed by Stephen O'Donnell.
335aebb is described below

commit 335aebb9c49ba91abed7e0c6d92b29640e3f7f87
Author: Inigo Goiri <in...@apache.org>
AuthorDate: Thu Jun 13 18:26:53 2019 -0700

    HDFS-14560. Allow block replication parameters to be refreshable. Contributed by Stephen O'Donnell.
    
    (cherry picked from commit 4f455290b15902e7e44c4b1a762bf915414b2bb6)
---
 .../hdfs/server/blockmanagement/BlockManager.java  |  71 +++++++++-
 .../hadoop/hdfs/server/namenode/NameNode.java      |  64 ++++++++-
 .../TestRefreshNamenodeReplicationConfig.java      | 143 +++++++++++++++++++++
 .../org/apache/hadoop/hdfs/tools/TestDFSAdmin.java |   2 +-
 4 files changed, 276 insertions(+), 4 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index f061eb1..2717a9e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -382,7 +382,7 @@ public class BlockManager implements BlockStatsMXBean {
   final int maxCorruptFilesReturned;
 
   final float blocksInvalidateWorkPct;
-  final int blocksReplWorkMultiplier;
+  private int blocksReplWorkMultiplier;
 
   // whether or not to issue block encryption keys.
   final boolean encryptDataTransfer;
@@ -896,11 +896,78 @@ public class BlockManager implements BlockStatsMXBean {
     out.println("");
   }
 
-  /** @return maxReplicationStreams */
+  /** Returns the current setting for maxReplicationStreams, which is set by
+   *  {@code DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY}.
+   *
+   *  @return maxReplicationStreams
+   */
   public int getMaxReplicationStreams() {
     return maxReplicationStreams;
   }
 
+  static private void ensurePositiveInt(int val, String key) {
+    Preconditions.checkArgument(
+        (val > 0),
+        key + " = '" + val + "' is invalid. " +
+            "It should be a positive, non-zero integer value.");
+  }
+
+  /**
+   * Updates the value used for maxReplicationStreams, which is set by
+   * {@code DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY} initially.
+   *
+   * @param newVal - Must be a positive non-zero integer.
+   */
+  public void setMaxReplicationStreams(int newVal) {
+    ensurePositiveInt(newVal,
+        DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY);
+    maxReplicationStreams = newVal;
+  }
+
+  /** Returns the current setting for maxReplicationStreamsHardLimit, set by
+   * {@code DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY}.
+   *
+   *  @return maxReplicationStreamsHardLimit
+   */
+  public int getReplicationStreamsHardLimit() {
+    return replicationStreamsHardLimit;
+  }
+
+  /**
+   * Updates the value used for replicationStreamsHardLimit, which is set by
+   * {@code DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY}
+   * initially.
+   *
+   * @param newVal - Must be a positive non-zero integer.
+   */
+  public void setReplicationStreamsHardLimit(int newVal) {
+    ensurePositiveInt(newVal,
+        DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY);
+    replicationStreamsHardLimit = newVal;
+  }
+
+  /** Returns the current setting for blocksReplWorkMultiplier, set by
+   * {@code DFSConfigKeys.
+   *     DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION}.
+   *
+   *  @return maxReplicationStreamsHardLimit
+   */
+  public int getBlocksReplWorkMultiplier() {
+    return blocksReplWorkMultiplier;
+  }
+
+  /**
+   * Updates the value used for blocksReplWorkMultiplier, set by
+   * {@code DFSConfigKeys.
+   *     DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION} initially.
+   * @param newVal - Must be a positive non-zero integer.
+   */
+  public void setBlocksReplWorkMultiplier(int newVal) {
+    ensurePositiveInt(newVal,
+        DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION);
+    blocksReplWorkMultiplier = newVal;
+  }
+
   public int getDefaultStorageNum(BlockInfo block) {
     switch (block.getBlockType()) {
     case STRIPED: return ((BlockInfoStriped) block).getRealTotalBlockNum();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
index a8034da..38e9e7e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
 import org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMap;
 import org.apache.hadoop.hdfs.server.aliasmap.InMemoryLevelDBAliasMapServer;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.RollingUpgradeStartupOption;
@@ -162,6 +163,13 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHEC
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.FS_PROTECTED_DIRECTORIES;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION_DEFAULT;
+
 import static org.apache.hadoop.util.ExitUtil.terminate;
 import static org.apache.hadoop.util.ToolRunner.confirmPrompt;
 import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_BACKOFF_ENABLE;
@@ -296,7 +304,10 @@ public class NameNode extends ReconfigurableBase implements
           DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
           FS_PROTECTED_DIRECTORIES,
           HADOOP_CALLER_CONTEXT_ENABLED_KEY,
-          DFS_STORAGE_POLICY_SATISFIER_MODE_KEY));
+          DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
+          DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY,
+          DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY,
+          DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION));
 
   private static final String USAGE = "Usage: hdfs namenode ["
       + StartupOption.BACKUP.getName() + "] | \n\t["
@@ -2044,12 +2055,63 @@ public class NameNode extends ReconfigurableBase implements
       return reconfigureIPCBackoffEnabled(newVal);
     } else if (property.equals(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY)) {
       return reconfigureSPSModeEvent(newVal, property);
+    } else if (property.equals(DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY)
+        || property.equals(DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY)
+        || property.equals(
+            DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION)) {
+      return reconfReplicationParameters(newVal, property);
     } else {
       throw new ReconfigurationException(property, newVal, getConf().get(
           property));
     }
   }
 
+  private String reconfReplicationParameters(final String newVal,
+      final String property) throws ReconfigurationException {
+    BlockManager bm = namesystem.getBlockManager();
+    int newSetting;
+    namesystem.writeLock();
+    try {
+      if (property.equals(DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY)) {
+        bm.setMaxReplicationStreams(
+            adjustNewVal(DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT, newVal));
+        newSetting = bm.getMaxReplicationStreams();
+      } else if (property.equals(
+          DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY)) {
+        bm.setReplicationStreamsHardLimit(
+            adjustNewVal(DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_DEFAULT,
+                newVal));
+        newSetting = bm.getReplicationStreamsHardLimit();
+      } else if (
+          property.equals(
+              DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION)) {
+        bm.setBlocksReplWorkMultiplier(
+            adjustNewVal(
+                DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION_DEFAULT,
+                newVal));
+        newSetting = bm.getBlocksReplWorkMultiplier();
+      } else {
+        throw new IllegalArgumentException("Unexpected property " +
+            property + "in reconfReplicationParameters");
+      }
+      LOG.info("RECONFIGURE* changed {} to {}", property, newSetting);
+      return String.valueOf(newSetting);
+    } catch (IllegalArgumentException e) {
+      throw new ReconfigurationException(property, newVal, getConf().get(
+          property), e);
+    } finally {
+      namesystem.writeUnlock();
+    }
+  }
+
+  private int adjustNewVal(int defaultVal, String newVal) {
+    if (newVal == null) {
+      return defaultVal;
+    } else {
+      return Integer.parseInt(newVal);
+    }
+  }
+
   private String reconfHeartbeatInterval(final DatanodeManager datanodeManager,
       final String property, final String newVal)
       throws ReconfigurationException {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRefreshNamenodeReplicationConfig.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRefreshNamenodeReplicationConfig.java
new file mode 100644
index 0000000..8dc81f8
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRefreshNamenodeReplicationConfig.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.ReconfigurationException;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.test.LambdaTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import java.io.IOException;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * This class tests the replication related parameters in the namenode can
+ * be refreshed dynamically, without a namenode restart.
+ */
+public class TestRefreshNamenodeReplicationConfig {
+  private MiniDFSCluster cluster = null;
+  private BlockManager bm;
+
+  @Before
+  public void setup() throws IOException {
+    Configuration config = new Configuration();
+    config.setInt(
+        DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 8);
+    config.setInt(
+        DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY, 10);
+    config.setInt(
+        DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION,
+        12);
+
+    cluster = new MiniDFSCluster.Builder(config)
+        .nnTopology(MiniDFSNNTopology.simpleSingleNN(0, 0))
+        .numDataNodes(0).build();
+    cluster.waitActive();
+    bm = cluster.getNameNode().getNamesystem().getBlockManager();
+  }
+
+  @After
+  public void teardown() throws IOException {
+    cluster.shutdown();
+  }
+
+  /**
+   * Tests to ensure each of the block replication parameters can be passed
+   * updated successfully.
+   */
+  @Test(timeout = 90000)
+  public void testParamsCanBeReconfigured() throws ReconfigurationException {
+
+    assertEquals(8, bm.getMaxReplicationStreams());
+    assertEquals(10, bm.getReplicationStreamsHardLimit());
+    assertEquals(12, bm.getBlocksReplWorkMultiplier());
+
+    cluster.getNameNode().reconfigurePropertyImpl(
+        DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, "20");
+    cluster.getNameNode().reconfigurePropertyImpl(
+        DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY,
+        "22");
+    cluster.getNameNode().reconfigurePropertyImpl(
+        DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION,
+        "24");
+
+    assertEquals(20, bm.getMaxReplicationStreams());
+    assertEquals(22, bm.getReplicationStreamsHardLimit());
+    assertEquals(24, bm.getBlocksReplWorkMultiplier());
+  }
+
+  /**
+   * Tests to ensure reconfiguration fails with a negative, zero or string value
+   * value for each parameter.
+   */
+  @Test(timeout = 90000)
+  public void testReconfigureFailsWithInvalidValues() throws Exception {
+    String[] keys = new String[]{
+        DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY,
+        DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY,
+        DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION
+    };
+
+    // Ensure we cannot set any of the parameters negative
+    for (String key : keys) {
+      ReconfigurationException e =
+          LambdaTestUtils.intercept(ReconfigurationException.class,
+              () -> cluster.getNameNode().reconfigurePropertyImpl(key, "-20"));
+      assertTrue(e.getCause() instanceof IllegalArgumentException);
+      assertEquals(key+" = '-20' is invalid. It should be a "
+          +"positive, non-zero integer value.", e.getCause().getMessage());
+    }
+    // Ensure none of the values were updated from the defaults
+    assertEquals(8, bm.getMaxReplicationStreams());
+    assertEquals(10, bm.getReplicationStreamsHardLimit());
+    assertEquals(12, bm.getBlocksReplWorkMultiplier());
+
+    for (String key : keys) {
+      ReconfigurationException e =
+          LambdaTestUtils.intercept(ReconfigurationException.class,
+              () -> cluster.getNameNode().reconfigurePropertyImpl(key, "0"));
+      assertTrue(e.getCause() instanceof IllegalArgumentException);
+      assertEquals(key+" = '0' is invalid. It should be a "
+          +"positive, non-zero integer value.", e.getCause().getMessage());
+    }
+
+    // Ensure none of the values were updated from the defaults
+    assertEquals(8, bm.getMaxReplicationStreams());
+    assertEquals(10, bm.getReplicationStreamsHardLimit());
+    assertEquals(12, bm.getBlocksReplWorkMultiplier());
+
+    // Ensure none of the parameters can be set to a string value
+    for (String key : keys) {
+      ReconfigurationException e =
+          LambdaTestUtils.intercept(ReconfigurationException.class,
+              () -> cluster.getNameNode().reconfigurePropertyImpl(key, "str"));
+      assertTrue(e.getCause() instanceof NumberFormatException);
+    }
+
+    // Ensure none of the values were updated from the defaults
+    assertEquals(8, bm.getMaxReplicationStreams());
+    assertEquals(10, bm.getReplicationStreamsHardLimit());
+    assertEquals(12, bm.getBlocksReplWorkMultiplier());
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
index 015c9a4..c7d8c7e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
@@ -394,7 +394,7 @@ public class TestDFSAdmin {
     final List<String> outs = Lists.newArrayList();
     final List<String> errs = Lists.newArrayList();
     getReconfigurableProperties("namenode", address, outs, errs);
-    assertEquals(7, outs.size());
+    assertEquals(10, outs.size());
     assertEquals(DFS_HEARTBEAT_INTERVAL_KEY, outs.get(1));
     assertEquals(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, outs.get(2));
     assertEquals(errs.size(), 0);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org