Posted to common-commits@hadoop.apache.org by xg...@apache.org on 2016/12/14 21:59:57 UTC

[5/9] hadoop git commit: HDFS-8411. Add bytes count metrics to datanode for ECWorker. Contributed by Sammi Chen and Andrew Wang

HDFS-8411. Add bytes count metrics to datanode for ECWorker. Contributed by Sammi Chen and Andrew Wang


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1f14f6d0
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1f14f6d0
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1f14f6d0

Branch: refs/heads/YARN-5734
Commit: 1f14f6d038aecad55a5398c6fa4137c9d2f44729
Parents: ada876c
Author: Kai Zheng <ka...@intel.com>
Authored: Wed Dec 14 14:50:50 2016 +0800
Committer: Kai Zheng <ka...@intel.com>
Committed: Wed Dec 14 14:50:50 2016 +0800

----------------------------------------------------------------------
 .../erasurecode/StripedBlockReader.java         |   1 +
 .../erasurecode/StripedBlockReconstructor.java  |   6 +-
 .../erasurecode/StripedBlockWriter.java         |   1 +
 .../datanode/erasurecode/StripedReader.java     |   4 +
 .../erasurecode/StripedReconstructor.java       |  21 +++
 .../datanode/erasurecode/StripedWriter.java     |   4 +
 .../datanode/metrics/DataNodeMetrics.java       |  18 ++-
 .../apache/hadoop/hdfs/StripedFileTestUtil.java |  24 +++
 .../TestDataNodeErasureCodingMetrics.java       | 149 +++++++++----------
 9 files changed, 147 insertions(+), 81 deletions(-)
----------------------------------------------------------------------
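For context while reading the diffs below: the change threads byte accounting through three layers. StripedBlockReader and StripedBlockWriter report every chunk they move into per-task AtomicLong counters held by StripedReconstructor, and StripedBlockReconstructor folds those task totals into the global DataNodeMetrics counters when the task completes. A minimal, self-contained sketch of this two-level counter pattern (class and method names are illustrative stand-ins, not the actual Hadoop types):

    import java.util.concurrent.atomic.AtomicLong;

    // Sketch of the two-level counter pattern: each reconstruction task
    // accumulates bytes locally, and the totals are folded into global
    // counters exactly once, when the task ends.
    public class CounterPatternSketch {

      // Stands in for the per-task counters on StripedReconstructor.
      static class TaskCounters {
        private final AtomicLong bytesRead = new AtomicLong(0);
        private final AtomicLong bytesWritten = new AtomicLong(0);

        void incrBytesRead(long delta)    { bytesRead.addAndGet(delta); }
        void incrBytesWritten(long delta) { bytesWritten.addAndGet(delta); }
        long getBytesRead()    { return bytesRead.get(); }
        long getBytesWritten() { return bytesWritten.get(); }
      }

      // Stands in for the global DataNodeMetrics counters.
      static class GlobalMetrics {
        private final AtomicLong ecReconstructionBytesRead = new AtomicLong(0);
        private final AtomicLong ecReconstructionBytesWritten = new AtomicLong(0);

        void addTaskTotals(TaskCounters t) {
          ecReconstructionBytesRead.addAndGet(t.getBytesRead());
          ecReconstructionBytesWritten.addAndGet(t.getBytesWritten());
        }
      }

      public static void main(String[] args) {
        GlobalMetrics metrics = new GlobalMetrics();
        TaskCounters task = new TaskCounters();
        try {
          task.incrBytesRead(4096);     // reader threads, per chunk received
          task.incrBytesWritten(1024);  // writer, per packet sent
        } finally {
          metrics.addTaskTotals(task);  // folded in once per task
        }
      }
    }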


http://git-wip-us.apache.org/repos/asf/hadoop/blob/1f14f6d0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
index a27de9b..0f7c5c7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReader.java
@@ -187,6 +187,7 @@ class StripedBlockReader {
         break;
       }
       n += nread;
+      stripedReader.getReconstructor().incrBytesRead(nread);
     }
   }
 

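The hunk above shows only the tail of the read loop. A hedged sketch of how such per-read accounting typically sits in a loop of this shape, counting the bytes actually returned (nread) rather than the requested length (a plain InputStream stands in for the real striped block reader):

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.concurrent.atomic.AtomicLong;

    public class ReadLoopSketch {
      public static void main(String[] args) throws IOException {
        AtomicLong bytesRead = new AtomicLong(0);
        InputStream in = new ByteArrayInputStream(new byte[10_000]);
        byte[] buf = new byte[4096];
        int n = 0;
        while (n < buf.length) {
          int nread = in.read(buf, n, buf.length - n);
          if (nread <= 0) {
            break;                      // EOF, or no progress
          }
          n += nread;
          bytesRead.addAndGet(nread);   // mirrors incrBytesRead(nread) above
        }
        System.out.println("bytes read: " + bytesRead.get());  // 4096
      }
    }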
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1f14f6d0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java
index a8e9d30..5554d68 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockReconstructor.java
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
 
 /**
  * StripedBlockReconstructor reconstructs one or more missing striped blocks in
@@ -66,7 +67,10 @@ class StripedBlockReconstructor extends StripedReconstructor
       getDatanode().getMetrics().incrECFailedReconstructionTasks();
     } finally {
       getDatanode().decrementXmitsInProgress();
-      getDatanode().getMetrics().incrECReconstructionTasks();
+      final DataNodeMetrics metrics = getDatanode().getMetrics();
+      metrics.incrECReconstructionTasks();
+      metrics.incrECReconstructionBytesRead(getBytesRead());
+      metrics.incrECReconstructionBytesWritten(getBytesWritten());
       getStripedReader().close();
       stripedWriter.close();
     }

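Reading the task counters in the finally block means a task that fails midway still contributes the bytes it actually moved, alongside the existing task counters. A toy illustration of that ordering:

    public class FinallyFlushSketch {
      public static void main(String[] args) {
        long bytesRead = 0;
        try {
          bytesRead += 512;             // partial progress before a failure
          throw new RuntimeException("simulated reconstruction failure");
        } catch (RuntimeException e) {
          System.out.println("failed task: " + e.getMessage());
        } finally {
          // mirrors metrics.incrECReconstructionBytesRead(getBytesRead())
          System.out.println("bytes still counted: " + bytesRead);
        }
      }
    }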
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1f14f6d0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java
index 592be45..d999202 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java
@@ -196,6 +196,7 @@ class StripedBlockWriter {
       packet.writeTo(targetOutputStream);
 
       blockOffset4Target += toWrite;
+      stripedWriter.getReconstructor().incrBytesWritten(toWrite);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1f14f6d0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java
index 238c628..f6f343a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReader.java
@@ -435,6 +435,10 @@ class StripedReader {
     }
   }
 
+  StripedReconstructor getReconstructor() {
+    return reconstructor;
+  }
+
   StripedBlockReader getReader(int i) {
     return readers.get(i);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1f14f6d0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
index 5641c35..68769f7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java
@@ -41,6 +41,7 @@ import java.util.BitSet;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.atomic.AtomicLong;
 
 /**
  * StripedReconstructor reconstructs one or more missing striped blocks in the
@@ -114,6 +115,10 @@ abstract class StripedReconstructor {
   private long maxTargetLength = 0L;
   private final BitSet liveBitSet;
 
+  // metrics
+  private AtomicLong bytesRead = new AtomicLong(0);
+  private AtomicLong bytesWritten = new AtomicLong(0);
+
   StripedReconstructor(ErasureCodingWorker worker,
       StripedReconstructionInfo stripedReconInfo) {
     this.stripedReadPool = worker.getStripedReadPool();
@@ -133,6 +138,22 @@ abstract class StripedReconstructor {
     positionInBlock = 0L;
   }
 
+  public void incrBytesRead(long delta) {
+    bytesRead.addAndGet(delta);
+  }
+
+  public void incrBytesWritten(long delta) {
+    bytesWritten.addAndGet(delta);
+  }
+
+  public long getBytesRead() {
+    return bytesRead.get();
+  }
+
+  public long getBytesWritten() {
+    return bytesWritten.get();
+  }
+
   /**
  * Reconstruct one or more missing striped blocks in the striped block group;
  * the minimum number of live striped blocks should be no less than data

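bytesRead and bytesWritten are AtomicLongs because striped reads run on a thread pool (note the ExecutorCompletionService import in this same file), so several readers may call incrBytesRead() concurrently. A standalone sketch of that concurrent increment pattern:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.atomic.AtomicLong;

    public class AtomicCounterSketch {
      public static void main(String[] args) throws Exception {
        AtomicLong bytesRead = new AtomicLong(0);
        ExecutorService pool = Executors.newFixedThreadPool(6);
        List<Future<?>> futures = new ArrayList<>();
        for (int i = 0; i < 6; i++) {          // six concurrent "readers"
          futures.add(pool.submit(() -> {
            for (int j = 0; j < 10_000; j++) {
              bytesRead.addAndGet(64);         // atomic: no lost updates
            }
          }));
        }
        for (Future<?> f : futures) {
          f.get();
        }
        pool.shutdown();
        System.out.println(bytesRead.get());   // always 3840000
      }
    }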
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1f14f6d0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java
index c099bc1..225a7ed 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedWriter.java
@@ -280,6 +280,10 @@ class StripedWriter {
     return reconstructor.getSocketAddress4Transfer(target);
   }
 
+  StripedReconstructor getReconstructor() {
+    return reconstructor;
+  }
+
   boolean hasValidTargets() {
     return hasValidTargets;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/1f14f6d0/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
index 23e15a2..e09a85f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
@@ -18,7 +18,6 @@
 package org.apache.hadoop.hdfs.server.datanode.metrics;
 
 import static org.apache.hadoop.metrics2.impl.MsInfo.SessionId;
-import static org.apache.hadoop.metrics2.lib.Interns.info;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
@@ -135,8 +134,12 @@ public class DataNodeMetrics {
   MutableCounterLong ecReconstructionTasks;
   @Metric("Count of erasure coding failed reconstruction tasks")
   MutableCounterLong ecFailedReconstructionTasks;
-  // Nanoseconds spent by decoding tasks.
+  @Metric("Nanoseconds spent by decoding tasks")
   MutableCounterLong ecDecodingTimeNanos;
+  @Metric("Bytes read by erasure coding worker")
+  MutableCounterLong ecReconstructionBytesRead;
+  @Metric("Bytes written by erasure coding worker")
+  MutableCounterLong ecReconstructionBytesWritten;
 
   final MetricsRegistry registry = new MetricsRegistry("datanode");
   final String name;
@@ -156,9 +159,6 @@ public class DataNodeMetrics {
     sendDataPacketTransferNanosQuantiles = new MutableQuantiles[len];
     ramDiskBlocksEvictionWindowMsQuantiles = new MutableQuantiles[len];
     ramDiskBlocksLazyPersistWindowMsQuantiles = new MutableQuantiles[len];
-    ecDecodingTimeNanos = registry.newCounter(
-        info("ecDecodingTimeNanos", "Nanoseconds spent by decoding tasks"),
-        (long) 0);
 
     for (int i = 0; i < len; i++) {
       int interval = intervals[i];
@@ -454,4 +454,12 @@ public class DataNodeMetrics {
   public void incrECDecodingTime(long decodingTimeNanos) {
     ecDecodingTimeNanos.incr(decodingTimeNanos);
   }
+
+  public void incrECReconstructionBytesRead(long bytes) {
+    ecReconstructionBytesRead.incr(bytes);
+  }
+
+  public void incrECReconstructionBytesWritten(long bytes) {
+    ecReconstructionBytesWritten.incr(bytes);
+  }
 }

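This hunk also migrates ecDecodingTimeNanos from manual registry.newCounter(...) registration to the @Metric annotation, matching the two new byte counters. In Hadoop's metrics2 library, annotated MutableCounterLong fields are instantiated when the source is registered, and the published name defaults to the capitalized field name, which is why the test below looks up "EcReconstructionBytesRead". A hedged sketch of a small annotation-driven source (the class itself is illustrative):

    import org.apache.hadoop.metrics2.annotation.Metric;
    import org.apache.hadoop.metrics2.annotation.Metrics;
    import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
    import org.apache.hadoop.metrics2.lib.MutableCounterLong;

    @Metrics(about = "Example EC metrics", context = "dfs")
    public class EcMetricsSketch {
      // Instantiated by the metrics system on register(); no explicit
      // registry.newCounter(...) call is needed.
      @Metric("Bytes read by erasure coding worker")
      MutableCounterLong ecReconstructionBytesRead;

      public static EcMetricsSketch create() {
        return DefaultMetricsSystem.instance()
            .register("EcMetricsSketch", null, new EcMetricsSketch());
      }

      public void incrBytesRead(long bytes) {
        ecReconstructionBytesRead.incr(bytes);
      }
    }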
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1f14f6d0/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
index 311ba7c..520d0e3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
@@ -527,6 +527,30 @@ public class StripedFileTestUtil {
   }
 
   /**
+   * Wait for the reconstruction to finish when the file has
+   * corrupted blocks. The function can handle files of any length.
+   */
+  public static void waitForAllReconstructionFinished(Path file,
+      DistributedFileSystem fs, long expectedBlocks) throws Exception {
+    LOG.info("Waiting for reconstruction to be finished for the file:" + file
+        + ", expectedBlocks:" + expectedBlocks);
+    final int attempts = 60;
+    for (int i = 0; i < attempts; i++) {
+      int totalBlocks = 0;
+      LocatedBlocks locatedBlocks = getLocatedBlocks(file, fs);
+      for (LocatedBlock locatedBlock: locatedBlocks.getLocatedBlocks()) {
+        DatanodeInfo[] storageInfos = locatedBlock.getLocations();
+        totalBlocks += storageInfos.length;
+      }
+      if (totalBlocks >= expectedBlocks) {
+        return;
+      }
+      Thread.sleep(1000);
+    }
+    throw new IOException("Time out waiting for EC block reconstruction.");
+  }
+
+  /**
    * Get the located blocks of a file.
    */
   public static LocatedBlocks getLocatedBlocks(Path file,

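The new helper polls the block locations once per second and gives up after 60 attempts. Its loop generalizes to a simple poll-until-threshold pattern; a standalone sketch with a toy counter (all names here are illustrative):

    import java.io.IOException;
    import java.util.function.LongSupplier;

    public class PollWaitSketch {
      // Sample a counter once per second; give up after 60 attempts.
      static void waitUntilAtLeast(LongSupplier actual, long expected)
          throws IOException, InterruptedException {
        final int attempts = 60;
        for (int i = 0; i < attempts; i++) {
          if (actual.getAsLong() >= expected) {
            return;
          }
          Thread.sleep(1000);
        }
        throw new IOException("Timed out waiting for " + expected);
      }

      public static void main(String[] args) throws Exception {
        final long[] blocks = {0};
        // Toy supplier that "reports" three more block locations per poll.
        waitUntilAtLeast(() -> blocks[0] += 3, 9);
        System.out.println("reconstruction finished");
      }
    }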
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1f14f6d0/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeErasureCodingMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeErasureCodingMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeErasureCodingMetrics.java
index 1b0526b..64ddbd7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeErasureCodingMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeErasureCodingMetrics.java
@@ -17,7 +17,6 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
-import com.google.common.base.Supplier;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -28,7 +27,6 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.StripedFileTestUtil;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
@@ -38,21 +36,16 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.namenode.ErasureCodingPolicyManager;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.metrics2.MetricsRecordBuilder;
-import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
 import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
 import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
-import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
-
 import java.io.IOException;
-import java.util.Arrays;
 
 /**
  * This file tests the erasure coding metrics in DataNode.
@@ -65,8 +58,9 @@ public class TestDataNodeErasureCodingMetrics {
   private final int dataBlocks = ecPolicy.getNumDataUnits();
   private final int parityBlocks = ecPolicy.getNumParityUnits();
   private final int cellSize = ecPolicy.getCellSize();
-  private final int blockSize = cellSize;
+  private final int blockSize = cellSize * 2;
   private final int groupSize = dataBlocks + parityBlocks;
+  private final int blockGroupSize = blockSize * dataBlocks;
   private final int numDNs = groupSize + 1;
 
   private MiniDFSCluster cluster;
@@ -76,7 +70,6 @@ public class TestDataNodeErasureCodingMetrics {
   @Before
   public void setup() throws IOException {
     conf = new Configuration();
-
     conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
     conf.setInt(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY, 1);
     cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
@@ -93,82 +86,86 @@ public class TestDataNodeErasureCodingMetrics {
   }
 
   @Test(timeout = 120000)
-  public void testEcTasks() throws Exception {
-    DataNode workerDn = doTest("/testEcTasks");
-    MetricsRecordBuilder rb = getMetrics(workerDn.getMetrics().name());
-
-    // Ensure that reconstruction task is finished
-    GenericTestUtils.waitFor(new Supplier<Boolean>() {
-      @Override
-      public Boolean get() {
-        long taskMetricValue = getLongCounter("EcReconstructionTasks", rb);
-        return (taskMetricValue > 0);
-      }
-    }, 500, 10000);
+  public void testFullBlock() throws Exception {
+    doTest("/testEcMetrics", blockGroupSize, 0);
+
+    Assert.assertEquals("EcReconstructionTasks should be ",
+        1, getLongMetric("EcReconstructionTasks"));
+    Assert.assertEquals("EcFailedReconstructionTasks should be ",
+        0, getLongMetric("EcFailedReconstructionTasks"));
+    Assert.assertTrue(getLongMetric("EcDecodingTimeNanos") > 0);
+    Assert.assertEquals("EcReconstructionBytesRead should be ",
+        blockGroupSize, getLongMetric("EcReconstructionBytesRead"));
+    Assert.assertEquals("EcReconstructionBytesWritten should be ",
+        blockSize, getLongMetric("EcReconstructionBytesWritten"));
+  }
 
-    assertCounter("EcReconstructionTasks", (long) 1, rb);
-    assertCounter("EcFailedReconstructionTasks", (long) 0, rb);
+  // A partial block; reconstruct the partial block.
+  @Test(timeout = 120000)
+  public void testReconstructionBytesPartialGroup1() throws Exception {
+    final int fileLen = blockSize / 10;
+    doTest("/testEcBytes", fileLen, 0);
+
+    Assert.assertEquals("EcReconstructionBytesRead should be ",
+        fileLen,  getLongMetric("EcReconstructionBytesRead"));
+    Assert.assertEquals("EcReconstructionBytesWritten should be ",
+        fileLen, getLongMetric("EcReconstructionBytesWritten"));
   }
 
+  // 1 full block + 5 partial blocks; reconstruct the full block.
   @Test(timeout = 120000)
-  public void testEcCodingTime() throws Exception {
-    DataNode workerDn = doTest("/testEcCodingTime");
-    MetricsRecordBuilder rb = getMetrics(workerDn.getMetrics().name());
-
-    // Ensure that reconstruction task is finished
-    GenericTestUtils.waitFor(new Supplier<Boolean>() {
-      @Override
-      public Boolean get() {
-        long taskMetricValue = getLongCounter("EcReconstructionTasks", rb);
-        return (taskMetricValue > 0);
-      }
-    }, 500, 10000);
+  public void testReconstructionBytesPartialGroup2() throws Exception {
+    final int fileLen = cellSize * dataBlocks + cellSize + cellSize / 10;
+    doTest("/testEcBytes", fileLen, 0);
+
+    Assert.assertEquals("ecReconstructionBytesRead should be ",
+        cellSize * dataBlocks + cellSize + cellSize / 10,
+        getLongMetric("EcReconstructionBytesRead"));
+    Assert.assertEquals("ecReconstructionBytesWritten should be ",
+        blockSize, getLongMetric("EcReconstructionBytesWritten"));
+  }
 
-    long decodeTime = getLongCounter("ecDecodingTimeNanos", rb);
-    Assert.assertTrue(decodeTime > 0);
+  // 1 full block + 5 partial blocks; reconstruct the partial block.
+  @Test(timeout = 120000)
+  public void testReconstructionBytesPartialGroup3() throws Exception {
+    final int fileLen = cellSize * dataBlocks + cellSize + cellSize / 10;
+    doTest("/testEcBytes", fileLen, 1);
+
+    Assert.assertEquals("ecReconstructionBytesRead should be ",
+        cellSize * dataBlocks + (cellSize / 10) * 2 ,
+        getLongMetric("EcReconstructionBytesRead"));
+    Assert.assertEquals("ecReconstructionBytesWritten should be ",
+        cellSize + cellSize / 10,
+        getLongMetric("EcReconstructionBytesWritten"));
   }
 
-  private DataNode doTest(String fileName) throws Exception {
+  private long getLongMetric(String metricName) {
+    long metricValue = 0;
+    // Sum this reconstruction metric across all datanodes.
+    for (DataNode dn : cluster.getDataNodes()) {
+      MetricsRecordBuilder rb = getMetrics(dn.getMetrics().name());
+      metricValue += getLongCounter(metricName, rb);
+    }
+    return metricValue;
+  }
 
+  private void doTest(String fileName, int fileLen,
+      int deadNodeIndex) throws Exception {
+    assertTrue(fileLen > 0);
+    assertTrue(deadNodeIndex >= 0 && deadNodeIndex < numDNs);
     Path file = new Path(fileName);
-    long fileLen = dataBlocks * blockSize;
-    final byte[] data = StripedFileTestUtil.generateBytes((int) fileLen);
+    final byte[] data = StripedFileTestUtil.generateBytes(fileLen);
     DFSTestUtil.writeFile(fs, file, data);
     StripedFileTestUtil.waitBlockGroupsReported(fs, fileName);
 
-    LocatedBlocks locatedBlocks =
+    final LocatedBlocks locatedBlocks =
         StripedFileTestUtil.getLocatedBlocks(file, fs);
-    //only one block group
-    LocatedStripedBlock lastBlock =
+    final LocatedStripedBlock lastBlock =
         (LocatedStripedBlock)locatedBlocks.getLastLocatedBlock();
-    DataNode workerDn = null;
-    DatanodeInfo[] locations = lastBlock.getLocations();
-    assertEquals(locations.length, groupSize);
-
-    // we have ONE extra datanode in addition to the GROUPSIZE datanodes, here
-    // is to find the extra datanode that the reconstruction task will run on,
-    // according to the current block placement logic for striped files.
-    // This can be improved later to be flexible regardless wherever the task
-    // runs.
-    for (DataNode dn : cluster.getDataNodes()) {
-      boolean appear = false;
-      for (DatanodeInfo info : locations) {
-        if (dn.getDatanodeUuid().equals(info.getDatanodeUuid())) {
-          appear = true;
-          break;
-        }
-      }
-      if (!appear) {
-        workerDn = dn;
-        break;
-      }
-    }
-    // Get a datanode from the block locations.
-    LOG.info("Block locations: " + Arrays.asList(locations));
-    LOG.info("Erasure coding worker datanode: " + workerDn);
-    assertNotNull("Failed to find a worker datanode", workerDn);
+    assertTrue(lastBlock.getLocations().length > deadNodeIndex);
 
-    DataNode toCorruptDn = cluster.getDataNode(locations[0].getIpcPort());
+    final DataNode toCorruptDn = cluster.getDataNode(
+        lastBlock.getLocations()[deadNodeIndex].getIpcPort());
     LOG.info("Datanode to be corrupted: " + toCorruptDn);
     assertNotNull("Failed to find a datanode to be corrupted", toCorruptDn);
     toCorruptDn.shutdown();
@@ -176,12 +173,15 @@ public class TestDataNodeErasureCodingMetrics {
     DFSTestUtil.waitForDatanodeState(cluster, toCorruptDn.getDatanodeUuid(),
         false, 10000);
 
-    int workCount = getComputedDatanodeWork();
+    final int workCount = getComputedDatanodeWork();
     assertTrue("Wrongly computed block reconstruction work", workCount > 0);
     cluster.triggerHeartbeats();
-    StripedFileTestUtil.waitForReconstructionFinished(file, fs, groupSize);
-
-    return workerDn;
+    int totalBlocks = (fileLen / blockGroupSize) * groupSize;
+    final int remainder = fileLen % blockGroupSize;
+    totalBlocks += (remainder == 0) ? 0 :
+        (remainder % blockSize == 0) ? remainder / blockSize + parityBlocks :
+            remainder / blockSize + 1 + parityBlocks;
+    StripedFileTestUtil.waitForAllReconstructionFinished(file, fs, totalBlocks);
   }
 
   private int getComputedDatanodeWork()
@@ -209,5 +209,4 @@ public class TestDataNodeErasureCodingMetrics {
     BlockManagerTestUtil.checkHeartbeat(
         cluster.getNamesystem().getBlockManager());
   }
-
 }
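The expected-block arithmetic at the end of doTest() is easier to check with concrete numbers. A standalone sketch that evaluates the same formula for the three file sizes used in these tests (cellSize is shrunk to 10 bytes purely so the divisions stay readable; the real test uses the policy's cell size):

    public class ExpectedBlocksSketch {
      public static void main(String[] args) {
        final int cellSize = 10, dataBlocks = 6, parityBlocks = 3;
        final int blockSize = cellSize * 2;                // 20
        final int groupSize = dataBlocks + parityBlocks;   // 9
        final int blockGroupSize = blockSize * dataBlocks; // 120

        int[] fileLens = {
            blockGroupSize,                                   // testFullBlock
            blockSize / 10,                                   // partial group 1
            cellSize * dataBlocks + cellSize + cellSize / 10, // partial group 2/3
        };
        for (int fileLen : fileLens) {
          // Same formula as doTest().
          int totalBlocks = (fileLen / blockGroupSize) * groupSize;
          int remainder = fileLen % blockGroupSize;
          totalBlocks += (remainder == 0) ? 0 :
              (remainder % blockSize == 0)
                  ? remainder / blockSize + parityBlocks
                  : remainder / blockSize + 1 + parityBlocks;
          System.out.println(fileLen + " bytes -> expect >= "
              + totalBlocks + " block locations");
        }
        // Prints 120 -> 9, 2 -> 4, 71 -> 7. The wait helper compares with
        // ">=", so 7 is a safe lower bound for the 71-byte case, where all
        // six data blocks are non-empty and nine locations actually appear.
      }
    }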

