Posted to common-commits@hadoop.apache.org by xy...@apache.org on 2016/12/20 21:18:01 UTC

hadoop git commit: HDFS-10913. Introduce fault injectors to simulate slow mirrors. Contributed by Xiaobing Zhou.

Repository: hadoop
Updated Branches:
  refs/heads/trunk 4af66b1d6 -> 5daa8d863


HDFS-10913. Introduce fault injectors to simulate slow mirrors. Contributed by Xiaobing Zhou.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5daa8d86
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5daa8d86
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5daa8d86

Branch: refs/heads/trunk
Commit: 5daa8d8631835de97d4e4979e507a080017ca159
Parents: 4af66b1
Author: Xiaoyu Yao <xy...@apache.org>
Authored: Tue Dec 20 13:04:03 2016 -0800
Committer: Xiaoyu Yao <xy...@apache.org>
Committed: Tue Dec 20 13:17:52 2016 -0800

----------------------------------------------------------------------
 .../hdfs/server/datanode/BlockReceiver.java     |  10 +-
 .../server/datanode/DataNodeFaultInjector.java  |  25 ++-
 .../TestClientProtocolForPipelineRecovery.java  |   3 +-
 .../datanode/TestDataNodeFaultInjector.java     | 173 +++++++++++++++++++
 4 files changed, 208 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/5daa8d86/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 441bd91..23cd44d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -566,12 +566,15 @@ class BlockReceiver implements Closeable {
       try {
         long begin = Time.monotonicNow();
         // For testing. Normally no-op.
-        DataNodeFaultInjector.get().stopSendingPacketDownstream();
+        DataNodeFaultInjector.get().stopSendingPacketDownstream(mirrorAddr);
         packetReceiver.mirrorPacketTo(mirrorOut);
         mirrorOut.flush();
         long now = Time.monotonicNow();
         setLastSentTime(now);
         long duration = now - begin;
+        DataNodeFaultInjector.get().logDelaySendingPacketDownstream(
+            mirrorAddr,
+            duration);
         if (duration > datanodeSlowLogThresholdMs) {
           LOG.warn("Slow BlockReceiver write packet to mirror took " + duration
               + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms)");
@@ -1534,9 +1537,14 @@ class BlockReceiver implements Closeable {
       }
       // send my ack back to upstream datanode
       long begin = Time.monotonicNow();
+      /* for test only, no-op in production system */
+      DataNodeFaultInjector.get().delaySendingAckToUpstream(inAddr);
       replyAck.write(upstreamOut);
       upstreamOut.flush();
       long duration = Time.monotonicNow() - begin;
+      DataNodeFaultInjector.get().logDelaySendingAckToUpstream(
+          inAddr,
+          duration);
       if (duration > datanodeSlowLogThresholdMs) {
         LOG.warn("Slow PacketResponder send ack to upstream took " + duration
             + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms), " + myString

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5daa8d86/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
index aa06aa1..b74d2c9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNodeFaultInjector.java
@@ -50,7 +50,30 @@ public class DataNodeFaultInjector {
     return false;
   }
 
-  public void stopSendingPacketDownstream() throws IOException {}
+  public void stopSendingPacketDownstream(final String mirrAddr)
+      throws IOException {
+  }
+
+  /**
+   * Used as a hook to intercept the latency of sending packet.
+   */
+  public void logDelaySendingPacketDownstream(
+      final String mirrAddr,
+      final long delayMs) throws IOException {
+  }
+
+  public void delaySendingAckToUpstream(final String upstreamAddr)
+      throws IOException {
+  }
+
+  /**
+   * Used as a hook to intercept the latency of sending ack.
+   */
+  public void logDelaySendingAckToUpstream(
+      final String upstreamAddr,
+      final long delayMs)
+      throws IOException {
+  }
 
   public void noRegistration() throws IOException { }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5daa8d86/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
index 65a484c..1a640b4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestClientProtocolForPipelineRecovery.java
@@ -453,7 +453,8 @@ public class TestClientProtocolForPipelineRecovery {
     DataNodeFaultInjector dnFaultInjector = new DataNodeFaultInjector() {
       int tries = 1;
       @Override
-      public void stopSendingPacketDownstream() throws IOException {
+      public void stopSendingPacketDownstream(final String mirrAddr)
+          throws IOException {
         if (tries > 0) {
           tries--;
           try {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/5daa8d86/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeFaultInjector.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeFaultInjector.java
new file mode 100644
index 0000000..fe65429
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeFaultInjector.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.PathUtils;
+import org.junit.Test;
+
+/**
+ * This class tests various cases where faults are injected to DataNode.
+ */
+public class TestDataNodeFaultInjector {
+  private static final Log LOG = LogFactory
+      .getLog(TestDataNodeFaultInjector.class);
+
+  private static class MetricsDataNodeFaultInjector
+      extends DataNodeFaultInjector {
+
+    public static final long DELAY = 2000;
+    private long delayMs = 0;
+    private final String err = "Interrupted while sleeping. Bailing out.";
+    private long delayTries = 1;
+
+    void delayOnce() throws IOException {
+      if (delayTries > 0) {
+        delayTries--;
+        try {
+          Thread.sleep(DELAY);
+        } catch (InterruptedException ie) {
+          throw new IOException(err);
+        }
+      }
+    }
+
+    long getDelayMs() {
+      return delayMs;
+    }
+
+    void logDelay(final long duration) {
+      /**
+       * delay should be at least longer than DELAY, otherwise, delayXYZ is
+       * no-op
+       */
+      if (duration >= DELAY) {
+        this.delayMs = duration;
+      }
+    }
+  }
+
+  @Test(timeout = 60000)
+  public void testDelaySendingAckToUpstream() throws Exception {
+    final MetricsDataNodeFaultInjector mdnFaultInjector =
+        new MetricsDataNodeFaultInjector() {
+          @Override
+          public void delaySendingAckToUpstream(final String upstreamAddr)
+              throws IOException {
+            delayOnce();
+          }
+
+          @Override
+          public void logDelaySendingAckToUpstream(final String upstreamAddr,
+              final long delayMs) throws IOException {
+            logDelay(delayMs);
+          }
+        };
+    verifyFaultInjectionDelayPipeline(mdnFaultInjector);
+  }
+
+  @Test(timeout = 60000)
+  public void testDelaySendingPacketDownstream() throws Exception {
+    final MetricsDataNodeFaultInjector mdnFaultInjector =
+        new MetricsDataNodeFaultInjector() {
+          @Override
+          public void stopSendingPacketDownstream(final String mirrAddr)
+              throws IOException {
+            delayOnce();
+          }
+
+          @Override
+          public void logDelaySendingPacketDownstream(final String mirrAddr,
+              final long delayMs) throws IOException {
+            logDelay(delayMs);
+          }
+        };
+    verifyFaultInjectionDelayPipeline(mdnFaultInjector);
+  }
+
+  private void verifyFaultInjectionDelayPipeline(
+      final MetricsDataNodeFaultInjector mdnFaultInjector) throws Exception {
+
+    final Path baseDir = new Path(
+        PathUtils.getTestDir(getClass()).getAbsolutePath(),
+        GenericTestUtils.getMethodName());
+    final DataNodeFaultInjector oldDnInjector = DataNodeFaultInjector.get();
+    DataNodeFaultInjector.set(mdnFaultInjector);
+
+    final Configuration conf = new HdfsConfiguration();
+
+    /*
+     * MetricsDataNodeFaultInjector.DELAY/2 ms is viewed as slow.
+     */
+    final long datanodeSlowLogThresholdMs = MetricsDataNodeFaultInjector.DELAY
+        / 2;
+    conf.setLong(DFSConfigKeys.DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_KEY,
+        datanodeSlowLogThresholdMs);
+    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.toString());
+
+    /**
+     * configure to avoid resulting in pipeline failure due to read socket
+     * timeout
+     */
+    conf.setLong(HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY,
+        MetricsDataNodeFaultInjector.DELAY * 2);
+    conf.setBoolean(
+        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.ENABLE_KEY,
+        true);
+    conf.set(
+        HdfsClientConfigKeys.BlockWrite.ReplaceDatanodeOnFailure.POLICY_KEY,
+        "ALWAYS");
+
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+      cluster.waitActive();
+
+      final FileSystem fs = cluster.getFileSystem();
+      try (final FSDataOutputStream out = fs
+          .create(new Path(baseDir, "test.data"), (short) 2)) {
+        out.write(0x31);
+        out.hflush();
+        out.hsync();
+      }
+      LOG.info("delay info: " + mdnFaultInjector.getDelayMs() + ":"
+          + datanodeSlowLogThresholdMs);
+      assertTrue("Injected delay should be longer than the configured one",
+          mdnFaultInjector.getDelayMs() > datanodeSlowLogThresholdMs);
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+      DataNodeFaultInjector.set(oldDnInjector);
+    }
+  }
+}
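
In this test the injected sleep (MetricsDataNodeFaultInjector.DELAY = 2000 ms) is twice the
configured slow-log threshold (DELAY / 2 = 1000 ms), so whenever a delay hook fires, the
duration reported to the log* hooks should exceed the threshold and satisfy the assertion.
One typical way to run just this test locally (standard Maven Surefire invocation, not
specific to this patch):

  cd hadoop-hdfs-project/hadoop-hdfs
  mvn test -Dtest=TestDataNodeFaultInjector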

