Posted to common-commits@hadoop.apache.org by jh...@apache.org on 2017/09/21 01:07:25 UTC
[22/50] [abbrv] hadoop git commit: HDFS-11799. Introduce a config to allow setting up write pipeline with fewer nodes than replication factor. Contributed by Brahma Reddy Battula
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/fda1221c
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/fda1221c
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/fda1221c
Branch: refs/heads/YARN-5734
Commit: fda1221c55101d97ac62e1ee4e3ddf9a915d5363
Parents: 31b5840
Author: Brahma Reddy Battula <br...@apache.org>
Authored: Tue Sep 19 11:25:45 2017 +0530
Committer: Brahma Reddy Battula <br...@apache.org>
Committed: Tue Sep 19 11:25:45 2017 +0530
----------------------------------------------------------------------
.../java/org/apache/hadoop/hdfs/DFSClient.java | 13 +-
.../org/apache/hadoop/hdfs/DataStreamer.java | 31 +-
.../hdfs/client/HdfsClientConfigKeys.java | 2 +
.../src/main/resources/hdfs-default.xml | 17 ++
.../TestReplaceDatanodeFailureReplication.java | 291 +++++++++++++++++++
.../hadoop/tools/TestHdfsConfigFields.java | 4 +-
6 files changed, 354 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fda1221c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 772049d..7e8e95b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -223,6 +223,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
final String clientName;
final SocketFactory socketFactory;
final ReplaceDatanodeOnFailure dtpReplaceDatanodeOnFailure;
+ final short dtpReplaceDatanodeOnFailureReplication;
private final FileSystem.Statistics stats;
private final URI namenodeUri;
private final Random r = new Random();
@@ -305,7 +306,17 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
this.socketFactory = NetUtils.getSocketFactory(conf, ClientProtocol.class);
this.dtpReplaceDatanodeOnFailure = ReplaceDatanodeOnFailure.get(conf);
this.smallBufferSize = DFSUtilClient.getSmallBufferSize(conf);
-
+ this.dtpReplaceDatanodeOnFailureReplication = (short) conf
+ .getInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
+ MIN_REPLICATION,
+ HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
+ MIN_REPLICATION_DEFAULT);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Sets " + HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
+ MIN_REPLICATION + " to "
+ + dtpReplaceDatanodeOnFailureReplication);
+ }
this.ugi = UserGroupInformation.getCurrentUser();
this.namenodeUri = nameNodeUri;
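For anyone picking up this change: below is a minimal sketch (not part of this commit; the class name and the value 2 are illustrative) of how a client could opt in to the new behaviour programmatically. The same effect can be achieved by setting the property in hdfs-site.xml.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;

public class MinReplicationOptIn {
  public static void main(String[] args) throws Exception {
    Configuration conf = new HdfsConfiguration();
    // Keep writing as long as at least 2 datanodes survive in the pipeline,
    // even when no replacement datanode can be found.
    conf.setInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure
        .MIN_REPLICATION, 2);
    FileSystem fs = FileSystem.get(conf);
    // ... write files as usual; blocks left under-replicated by a shrunken
    // pipeline are expected to be re-replicated by the namenode later.
    fs.close();
  }
}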
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fda1221c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index 4eafca1..99fa5f3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -1384,7 +1384,36 @@ class DataStreamer extends Daemon {
setPipeline(lb);
//find the new datanode
- final int d = findNewDatanode(original);
+ final int d;
+ try {
+ d = findNewDatanode(original);
+ } catch (IOException ioe) {
+ // Check the number of nodes still available to decide whether to
+ // continue the write.
+
+ // If the number of live datanodes in the pipeline is greater than or
+ // equal to the HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
+ // MIN_REPLICATION threshold, continue writing to the remaining nodes.
+ // Otherwise rethrow the exception.
+ //
+ // If HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
+ // MIN_REPLICATION is set to zero or a negative value, the exception is
+ // rethrown whenever a replacement cannot be found.
+
+ if (dfsClient.dtpReplaceDatanodeOnFailureReplication > 0 && nodes.length
+ >= dfsClient.dtpReplaceDatanodeOnFailureReplication) {
+ DFSClient.LOG.warn(
+ "Failed to find a new datanode to add to the write pipeline; "
+ + "continuing to write to the pipeline with " + nodes.length
+ + " nodes since that is no less than the minimum replication: "
+ + dfsClient.dtpReplaceDatanodeOnFailureReplication
+ + " configured by "
+ + BlockWrite.ReplaceDatanodeOnFailure.MIN_REPLICATION
+ + ".", ioe);
+ return;
+ }
+ throw ioe;
+ }
//transfer replica. pick a source from the original nodes
final DatanodeInfo src = original[tried % original.length];
final DatanodeInfo[] targets = {nodes[d]};
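Put differently, the recovery rule added above reduces to the following predicate (a hypothetical helper written here only to restate the logic; DataStreamer inlines the check):

// Continue the write without a replacement datanode only when the feature
// is enabled (minReplication > 0) and the surviving pipeline still meets
// the configured minimum; otherwise the original IOException propagates.
static boolean canContinueWithoutReplacement(short minReplication,
    int liveNodesInPipeline) {
  return minReplication > 0 && liveNodesInPipeline >= minReplication;
}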
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fda1221c/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index e99b099..97cb68b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -320,6 +320,8 @@ public interface HdfsClientConfigKeys {
String POLICY_DEFAULT = "DEFAULT";
String BEST_EFFORT_KEY = PREFIX + "best-effort";
boolean BEST_EFFORT_DEFAULT = false;
+ String MIN_REPLICATION = PREFIX + "min-replication";
+ short MIN_REPLICATION_DEFAULT = 0;
}
}
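Since PREFIX in this interface is "dfs.client.block.write.replace-datanode-on-failure.", the new constant resolves to the property name documented in hdfs-default.xml below:

String key = HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure
    .MIN_REPLICATION;
// key == "dfs.client.block.write.replace-datanode-on-failure.min-replication"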
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fda1221c/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index af40a34..9327a2c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -658,6 +658,23 @@
</description>
</property>
+ <property>
+ <name>dfs.client.block.write.replace-datanode-on-failure.min-replication</name>
+ <value>0</value>
+ <description>
+ The minimum number of replications needed to avoid failing the
+ write pipeline when new datanodes cannot be found to replace failed
+ datanodes in the pipeline (for example, due to a network failure).
+ If the number of remaining datanodes in the write pipeline is greater
+ than or equal to this value, the client continues writing to the
+ remaining nodes; otherwise it throws an exception.
+
+ If this is set to 0, an exception is thrown whenever a replacement
+ cannot be found.
+ See also dfs.client.block.write.replace-datanode-on-failure.policy.
+ </description>
+ </property>
+
<property>
<name>dfs.blockreport.intervalMsec</name>
<value>21600000</value>
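A short sketch of pairing the new property with the replacement policy it complements (mirroring the test setup further down; the class name and values are illustrative, not recommendations):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure;
import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure.Policy;

public class PipelinePolicySetup {
  static Configuration configure() {
    Configuration conf = new HdfsConfiguration();
    // Always try to replace a failed datanode first ...
    ReplaceDatanodeOnFailure.write(Policy.ALWAYS, false, conf);
    // ... but when a replacement cannot be found, tolerate a pipeline that
    // has shrunk to a single live datanode instead of failing the write.
    conf.setInt(
        "dfs.client.block.write.replace-datanode-on-failure.min-replication",
        1);
    return conf;
  }
}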
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fda1221c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplaceDatanodeFailureReplication.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplaceDatanodeFailureReplication.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplaceDatanodeFailureReplication.java
new file mode 100644
index 0000000..9591cb4
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplaceDatanodeFailureReplication.java
@@ -0,0 +1,291 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.concurrent.TimeoutException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
+import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure;
+import org.apache.hadoop.hdfs.protocol.datatransfer.ReplaceDatanodeOnFailure.Policy;
+import org.apache.hadoop.io.IOUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Verify the behaviour of HdfsClientConfigKeys.BlockWrite.
+ * ReplaceDatanodeOnFailure.MIN_REPLICATION. If the number of live datanodes
+ * holding the block is greater than or equal to the
+ * 'dfs.client.block.write.replace-datanode-on-failure.min-replication'
+ * threshold, continue writing to the remaining nodes.
+ * Otherwise an exception is thrown.
+ * <p>
+ * If HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
+ * MIN_REPLICATION is set to zero or a negative value, an exception is
+ * thrown whenever a replacement cannot be found.
+ */
+public class TestReplaceDatanodeFailureReplication {
+ static final Log LOG = LogFactory
+ .getLog(TestReplaceDatanodeFailureReplication.class);
+
+ static final String DIR =
+ "/" + TestReplaceDatanodeFailureReplication.class.getSimpleName() + "/";
+ static final short REPLICATION = 3;
+ final private static String RACK0 = "/rack0";
+
+ /**
+ * Test fail last datanode in the pipeline.
+ */
+ @Test
+ public void testLastDatanodeFailureInPipeline() throws Exception {
+ testWriteFileAndVerifyAfterDNStop(2, 1, 10, false);
+ }
+
+ /**
+ * Test fail first datanode in the pipeline.
+ */
+ @Test
+ public void testFirstDatanodeFailureInPipeline() throws Exception {
+ testWriteFileAndVerifyAfterDNStop(2, 0, 10, false);
+ }
+
+ /**
+ * Test fail all the datanodes except the first in the pipeline.
+ */
+ @Test
+ public void testWithOnlyFirstDatanodeIsAlive() throws Exception {
+ testWriteFileAndVerifyAfterDNStop(1, 1, 1, true);
+ }
+
+ /**
+ * Test fail all the datanodes except the last in the pipeline.
+ */
+ @Test
+ public void testWithOnlyLastDatanodeIsAlive() throws Exception {
+ testWriteFileAndVerifyAfterDNStop(1, 0, 1, true);
+ }
+
+ /**
+ * Test the case where the number of live nodes is less than
+ * "dfs.client.block.write.replace-datanode-on-failure.min-replication".
+ */
+ @Test
+ public void testLessNumberOfLiveDatanodesThanWriteReplaceDatanodeOnFailureRF()
+ throws Exception {
+ final MiniDFSCluster cluster = setupCluster(2);
+
+ try {
+ final DistributedFileSystem fs = cluster.getFileSystem();
+ final Path dir = new Path(DIR);
+
+ final SlowWriter[] slowwriters = new SlowWriter[1];
+ for (int i = 1; i <= slowwriters.length; i++) {
+ // create slow writers at different speeds
+ slowwriters[i - 1] = new SlowWriter(fs, new Path(dir, "file" + i),
+ i * 200L);
+ }
+
+ for (SlowWriter s : slowwriters) {
+ s.start();
+ }
+
+ // Let slow writers write something.
+ // Some of them are so slow that they may not have started yet.
+ sleepSeconds(1);
+
+ // stop two datanodes, leaving fewer live nodes than the configured minimum
+ cluster.stopDataNode(0);
+ cluster.stopDataNode(0);
+
+ // Let the slow writers write for a few more seconds.
+ // Everyone should have written something.
+ sleepSeconds(20);
+
+ // check replication and interrupt.
+ for (SlowWriter s : slowwriters) {
+ try {
+ s.out.getCurrentBlockReplication();
+ Assert.fail(
+ "Must throw exception as failed to add a new datanode for write "
+ + "pipeline, minimum failure replication");
+ } catch (IOException e) {
+ // expected
+ }
+ s.interruptRunning();
+ }
+
+ // close files
+ for (SlowWriter s : slowwriters) {
+ s.joinAndClose();
+ }
+
+ // Verify the file
+ verifyFileContent(fs, slowwriters);
+ } finally {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+
+ private MiniDFSCluster setupCluster(int failRF) throws IOException {
+ final Configuration conf = new HdfsConfiguration();
+ conf.setInt(HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.
+ MIN_REPLICATION, failRF);
+ // always replace a datanode
+ ReplaceDatanodeOnFailure.write(Policy.ALWAYS, false, conf);
+
+ final String[] racks = new String[REPLICATION];
+ Arrays.fill(racks, RACK0);
+ return new MiniDFSCluster.Builder(conf).racks(racks)
+ .numDataNodes(REPLICATION).build();
+ }
+
+ private void testWriteFileAndVerifyAfterDNStop(int failRF, int dnindex,
+ int slowWrites, boolean failPipeLine)
+ throws IOException, InterruptedException, TimeoutException {
+ final MiniDFSCluster cluster = setupCluster(failRF);
+ try {
+ final DistributedFileSystem fs = cluster.getFileSystem();
+ final Path dir = new Path(DIR);
+
+ final SlowWriter[] slowwriters = new SlowWriter[slowWrites];
+ for (int i = 1; i <= slowwriters.length; i++) {
+ // create slow writers at different speeds
+ slowwriters[i - 1] = new SlowWriter(fs, new Path(dir, "file" + i),
+ i * 200L);
+ }
+
+ for (SlowWriter s : slowwriters) {
+ s.start();
+ }
+
+ // Let slow writers write something.
+ // Some of them are so slow that they may not have started yet.
+ sleepSeconds(3);
+
+ // stop a datanode
+ cluster.stopDataNode(dnindex);
+ if (failPipeLine) {
+ cluster.stopDataNode(dnindex);
+ }
+
+ // Let the slow writers write for a few more seconds.
+ // Everyone should have written something.
+ sleepSeconds(5);
+ cluster.waitFirstBRCompleted(0, 10000);
+ // check replication and interrupt.
+ for (SlowWriter s : slowwriters) {
+ Assert.assertEquals(failRF, s.out.getCurrentBlockReplication());
+ s.interruptRunning();
+ }
+
+ // close files
+ for (SlowWriter s : slowwriters) {
+ s.joinAndClose();
+ }
+
+ // Verify the file
+ verifyFileContent(fs, slowwriters);
+ } finally {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+ }
+
+ private void verifyFileContent(DistributedFileSystem fs,
+ SlowWriter[] slowwriters) throws IOException {
+ LOG.info("Verify the file");
+ for (int i = 0; i < slowwriters.length; i++) {
+ LOG.info(slowwriters[i].filepath + ": length=" + fs
+ .getFileStatus(slowwriters[i].filepath).getLen());
+ FSDataInputStream in = null;
+ try {
+ in = fs.open(slowwriters[i].filepath);
+ for (int j = 0, x;; j++) {
+ x = in.read();
+ if ((x) != -1) {
+ Assert.assertEquals(j, x);
+ } else {
+ return;
+ }
+ }
+ } finally {
+ IOUtils.closeStream(in);
+ }
+ }
+ }
+
+ static void sleepSeconds(final int waittime) throws InterruptedException {
+ LOG.info("Wait " + waittime + " seconds");
+ Thread.sleep(waittime * 1000L);
+ }
+
+ static class SlowWriter extends Thread {
+ private final Path filepath;
+ private final HdfsDataOutputStream out;
+ private final long sleepms;
+ private volatile boolean running = true;
+
+ SlowWriter(DistributedFileSystem fs, Path filepath, final long sleepms)
+ throws IOException {
+ super(SlowWriter.class.getSimpleName() + ":" + filepath);
+ this.filepath = filepath;
+ this.out = (HdfsDataOutputStream) fs.create(filepath, REPLICATION);
+ this.sleepms = sleepms;
+ }
+
+ @Override public void run() {
+ int i = 0;
+ try {
+ sleep(sleepms);
+ for (; running; i++) {
+ LOG.info(getName() + " writes " + i);
+ out.write(i);
+ out.hflush();
+ sleep(sleepms);
+ }
+ } catch (InterruptedException e) {
+ LOG.info(getName() + " interrupted:" + e);
+ } catch (IOException e) {
+ throw new RuntimeException(getName(), e);
+ } finally {
+ LOG.info(getName() + " terminated: i=" + i);
+ }
+ }
+
+ void interruptRunning() {
+ running = false;
+ interrupt();
+ }
+
+ void joinAndClose() throws InterruptedException {
+ LOG.info(getName() + " join and close");
+ join();
+ IOUtils.closeStream(out);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fda1221c/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java
index 233dc5a..47db565 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java
@@ -41,8 +41,8 @@ public class TestHdfsConfigFields extends TestConfigurationFieldsBase {
public void initializeMemberVariables() {
xmlFilename = new String("hdfs-default.xml");
configurationClasses = new Class[] { HdfsClientConfigKeys.class,
- HdfsClientConfigKeys.StripedRead.class,
- DFSConfigKeys.class};
+ HdfsClientConfigKeys.StripedRead.class, DFSConfigKeys.class,
+ HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.class };
// Set error modes
errorIfMissingConfigProps = true;