You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2017/09/22 01:52:53 UTC

[23/50] hadoop git commit: HDFS-11799. Introduce a config to allow setting up write pipeline with fewer nodes than replication factor. Contributed by Brahma Reddy Battula

HDFS-11799. Introduce a config to allow setting up write pipeline with fewer nodes than replication factor. Contributed by Brahma Reddy Battula


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/fda1221c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/fda1221c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/fda1221c

Branch: refs/heads/YARN-6592
Commit: fda1221c55101d97ac62e1ee4e3ddf9a915d5363
Parents: 31b5840
Author: Brahma Reddy Battula <br...@apache.org>
Authored: Tue Sep 19 11:25:45 2017 +0530
Committer: Brahma Reddy Battula <br...@apache.org>
Committed: Tue Sep 19 11:25:45 2017 +0530

----------------------------------------------------------------------
 .../java/org/apache/hadoop/hdfs/DFSClient.java  |  13 +-
 .../org/apache/hadoop/hdfs/DataStreamer.java    |  31 +-
 .../hdfs/client/HdfsClientConfigKeys.java       |   2 +
 .../src/main/resources/hdfs-default.xml         |  17 ++
 .../TestReplaceDatanodeFailureReplication.java  | 291 +++++++++++++++++++
 .../hadoop/tools/TestHdfsConfigFields.java      |   4 +-
 6 files changed, 354 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/fda1221c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 772049d..7e8e95b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -223,6 +223,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
   final String clientName;
   final SocketFactory socketFactory;
   final ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure;
+  final short dtpReplaceDatanodeOnFailureReplication;
   private final FileSystem.Statistics stats;
   private final URI namenodeUri;
   private final Random r = new Random();
@@ -305,7 +306,17 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
     this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
     this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf);
     this.smallBufferSize = DFSUtilClient.getSmallBufferSize(conf);
