You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by in...@apache.org on 2019/06/14 01:29:13 UTC
[hadoop] branch branch-3.2 updated: HDFS-14560. Allow block
replication parameters to be refreshable. Contributed by Stephen O'Donnell.
This is an automated email from the ASF dual-hosted git repository.
inigoiri pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 335aebb HDFS-14560. Allow block replication parameters to be refreshable. Contributed by Stephen O'Donnell.
335aebb is described below
commit 335aebb9c49ba91abed7e0c6d92b29640e3f7f87
Author: Inigo Goiri <in...@apache.org>
AuthorDate: Thu Jun 13 18:26:53 2019 -0700
HDFS-14560. Allow block replication parameters to be refreshable. Contributed by Stephen O'Donnell.
(cherry picked from commit 4f455290b15902e7e44c4b1a762bf915414b2bb6)
---
.../hdfs/server/blockmanagement/BlockManager.java | 71 +++++++++-
.../hadoop/hdfs/server/namenode/NameNode.java | 64 ++++++++-
.../TestRefreshNamenodeReplicationConfig.java | 143 +++++++++++++++++++++
.../org/apache/hadoop/hdfs/tools/TestDFSAdmin.java | 2 +-
4 files changed, 276 insertions(+), 4 deletions(-)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index f061eb1..2717a9e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -382,7 +382,7 @@ public class BlockManager implements BlockStatsMXBean {
final int maxCorruptFilesReturned;
final float blocksInvalidateWorkPct;
- final int blocksReplWorkMultiplier;
+ private int blocksReplWorkMultiplier;
// whether or not to issue block encryption keys.
final boolean encryptDataTransfer;
@@ -896,11 +896,78 @@ public class BlockManager implements BlockStatsMXBean {
out.println("");
}
- /** @return maxReplicationStreams */
+ /** Returns the current setting for maxReplicationStreams, which is set by
+ * {@code DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY}.
+ *
+ * @return maxReplicationStreams
+ */
public int getMaxReplicationStreams() {
return maxReplicationStreams;
}
+ static private void ensurePositiveInt(int val, String key) {
+ Preconditions.checkArgument(
+ (val > 0),
+ key + " = '" + val + "' is invalid. " +
+ "It should be a positive, non-zero integer value.");
+ }
+
+ /**
+ * Updates the value used for maxReplicationStreams, which is set by
+ * {@code DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY} initially.
+ *
+ * @param newVal - Must be a positive non-zero integer.
+ */
+ public void setMaxReplicationStreams(int newVal) {
+ ensurePositiveInt(newVal,
+ DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY);
+ maxReplicationStreams = newVal;
+ }
+
+ /** Returns the current setting for maxReplicationStreamsHardLimit, set by
+ * {@code DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY}.
+ *
+ * @return maxReplicationStreamsHardLimit
+ */
+ public int getReplicationStreamsHardLimit() {
+ return replicationStreamsHardLimit;
+ }
+
+ /**
+ * Updates the value used for replicationStreamsHardLimit, which is set by
+ * {@code DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY}
+ * initially.
+ *
+ * @param newVal - Must be a positive non-zero integer.
+ */
+ public void setReplicationStreamsHardLimit(int newVal) {
+ ensurePositiveInt(newVal,
+ DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY);
+ replicationStreamsHardLimit = newVal;
+ }
+
+ /** Returns the current setting for blocksReplWorkMultiplier, set by
+ * {@code DFSConfigKeys.
+ * DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION}.
+ *
+ * @return blocksReplWorkMultiplier
+ */
+ public int getBlocksReplWorkMultiplier() {
+ return blocksReplWorkMultiplier;
+ }
+
+ /**
+ * Updates the value used for blocksReplWorkMultiplier, set by
+ * {@code DFSConfigKeys.
+ * DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION} initially.
+ * @param newVal - Must be a positive non-zero integer.
+ */
+ public void setBlocksReplWorkMultiplier(int newVal) {
+ ensurePositiveInt(newVal,
+ DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION);
+ blocksReplWorkMultiplier = newVal;
+ }
+
public int getDefaultStorageNum(BlockInfo block) {
switch (block.getBlockType()) {
case STRIPED: return ((BlockInfoStriped) block).getRealTotalBlockNum();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
index a8034da..38e9e7e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.StoragePolicySatisfierMode;
import org.apache.hadoop.hdfs.server.aliasmap.InMemoryAliasMap;
import org.apache.hadoop.hdfs.server.aliasmap.InMemoryLevelDBAliasMapServer;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.RollingUpgradeStartupOption;
@@ -162,6 +163,13 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHEC
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.FS_PROTECTED_DIRECTORIES;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MODE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION_DEFAULT;
+
import static org.apache.hadoop.util.ExitUtil.terminate;
import static org.apache.hadoop.util.ToolRunner.confirmPrompt;
import static org.apache.hadoop.fs.CommonConfigurationKeys.IPC_BACKOFF_ENABLE;
@@ -296,7 +304,10 @@ public class NameNode extends ReconfigurableBase implements
DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY,
FS_PROTECTED_DIRECTORIES,
HADOOP_CALLER_CONTEXT_ENABLED_KEY,
- DFS_STORAGE_POLICY_SATISFIER_MODE_KEY));
+ DFS_STORAGE_POLICY_SATISFIER_MODE_KEY,
+ DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY,
+ DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY,
+ DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION));
private static final String USAGE = "Usage: hdfs namenode ["
+ StartupOption.BACKUP.getName() + "] | \n\t["
@@ -2044,12 +2055,63 @@ public class NameNode extends ReconfigurableBase implements
return reconfigureIPCBackoffEnabled(newVal);
} else if (property.equals(DFS_STORAGE_POLICY_SATISFIER_MODE_KEY)) {
return reconfigureSPSModeEvent(newVal, property);
+ } else if (property.equals(DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY)
+ || property.equals(DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY)
+ || property.equals(
+ DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION)) {
+ return reconfReplicationParameters(newVal, property);
} else {
throw new ReconfigurationException(property, newVal, getConf().get(
property));
}
}
+ private String reconfReplicationParameters(final String newVal,
+ final String property) throws ReconfigurationException {
+ BlockManager bm = namesystem.getBlockManager();
+ int newSetting;
+ namesystem.writeLock();
+ try {
+ if (property.equals(DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY)) {
+ bm.setMaxReplicationStreams(
+ adjustNewVal(DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT, newVal));
+ newSetting = bm.getMaxReplicationStreams();
+ } else if (property.equals(
+ DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY)) {
+ bm.setReplicationStreamsHardLimit(
+ adjustNewVal(DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_DEFAULT,
+ newVal));
+ newSetting = bm.getReplicationStreamsHardLimit();
+ } else if (
+ property.equals(
+ DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION)) {
+ bm.setBlocksReplWorkMultiplier(
+ adjustNewVal(
+ DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION_DEFAULT,
+ newVal));
+ newSetting = bm.getBlocksReplWorkMultiplier();
+ } else {
+ throw new IllegalArgumentException("Unexpected property " +
+ property + " in reconfReplicationParameters");
+ }
+ LOG.info("RECONFIGURE* changed {} to {}", property, newSetting);
+ return String.valueOf(newSetting);
+ } catch (IllegalArgumentException e) {
+ throw new ReconfigurationException(property, newVal, getConf().get(
+ property), e);
+ } finally {
+ namesystem.writeUnlock();
+ }
+ }
+
+ private int adjustNewVal(int defaultVal, String newVal) {
+ if (newVal == null) {
+ return defaultVal;
+ } else {
+ return Integer.parseInt(newVal);
+ }
+ }
+
private String reconfHeartbeatInterval(final DatanodeManager datanodeManager,
final String property, final String newVal)
throws ReconfigurationException {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRefreshNamenodeReplicationConfig.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRefreshNamenodeReplicationConfig.java
new file mode 100644
index 0000000..8dc81f8
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestRefreshNamenodeReplicationConfig.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.ReconfigurationException;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.test.LambdaTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import java.io.IOException;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * This class tests the replication related parameters in the namenode can
+ * be refreshed dynamically, without a namenode restart.
+ */
+public class TestRefreshNamenodeReplicationConfig {
+ private MiniDFSCluster cluster = null;
+ private BlockManager bm;
+
+ @Before
+ public void setup() throws IOException {
+ Configuration config = new Configuration();
+ config.setInt(
+ DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 8);
+ config.setInt(
+ DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY, 10);
+ config.setInt(
+ DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION,
+ 12);
+
+ cluster = new MiniDFSCluster.Builder(config)
+ .nnTopology(MiniDFSNNTopology.simpleSingleNN(0, 0))
+ .numDataNodes(0).build();
+ cluster.waitActive();
+ bm = cluster.getNameNode().getNamesystem().getBlockManager();
+ }
+
+ @After
+ public void teardown() throws IOException {
+ cluster.shutdown();
+ }
+
+ /**
+ * Tests to ensure each of the block replication parameters can be
+ * updated successfully.
+ */
+ @Test(timeout = 90000)
+ public void testParamsCanBeReconfigured() throws ReconfigurationException {
+
+ assertEquals(8, bm.getMaxReplicationStreams());
+ assertEquals(10, bm.getReplicationStreamsHardLimit());
+ assertEquals(12, bm.getBlocksReplWorkMultiplier());
+
+ cluster.getNameNode().reconfigurePropertyImpl(
+ DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, "20");
+ cluster.getNameNode().reconfigurePropertyImpl(
+ DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY,
+ "22");
+ cluster.getNameNode().reconfigurePropertyImpl(
+ DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION,
+ "24");
+
+ assertEquals(20, bm.getMaxReplicationStreams());
+ assertEquals(22, bm.getReplicationStreamsHardLimit());
+ assertEquals(24, bm.getBlocksReplWorkMultiplier());
+ }
+
+ /**
+ * Tests to ensure reconfiguration fails with a negative, zero or string
+ * value for each parameter.
+ */
+ @Test(timeout = 90000)
+ public void testReconfigureFailsWithInvalidValues() throws Exception {
+ String[] keys = new String[]{
+ DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY,
+ DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY,
+ DFSConfigKeys.DFS_NAMENODE_REPLICATION_WORK_MULTIPLIER_PER_ITERATION
+ };
+
+ // Ensure we cannot set any of the parameters negative
+ for (String key : keys) {
+ ReconfigurationException e =
+ LambdaTestUtils.intercept(ReconfigurationException.class,
+ () -> cluster.getNameNode().reconfigurePropertyImpl(key, "-20"));
+ assertTrue(e.getCause() instanceof IllegalArgumentException);
+ assertEquals(key+" = '-20' is invalid. It should be a "
+ +"positive, non-zero integer value.", e.getCause().getMessage());
+ }
+ // Ensure none of the values were updated from the defaults
+ assertEquals(8, bm.getMaxReplicationStreams());
+ assertEquals(10, bm.getReplicationStreamsHardLimit());
+ assertEquals(12, bm.getBlocksReplWorkMultiplier());
+
+ for (String key : keys) {
+ ReconfigurationException e =
+ LambdaTestUtils.intercept(ReconfigurationException.class,
+ () -> cluster.getNameNode().reconfigurePropertyImpl(key, "0"));
+ assertTrue(e.getCause() instanceof IllegalArgumentException);
+ assertEquals(key+" = '0' is invalid. It should be a "
+ +"positive, non-zero integer value.", e.getCause().getMessage());
+ }
+
+ // Ensure none of the values were updated from the defaults
+ assertEquals(8, bm.getMaxReplicationStreams());
+ assertEquals(10, bm.getReplicationStreamsHardLimit());
+ assertEquals(12, bm.getBlocksReplWorkMultiplier());
+
+ // Ensure none of the parameters can be set to a string value
+ for (String key : keys) {
+ ReconfigurationException e =
+ LambdaTestUtils.intercept(ReconfigurationException.class,
+ () -> cluster.getNameNode().reconfigurePropertyImpl(key, "str"));
+ assertTrue(e.getCause() instanceof NumberFormatException);
+ }
+
+ // Ensure none of the values were updated from the defaults
+ assertEquals(8, bm.getMaxReplicationStreams());
+ assertEquals(10, bm.getReplicationStreamsHardLimit());
+ assertEquals(12, bm.getBlocksReplWorkMultiplier());
+ }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
index 015c9a4..c7d8c7e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
@@ -394,7 +394,7 @@ public class TestDFSAdmin {
final List<String> outs = Lists.newArrayList();
final List<String> errs = Lists.newArrayList();
getReconfigurableProperties("namenode", address, outs, errs);
- assertEquals(7, outs.size());
+ assertEquals(10, outs.size());
assertEquals(DFS_HEARTBEAT_INTERVAL_KEY, outs.get(1));
assertEquals(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, outs.get(2));
assertEquals(errs.size(), 0);
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org