Posted to common-commits@hadoop.apache.org by as...@apache.org on 2015/12/06 08:13:35 UTC

[33/38] hadoop git commit: HDFS-9214. Support reconfiguring dfs.datanode.balance.max.concurrent.moves without DN restart. (Contributed by Xiaobing Zhou)

HDFS-9214. Support reconfiguring dfs.datanode.balance.max.concurrent.moves without DN restart. (Contributed by Xiaobing Zhou)
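
For operators, the practical upshot is that the mover thread quota can now be
changed through the standard dfsadmin reconfiguration flow instead of a DataNode
restart. A sketch of that flow (host and IPC port are placeholders; it assumes
the new value has already been written to hdfs-site.xml on the DataNode):

  hdfs dfsadmin -reconfig datanode <dn-host>:<ipc-port> properties  # list reconfigurable keys
  hdfs dfsadmin -reconfig datanode <dn-host>:<ipc-port> start       # re-read hdfs-site.xml and apply
  hdfs dfsadmin -reconfig datanode <dn-host>:<ipc-port> status      # poll until the task completes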


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9d817fa1
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9d817fa1
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9d817fa1

Branch: refs/heads/yarn-2877
Commit: 9d817fa1b14b477e5440ae4edd78de849976d9b5
Parents: 59dbe8b
Author: Arpit Agarwal <ar...@apache.org>
Authored: Fri Dec 4 14:46:46 2015 -0800
Committer: Arpit Agarwal <ar...@apache.org>
Committed: Fri Dec 4 14:46:46 2015 -0800

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../hadoop/hdfs/server/datanode/DataNode.java   |  44 ++-
 .../hdfs/server/datanode/DataXceiverServer.java |  58 +++--
 .../datanode/TestDataNodeReconfiguration.java   | 237 +++++++++++++++++++
 .../apache/hadoop/hdfs/tools/TestDFSAdmin.java  |   2 +-
 5 files changed, 319 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d817fa1/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 34c3ff2..e10450d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -1720,6 +1720,9 @@ Release 2.8.0 - UNRELEASED
     HDFS-9474. TestPipelinesFailover should not fail when printing debug
     message. (John Zhuge via Yongjun Zhang)
 
+    HDFS-9214. Support reconfiguring dfs.datanode.balance.max.concurrent.moves
+    without DN restart. (Xiaobing Zhou via Arpit Agarwal)
+
   OPTIMIZATIONS
 
     HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d817fa1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 0a68758..150ce6b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -42,6 +42,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OOB_TIMEOUT_DEFA
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_OOB_TIMEOUT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_DEFAULT;
@@ -92,7 +94,6 @@ import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.management.ObjectName;
 
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -212,6 +213,7 @@ import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.common.collect.Lists;
 import com.google.protobuf.BlockingService;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -284,7 +286,9 @@ public class DataNode extends ReconfigurableBase
   /** A list of properties that are reconfigurable at runtime. */
   private static final List<String> RECONFIGURABLE_PROPERTIES =
       Collections.unmodifiableList(
-          Arrays.asList(DFS_DATANODE_DATA_DIR_KEY));
+          Arrays.asList(
+              DFS_DATANODE_DATA_DIR_KEY,
+              DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY));
 
   public static final Log METRICS_LOG = LogFactory.getLog("DataNodeMetricsLog");
 
@@ -522,6 +526,42 @@ public class DataNode extends ReconfigurableBase
           }
         }
       }
+    } else if (property.equals(
+        DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY)) {
+      ReconfigurationException rootException = null;
+      try {
+        LOG.info("Reconfiguring " + property + " to " + newVal);
+        int movers;
+        if (newVal == null) {
+          // set to default
+          movers = DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT;
+        } else {
+          movers = Integer.parseInt(newVal);
+          if (movers <= 0) {
+            rootException = new ReconfigurationException(
+                property,
+                newVal,
+                getConf().get(property),
+                new IllegalArgumentException(
+                    "balancer max concurrent movers must be larger than 0"));
+          }
+        }
+        // Only apply the new value when validation succeeded; otherwise the
+        // finally block below rethrows without touching the throttler.
+        if (rootException == null) {
+          xserver.updateBalancerMaxConcurrentMovers(movers);
+        }
+      } catch (NumberFormatException nfe) {
+        rootException = new ReconfigurationException(
+            property, newVal, getConf().get(property), nfe);
+      } finally {
+        if (rootException != null) {
+          LOG.warn(String.format(
+              "Exception in updating balancer max concurrent movers %s to %s",
+              property, newVal), rootException);
+          throw rootException;
+        }
+      }
     } else {
       throw new ReconfigurationException(
           property, newVal, getConf().get(property));
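
The handler above interleaves parsing, validation, and deferred error reporting.
A hypothetical refactoring (the helper name parseMovers is ours, not part of the
commit) that keeps the same behavior but validates before anything is applied:

    // Sketch only: parse and validate, throwing before any state changes.
    // NumberFormatException is a subclass of IllegalArgumentException, so one
    // catch clause covers both the non-numeric and the non-positive case.
    private int parseMovers(String property, String newVal)
        throws ReconfigurationException {
      if (newVal == null) {
        // null means "revert to the compiled-in default"
        return DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT;
      }
      try {
        int movers = Integer.parseInt(newVal);
        if (movers <= 0) {
          throw new IllegalArgumentException(
              "balancer max concurrent movers must be larger than 0");
        }
        return movers;
      } catch (IllegalArgumentException e) {
        throw new ReconfigurationException(
            property, newVal, getConf().get(property), e);
      }
    }

The reconfiguration branch then reduces to a single call,
xserver.updateBalancerMaxConcurrentMovers(parseMovers(property, newVal)),
with no partially-applied state on the error path.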

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d817fa1/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
index 36852eb..36cf8a1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiverServer.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.net.SocketTimeoutException;
 import java.nio.channels.AsynchronousCloseException;
 import java.util.HashMap;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -31,6 +32,7 @@ import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.util.Daemon;
 
 import com.google.common.annotations.VisibleForTesting;
+
 import org.slf4j.Logger;
 
 /**
@@ -64,36 +66,45 @@ class DataXceiverServer implements Runnable {
    */
   static class BlockBalanceThrottler extends DataTransferThrottler {
    private int numThreads;
-   private int maxThreads;
-   
+   private final AtomicInteger maxThreads = new AtomicInteger(0);
+
    /**Constructor
     * 
    * @param bandwidth Total amount of bandwidth that can be used for balancing
     */
-   private BlockBalanceThrottler(long bandwidth, int maxThreads) {
-     super(bandwidth);
-     this.maxThreads = maxThreads;
-     LOG.info("Balancing bandwith is "+ bandwidth + " bytes/s");
-     LOG.info("Number threads for balancing is "+ maxThreads);
-   }
-   
+    private BlockBalanceThrottler(long bandwidth, int maxThreads) {
+      super(bandwidth);
+      this.maxThreads.set(maxThreads);
+      LOG.info("Balancing bandwidth is " + bandwidth + " bytes/s");
+      LOG.info("Number of threads for balancing is " + maxThreads);
+    }
+
+    private void setMaxConcurrentMovers(int movers) {
+      this.maxThreads.set(movers);
+    }
+
+    @VisibleForTesting
+    int getMaxConcurrentMovers() {
+      return this.maxThreads.get();
+    }
+
    /** Check if the block move can start. 
     * 
     * Return true if the thread quota is not exceeded and 
     * the counter is incremented; False otherwise.
     */
-   synchronized boolean acquire() {
-     if (numThreads >= maxThreads) {
-       return false;
-     }
-     numThreads++;
-     return true;
-   }
-   
-   /** Mark that the move is completed. The thread counter is decremented. */
-   synchronized void release() {
-     numThreads--;
-   }
+    synchronized boolean acquire() {
+      if (numThreads >= maxThreads.get()) {
+        return false;
+      }
+      numThreads++;
+      return true;
+    }
+
+    /** Mark that the move is completed. The thread counter is decremented. */
+    synchronized void release() {
+      numThreads--;
+    }
   }
 
   final BlockBalanceThrottler balanceThrottler;
@@ -108,7 +119,6 @@ class DataXceiverServer implements Runnable {
   
   DataXceiverServer(PeerServer peerServer, Configuration conf,
       DataNode datanode) {
-    
     this.peerServer = peerServer;
     this.datanode = datanode;
     
@@ -288,4 +298,8 @@ class DataXceiverServer implements Runnable {
     peers.remove(peer);
     peersXceiver.remove(peer);
   }
+
+  public void updateBalancerMaxConcurrentMovers(int movers) {
+    balanceThrottler.setMaxConcurrentMovers(movers);
+  }
 }
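
A design note on the throttler: setMaxConcurrentMovers() writes an AtomicInteger
without taking the throttler lock, while acquire()/release() stay synchronized to
guard numThreads. A minimal standalone sketch of the same scheme (the class and
field names below are ours, for illustration only):

    import java.util.concurrent.atomic.AtomicInteger;

    // Standalone mimic of BlockBalanceThrottler's locking scheme: the limit is
    // an AtomicInteger so the reconfiguration thread can update it lock-free,
    // while the held-count stays guarded by the monitor.
    class MoverQuota {
      private final AtomicInteger max = new AtomicInteger(5);
      private int held; // guarded by "this"

      void setMax(int m) {
        max.set(m); // safe to call concurrently with acquire()/release()
      }

      synchronized boolean acquire() {
        if (held >= max.get()) {
          return false; // quota exhausted; caller must not start a move
        }
        held++;
        return true;
      }

      synchronized void release() {
        held--;
      }
    }

Note that lowering the limit below the current held count does not preempt
in-flight moves; new acquires simply fail until enough releases bring the count
back under the limit, which is exactly what the new test exercises.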

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d817fa1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java
new file mode 100644
index 0000000..edaf7ab
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeReconfiguration.java
@@ -0,0 +1,237 @@
+package org.apache.hadoop.hdfs.server.datanode;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.ReconfigurationException;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Tests runtime reconfiguration of DataNode properties without restarting the node.
+ */
+public class TestDataNodeReconfiguration {
+
+  private static final Log LOG = LogFactory.getLog(TestDataNodeReconfiguration.class);
+  private static final String DATA_DIR = MiniDFSCluster.getBaseDirectory()
+      + "data";
+  private final static InetSocketAddress NN_ADDR = new InetSocketAddress(
+      "localhost", 5020);
+  private final int NUM_NAME_NODE = 1;
+  private final int NUM_DATA_NODE = 10;
+  private MiniDFSCluster cluster;
+
+  @Before
+  public void setUp() throws IOException {
+    startDFSCluster(NUM_NAME_NODE, NUM_DATA_NODE);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (cluster != null) {
+      cluster.shutdown();
+      cluster = null;
+    }
+
+    File dir = new File(DATA_DIR);
+    if (dir.exists())
+      Assert.assertTrue("Cannot delete data-node dirs",
+          FileUtil.fullyDelete(dir));
+  }
+
+  private void startDFSCluster(int numNameNodes, int numDataNodes)
+      throws IOException {
+    Configuration conf = new Configuration();
+
+    MiniDFSNNTopology nnTopology = MiniDFSNNTopology
+        .simpleFederatedTopology(numNameNodes);
+
+    cluster = new MiniDFSCluster.Builder(conf).nnTopology(nnTopology)
+        .numDataNodes(numDataNodes).build();
+    cluster.waitActive();
+  }
+
+  /**
+   * Starts the requested number of DataNodes, each against the mock NameNode.
+   *
+   * @throws IOException
+   */
+  public DataNode[] createDNsForTest(int numDataNodes) throws IOException {
+    Configuration conf = new HdfsConfiguration();
+    conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, DATA_DIR);
+    conf.set(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY, "0.0.0.0:0");
+    conf.set(DFSConfigKeys.DFS_DATANODE_HTTP_ADDRESS_KEY, "0.0.0.0:0");
+    conf.set(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY, "0.0.0.0:0");
+    conf.setInt(CommonConfigurationKeys.IPC_CLIENT_CONNECT_MAX_RETRIES_KEY, 0);
+
+    DataNode[] result = new DataNode[numDataNodes];
+    for (int i = 0; i < numDataNodes; i++) {
+      result[i] = DataNodeTestUtils.startDNWithMockNN(conf, NN_ADDR, DATA_DIR);
+    }
+    return result;
+  }
+
+  @Test
+  public void testMaxConcurrentMoversReconfiguration()
+      throws ReconfigurationException, IOException {
+    int maxConcurrentMovers = 10;
+    for (int i = 0; i < NUM_DATA_NODE; i++) {
+      DataNode dn = cluster.getDataNodes().get(i);
+
+      // try invalid values
+      try {
+        dn.reconfigureProperty(
+            DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, "text");
+        fail("ReconfigurationException expected");
+      } catch (ReconfigurationException expected) {
+        assertTrue("expecting NumberFormatException",
+            expected.getCause() instanceof NumberFormatException);
+      }
+      try {
+        dn.reconfigureProperty(
+            DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
+            String.valueOf(-1));
+        fail("ReconfigurationException expected");
+      } catch (ReconfigurationException expected) {
+        assertTrue("expecting IllegalArgumentException",
+            expected.getCause() instanceof IllegalArgumentException);
+      }
+      try {
+        dn.reconfigureProperty(
+            DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
+            String.valueOf(0));
+        fail("ReconfigurationException expected");
+      } catch (ReconfigurationException expected) {
+        assertTrue("expecting IllegalArgumentException",
+            expected.getCause() instanceof IllegalArgumentException);
+      }
+
+      // change properties
+      dn.reconfigureProperty(DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
+          String.valueOf(maxConcurrentMovers));
+
+      // verify change
+      assertEquals(String.format("%s has wrong value",
+          DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY),
+          maxConcurrentMovers, dn.xserver.balanceThrottler.getMaxConcurrentMovers());
+
+      assertEquals(String.format("%s has wrong value",
+          DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY),
+          maxConcurrentMovers, Integer.parseInt(dn.getConf().get(
+              DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY)));
+
+      // revert to default
+      dn.reconfigureProperty(DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
+          null);
+
+      // verify default
+      assertEquals(String.format("%s has wrong value",
+          DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY),
+          DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT,
+          dn.xserver.balanceThrottler.getMaxConcurrentMovers());
+
+      assertEquals(String.format("expected %s to be unset",
+          DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY), null, dn
+          .getConf().get(DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY));
+    }
+  }
+
+  @Test
+  public void testAcquireWithMaxConcurrentMoversGreaterThanDefault()
+      throws IOException, ReconfigurationException {
+    testAcquireWithMaxConcurrentMoversShared(10);
+  }
+
+  @Test
+  public void testAcquireWithMaxConcurrentMoversLessThanDefault()
+      throws IOException, ReconfigurationException {
+    testAcquireWithMaxConcurrentMoversShared(3);
+  }
+
+  private void testAcquireWithMaxConcurrentMoversShared(
+      int maxConcurrentMovers)
+      throws IOException, ReconfigurationException {
+    DataNode[] dns = null;
+    try {
+      dns = createDNsForTest(1);
+      testAcquireOnMaxConcurrentMoversReconfiguration(dns[0],
+          maxConcurrentMovers);
+    } finally {
+      shutDownDNs(dns);
+    }
+  }
+
+  private void shutDownDNs(DataNode[] dns) {
+    if (dns == null) {
+      return;
+    }
+
+    for (int i = 0; i < dns.length; i++) {
+      try {
+        if (dns[i] == null) {
+          continue;
+        }
+        dns[i].shutdown();
+      } catch (Exception e) {
+        LOG.error("Cannot close: ", e);
+      }
+    }
+  }
+
+  private void testAcquireOnMaxConcurrentMoversReconfiguration(
+      DataNode dataNode, int maxConcurrentMovers) throws IOException,
+      ReconfigurationException {
+    int defaultMaxThreads = dataNode.getConf().getInt(
+        DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
+        DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
+    for (int i = 0; i < defaultMaxThreads; i++) {
+      assertEquals("should be able to get thread quota", true,
+          dataNode.xserver.balanceThrottler.acquire());
+    }
+
+    assertEquals("should not be able to get thread quota", false,
+        dataNode.xserver.balanceThrottler.acquire());
+
+    // change properties
+    dataNode.reconfigureProperty(
+        DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
+        String.valueOf(maxConcurrentMovers));
+
+    assertEquals("thread quota is wrong", maxConcurrentMovers,
+        dataNode.xserver.balanceThrottler.getMaxConcurrentMovers()); // thread quota
+
+    int val = Math.abs(maxConcurrentMovers - defaultMaxThreads);
+    if (defaultMaxThreads < maxConcurrentMovers) {
+      for (int i = 0; i < val; i++) {
+        assertEquals("should be able to get thread quota", true,
+            dataNode.xserver.balanceThrottler.acquire());
+      }
+    } else if (defaultMaxThreads > maxConcurrentMovers) {
+      for (int i = 0; i < val; i++) {
+        assertEquals("should not be able to get thread quota", false,
+            dataNode.xserver.balanceThrottler.acquire());
+      }
+    }
+
+    assertEquals("should not be able to get thread quota", false,
+        dataNode.xserver.balanceThrottler.acquire());
+  }
+}
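
To make the quota arithmetic in testAcquireOnMaxConcurrentMoversReconfiguration
concrete, here is a walk-through using the MoverQuota sketch from above. It
compresses the test's two scenarios into one run and assumes the branch-2
default of 5 for dfs.datanode.balance.max.concurrent.moves:

    MoverQuota q = new MoverQuota();  // limit = 5
    for (int i = 0; i < 5; i++) {
      assert q.acquire();             // all 5 permits granted
    }
    assert !q.acquire();              // 6th attempt refused

    q.setMax(10);                     // the "greater than default" case
    for (int i = 0; i < 5; i++) {
      assert q.acquire();             // |10 - 5| = 5 more permits granted
    }
    assert !q.acquire();              // 11th refused

    q.setMax(3);                      // the "less than default" case: 10 held
    assert !q.acquire();              // refused until releases drain below 3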

http://git-wip-us.apache.org/repos/asf/hadoop/blob/9d817fa1/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
index a2b5638..3a30ccf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/tools/TestDFSAdmin.java
@@ -207,7 +207,7 @@ public class TestDFSAdmin {
     final String address = "localhost:" + port;
     List<String> outputs =
         getReconfigurationAllowedProperties("datanode", address);
-    assertEquals(2, outputs.size());
+    assertEquals(3, outputs.size());
     assertEquals(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY,
         outputs.get(1));
   }
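
For reference, the bumped count follows directly from the DataNode change above:
RECONFIGURABLE_PROPERTIES grew from one key to two, and the dfsadmin "properties"
output appears to carry one additional non-property line, hence the expected size
moving from 2 to 3.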