-
+    this.dtpReplaceDatanodeOnFailureReplication = (short) conf
+        .getInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
+                MIN_REPLICATION,
+            HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
+                MIN_REPLICATION_DEFAULT);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(
+          "Sets " + HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
+              MIN_REPLICATION + " to "
+              + dtpReplaceDatanodeOnFailureReplication);
+    }
     this.ugi = UserGroupInformation.getCurrentUser();
 
     this.namenodeUri = nameNodeUri;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fda1221c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index 4eafca1..99fa5f3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -1384,7 +1384,36 @@ class DataStreamer extends Daemon {
       setPipeline(lb);
 
       //find the new datanode
-      final int d = findNewDatanode(original);
+      final int d;
+      try {
+        d = findNewDatanode(original);
+      } catch (IOException ioe) {
+        // check the minimal number of nodes available to decide whether to
+        // continue the write.
+
+        //if live block location datanodes is greater than or equal to
+        // HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
+        // MIN_REPLICATION threshold value, continue writing to the
+        // remaining nodes. Otherwise throw exception.
+        //
+        // If HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
+        // MIN_REPLICATION is set to 0 or less than zero, an exception will be
+        // thrown if a replacement could not be found.
+
+        if (dfsClient.dtpReplaceDatanodeOnFailureReplication > 0 && nodes.length
+            >= dfsClient.dtpReplaceDatanodeOnFailureReplication) {
+          DFSClient.LOG.warn(
+              "Failed to find a new datanode to add to the write pipeline, "
+                  + " continue to write to the pipeline with " + nodes.length
+                  + " nodes since it's no less than minimum replication: "
+                  + dfsClient.dtpReplaceDatanodeOnFailureReplication
+                  + " configured by "
+                  + BlockWrite.ReplaceDatanodeOnFailure.MIN_REPLICATION
+                  + ".", ioe);
+          return;
+        }
+        throw ioe;
+      }
       //transfer replica. pick a source from the original nodes
       final DatanodeInfo src = original[tried % original.length];
       final DatanodeInfo[] targets = {nodes[d]};

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fda1221c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index e99b099..97cb68b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -320,6 +320,8 @@ public interface HdfsClientConfigKeys {
       String  POLICY_DEFAULT = "DEFAULT";
       String  BEST_EFFORT_KEY = PREFIX + "best-effort";
       boolean BEST_EFFORT_DEFAULT = false;
+      String MIN_REPLICATION = PREFIX + "min-replication";
+      short MIN_REPLICATION_DEFAULT = 0;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fda1221c/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index af40a34..9327a2c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -658,6 +658,23 @@
   </description>
 </property>
 
+  <property>
+    <name>dfs.client.block.write.replace-datanode-on-failure.min-replication</name>
+    <value>0</value>
+    <description>
+      The minimum number of replicas needed for the write pipeline not to
+      fail when new datanodes cannot be found to replace failed datanodes
+      (possibly due to network failure) in the write pipeline.
+      If the number of remaining datanodes in the write pipeline is greater
+      than or equal to this property value, writing continues to the remaining
+      nodes. Otherwise an exception is thrown.
+
+      If this is set to 0 or less, an exception will be thrown when a
+      replacement cannot be found.
+      See also dfs.client.block.write.replace-datanode-on-failure.policy
+    </description>
+  </property>
+
 <property>
   <name>dfs.blockreport.intervalMsec</name>
   <value>21600000</value>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fda1221c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplaceDatanodeFailureReplication.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplaceDatanodeFailureReplication.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplaceDatanodeFailureReplication.java
new file mode 100644
index 0000000..9591cb4
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplaceDatanodeFailureReplication.java
@@ -0,0 +1,291 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure;
+import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure.Policy;
+import org.apache.hadoop.io.IOUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Verify the behaviours of HdfsClientConfigKeys.BlockWrite.
+ * ReplaceDatanodeOnFailure.MIN_REPLICATION. If the number of live datanodes
+ * in the pipeline is greater than or equal to the
+ * 'dfs.client.block.write.replace-datanode-on-failure.min-replication'
+ * threshold value, the client continues writing to the remaining nodes.
+ * Otherwise it will throw an exception.
+ * <p>
+ * If HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
+ * MIN_REPLICATION is set to zero or less, an exception will be thrown
+ * if a replacement could not be found.
+ */
+public class TestReplaceDatanodeFailureReplication {
+  static final Log LOG = LogFactory
+      .getLog(TestReplaceDatanodeFailureReplication.class);
+
+  static final String DIR =
+      "/" + TestReplaceDatanodeFailureReplication.class.getSimpleName() + "/";
+  static final short REPLICATION = 3;
+  final private static String RACK0 = "/rack0";
+
+  /**
+   * Test fail last datanode in the pipeline.
+   */
+  @Test
+  public void testLastDatanodeFailureInPipeline() throws Exception {
+    testWriteFileAndVerifyAfterDNStop(2, 1, 10, false);
+  }
+
+  /**
+   * Test fail first datanode in the pipeline.
+   */
+  @Test
+  public void testFirstDatanodeFailureInPipeline() throws Exception {
+    testWriteFileAndVerifyAfterDNStop(2, 0, 10, false);
+  }
+
+  /**
+   * Test fail all the datanodes except first in the pipeline.
+   */
+  @Test
+  public void testWithOnlyFirstDatanodeIsAlive() throws Exception {
+    testWriteFileAndVerifyAfterDNStop(1, 1, 1, true);
+  }
+
+  /**
+   * Test fail all the datanodes except lastnode in the pipeline.
+   */
+  @Test
+  public void testWithOnlyLastDatanodeIsAlive() throws Exception {
+    testWriteFileAndVerifyAfterDNStop(1, 0, 1, true);
+  }
+
+  /**
+   * Test when the number of live nodes is less than the value of
+   * "dfs.client.block.write.replace-datanode-on-failure.min-replication".
+   */
+  @Test
+  public void testLessNumberOfLiveDatanodesThanWriteReplaceDatanodeOnFailureRF()
+      throws Exception {
+    final MiniDFSCluster cluster = setupCluster(2);
+
+    try {
+      final DistributedFileSystem fs = cluster.getFileSystem();
+      final Path dir = new Path(DIR);
+
+      final SlowWriter[] slowwriters = new SlowWriter[1];
+      for (int i = 1; i <= slowwriters.length; i++) {
+        // create the slow writer (only one in this test; sleeps i * 200 ms)
+        slowwriters[i - 1] = new SlowWriter(fs, new Path(dir, "file" + i),
+            i * 200L);
+      }
+
+      for (SlowWriter s : slowwriters) {
+        s.start();
+      }
+
+      // Let the slow writer write something.
+      // A writer that is too slow may not have started yet.
+      sleepSeconds(1);
+
+      // stop two datanodes so that fewer than min-replication (2) stay alive
+      cluster.stopDataNode(0);
+      cluster.stopDataNode(0);
+
+      // Let the slow writer write for a few more seconds.
+      // Everyone should have written something.
+      sleepSeconds(20);
+
+      // getCurrentBlockReplication() is expected to fail; then interrupt.
+      for (SlowWriter s : slowwriters) {
+        try {
+          s.out.getCurrentBlockReplication();
+          Assert.fail(
+              "Must throw exception as failed to add a new datanode for write "
+                  + "pipeline, minimum failure replication");
+        } catch (IOException e) {
+          // expected
+        }
+        s.interruptRunning();
+      }
+
+      // close files
+      for (SlowWriter s : slowwriters) {
+        s.joinAndClose();
+      }
+
+      // Verify the file
+      verifyFileContent(fs, slowwriters);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  private MiniDFSCluster setupCluster(int failRF) throws IOException {
+    final Configuration conf = new HdfsConfiguration();
+    conf.setInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
+        MIN_REPLICATION, failRF);
+    // always replace a datanode
+    ReplaceDatanodeOnFailure.write(Policy.ALWAYS, false, conf);
+
+    final String[] racks = new String[REPLICATION];
+    Arrays.fill(racks, RACK0);
+    return new MiniDFSCluster.Builder(conf).racks(racks)
+        .numDataNodes(REPLICATION).build();
+  }
+
+  private void testWriteFileAndVerifyAfterDNStop(int failRF, int dnindex,
+      int slowWrites, boolean failPipeLine)
+      throws IOException, InterruptedException, TimeoutException {
+    final MiniDFSCluster cluster = setupCluster(failRF);
+    try {
+      final DistributedFileSystem fs = cluster.getFileSystem();
+      final Path dir = new Path(DIR);
+
+      final SlowWriter[] slowwriters = new SlowWriter[slowWrites];
+      for (int i = 1; i <= slowwriters.length; i++) {
+        // create slow writers with different speeds (writer i sleeps i * 200 ms)
+        slowwriters[i - 1] = new SlowWriter(fs, new Path(dir, "file" + i),
+            i * 200L);
+      }
+
+      for (SlowWriter s : slowwriters) {
+        s.start();
+      }
+
+      // Let the slow writers write something.
+      // Some of them are too slow and may not have started yet.
+      sleepSeconds(3);
+
+      // stop one datanode, or two when failPipeLine is set
+      cluster.stopDataNode(dnindex);
+      if (failPipeLine) {
+        cluster.stopDataNode(dnindex);
+      }
+
+      // Let the slow writers write for a few more seconds.
+      // Everyone should have written something.
+      sleepSeconds(5);
+      cluster.waitFirstBRCompleted(0, 10000);
+      // each pipeline is expected to hold exactly failRF nodes; then interrupt.
+      for (SlowWriter s : slowwriters) {
+        Assert.assertEquals(failRF, s.out.getCurrentBlockReplication());
+        s.interruptRunning();
+      }
+
+      // close files
+      for (SlowWriter s : slowwriters) {
+        s.joinAndClose();
+      }
+
+      // Verify the file
+      verifyFileContent(fs, slowwriters);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  /**
+   * Reads back every writer's file and asserts the byte at offset j is j.
+   */
+  private void verifyFileContent(DistributedFileSystem fs,
+      SlowWriter[] slowwriters) throws IOException {
+    LOG.info("Verify the file");
+    for (SlowWriter writer : slowwriters) {
+      LOG.info(writer.filepath + ": length=" + fs
+          .getFileStatus(writer.filepath).getLen());
+      FSDataInputStream in = null;
+      try {
+        in = fs.open(writer.filepath);
+        // End only this file's loop at EOF so the remaining files are checked.
+        int x;
+        for (int j = 0; (x = in.read()) != -1; j++) {
+          Assert.assertEquals(j, x);
+        }
+      } finally {
+        IOUtils.closeStream(in);
+      }
+    }
+  }
+
+  static void sleepSeconds(final int waittime) throws InterruptedException {
+    LOG.info("Wait " + waittime + " seconds");
+    Thread.sleep(waittime * 1000L);
+  }
+
+  static class SlowWriter extends Thread {
+    private final Path filepath;
+    private final HdfsDataOutputStream out;
+    private final long sleepms;
+    private volatile boolean running = true;
+
+    SlowWriter(DistributedFileSystem fs, Path filepath, final long sleepms)
+        throws IOException {
+      super(SlowWriter.class.getSimpleName() + ":" + filepath);
+      this.filepath = filepath;
+      this.out = (HdfsDataOutputStream) fs.create(filepath, REPLICATION);
+      this.sleepms = sleepms;
+    }
+
+    @Override public void run() {
+      int i = 0;
+      try {
+        sleep(sleepms);
+        for (; running; i++) {
+          LOG.info(getName() + " writes " + i);
+          out.write(i);
+          out.hflush();
+          sleep(sleepms);
+        }
+      } catch (InterruptedException e) {
+        LOG.info(getName() + " interrupted:" + e);
+      } catch (IOException e) {
+        throw new RuntimeException(getName(), e);
+      } finally {
+        LOG.info(getName() + " terminated: i=" + i);
+      }
+    }
+
+    void interruptRunning() {
+      running = false;
+      interrupt();
+    }
+
+    void joinAndClose() throws InterruptedException {
+      LOG.info(getName() + " join and close");
+      join();
+      IOUtils.closeStream(out);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/fda1221c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java
index 233dc5a..47db565 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java
@@ -41,8 +41,8 @@ public class TestHdfsConfigFields extends TestConfigurationFieldsBase {
   public void initializeMemberVariables() {
     xmlFilename = new String("hdfs-default.xml");
     configurationClasses = new Class[] { HdfsClientConfigKeys.class,
-        HdfsClientConfigKeys.StripedRead.class,
-        DFSConfigKeys.class};
+        HdfsClientConfigKeys.StripedRead.class, DFSConfigKeys.class,
+        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.class };
 
     // Set error modes
     errorIfMissingConfigProps = true;


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org