You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ae...@apache.org on 2017/09/07 21:36:26 UTC
[13/37] hadoop git commit: HDFS-12377. Refactor
TestReadStripedFileWithDecoding to avoid test timeouts.
HDFS-12377. Refactor TestReadStripedFileWithDecoding to avoid test timeouts.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d4035d42
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d4035d42
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d4035d42
Branch: refs/heads/HDFS-7240
Commit: d4035d42f02507bc89adce3f0450c36b58b201c1
Parents: ad32759
Author: Andrew Wang <wa...@apache.org>
Authored: Tue Sep 5 16:33:29 2017 -0700
Committer: Andrew Wang <wa...@apache.org>
Committed: Tue Sep 5 16:33:29 2017 -0700
----------------------------------------------------------------------
.../hdfs/ReadStripedFileWithDecodingHelper.java | 273 ++++++++++++++++
.../apache/hadoop/hdfs/StripedFileTestUtil.java | 9 +-
.../hdfs/TestReadStripedFileWithDNFailure.java | 107 +++++++
.../hdfs/TestReadStripedFileWithDecoding.java | 309 +++----------------
...tReadStripedFileWithDecodingCorruptData.java | 87 ++++++
...tReadStripedFileWithDecodingDeletedData.java | 88 ++++++
6 files changed, 596 insertions(+), 277 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d4035d42/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/ReadStripedFileWithDecodingHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/ReadStripedFileWithDecodingHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/ReadStripedFileWithDecodingHelper.java
new file mode 100644
index 0000000..4202969
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/ReadStripedFileWithDecodingHelper.java
@@ -0,0 +1,273 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.util.StripedBlockUtil;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.event.Level;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+
+/**
+ * Utility class for testing online recovery of striped files.
+ */
+abstract public class ReadStripedFileWithDecodingHelper {
+ static final Logger LOG =
+ LoggerFactory.getLogger(ReadStripedFileWithDecodingHelper.class);
+
+ static {
+ ((Log4JLogger)LogFactory.getLog(BlockPlacementPolicy.class))
+ .getLogger().setLevel(org.apache.log4j.Level.ALL);
+ GenericTestUtils.setLogLevel(BlockManager.LOG, Level.DEBUG);
+ GenericTestUtils.setLogLevel(BlockManager.blockLog, Level.DEBUG);
+ GenericTestUtils.setLogLevel(NameNode.stateChangeLog, Level.DEBUG);
+ }
+
+ protected static final ErasureCodingPolicy EC_POLICY =
+ StripedFileTestUtil.getDefaultECPolicy();
+ protected static final short NUM_DATA_UNITS =
+ (short) EC_POLICY.getNumDataUnits();
+ protected static final short NUM_PARITY_UNITS =
+ (short) EC_POLICY.getNumParityUnits();
+ protected static final int CELL_SIZE = EC_POLICY.getCellSize();
+ private static final int STRIPES_PER_BLOCK = 4;
+ protected static final int BLOCK_SIZE = CELL_SIZE * STRIPES_PER_BLOCK;
+ private static final int BLOCK_GROUP_SIZE = BLOCK_SIZE * NUM_DATA_UNITS;
+
+ private static final int NUM_DATANODES = NUM_DATA_UNITS + NUM_PARITY_UNITS;
+
+ protected static final int[] FILE_LENGTHS =
+ {BLOCK_GROUP_SIZE - 123, BLOCK_GROUP_SIZE + 123};
+
+ public static MiniDFSCluster initializeCluster() throws IOException {
+ Configuration conf = new HdfsConfiguration();
+ conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+ conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
+ conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY,
+ 0);
+ conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY,
+ false);
+ conf.set(DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY,
+ StripedFileTestUtil.getDefaultECPolicy().getName());
+ MiniDFSCluster myCluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(NUM_DATANODES)
+ .build();
+ myCluster.getFileSystem().getClient().setErasureCodingPolicy("/",
+ StripedFileTestUtil.getDefaultECPolicy().getName());
+ return myCluster;
+ }
+
+ public static void tearDownCluster(MiniDFSCluster cluster)
+ throws IOException {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ public static int findFirstDataNode(MiniDFSCluster cluster,
+ DistributedFileSystem dfs, Path file, long length) throws IOException {
+ BlockLocation[] locs = dfs.getFileBlockLocations(file, 0, length);
+ String name = (locs[0].getNames())[0];
+ int dnIndex = 0;
+ for (DataNode dn : cluster.getDataNodes()) {
+ int port = dn.getXferPort();
+ if (name.contains(Integer.toString(port))) {
+ return dnIndex;
+ }
+ dnIndex++;
+ }
+ return -1;
+ }
+
+ /**
+ * Cross product of FILE_LENGTHS, NUM_PARITY_UNITS+1, NUM_PARITY_UNITS.
+ * Input for parameterized tests classes.
+ *
+ * @return Test parameters.
+ */
+ public static Collection<Object[]> getParameters() {
+ ArrayList<Object[]> params = new ArrayList<>();
+ for (int fileLength : FILE_LENGTHS) {
+ for (int dataDelNum = 1; dataDelNum <= NUM_PARITY_UNITS; dataDelNum++) {
+ for (int parityDelNum = 0;
+ (dataDelNum + parityDelNum) <= NUM_PARITY_UNITS; parityDelNum++) {
+ params.add(new Object[] {fileLength, dataDelNum, parityDelNum});
+ }
+ }
+ }
+ return params;
+ }
+
+ public static void verifyRead(DistributedFileSystem dfs, Path testPath,
+ int length, byte[] expected) throws IOException {
+ LOG.info("verifyRead on path {}", testPath);
+ byte[] buffer = new byte[length + 100];
+ LOG.info("verifyRead verifyLength on path {}", testPath);
+ StripedFileTestUtil.verifyLength(dfs, testPath, length);
+ LOG.info("verifyRead verifyPread on path {}", testPath);
+ StripedFileTestUtil.verifyPread(dfs, testPath, length, expected, buffer);
+ LOG.info("verifyRead verifyStatefulRead on path {}", testPath);
+ StripedFileTestUtil.verifyStatefulRead(dfs, testPath, length, expected,
+ buffer);
+ LOG.info("verifyRead verifyStatefulRead2 on path {}", testPath);
+ StripedFileTestUtil.verifyStatefulRead(dfs, testPath, length, expected,
+ ByteBuffer.allocate(length + 100));
+ LOG.info("verifyRead verifySeek on path {}", testPath);
+ StripedFileTestUtil.verifySeek(dfs, testPath, length, EC_POLICY,
+ BLOCK_GROUP_SIZE);
+ }
+
+ public static void testReadWithDNFailure(MiniDFSCluster cluster,
+ DistributedFileSystem dfs, int fileLength, int dnFailureNum)
+ throws Exception {
+ String fileType = fileLength < (BLOCK_SIZE * NUM_DATA_UNITS) ?
+ "smallFile" : "largeFile";
+ String src = "/dnFailure_" + dnFailureNum + "_" + fileType;
+ LOG.info("testReadWithDNFailure: file = " + src
+ + ", fileSize = " + fileLength
+ + ", dnFailureNum = " + dnFailureNum);
+
+ Path testPath = new Path(src);
+ final byte[] bytes = StripedFileTestUtil.generateBytes(fileLength);
+ DFSTestUtil.writeFile(dfs, testPath, bytes);
+ StripedFileTestUtil.waitBlockGroupsReported(dfs, src);
+
+ // shut down the DN that holds an internal data block
+ BlockLocation[] locs = dfs.getFileBlockLocations(testPath, CELL_SIZE * 5,
+ CELL_SIZE);
+ for (int failedDnIdx = 0; failedDnIdx < dnFailureNum; failedDnIdx++) {
+ String name = (locs[0].getNames())[failedDnIdx];
+ for (DataNode dn : cluster.getDataNodes()) {
+ int port = dn.getXferPort();
+ if (name.contains(Integer.toString(port))) {
+ dn.shutdown();
+ }
+ }
+ }
+
+ // check file length, pread, stateful read and seek
+ verifyRead(dfs, testPath, fileLength, bytes);
+ }
+
+
+ /**
+ * Test reading a file with some blocks(data blocks or parity blocks or both)
+ * deleted or corrupted.
+ * @param src file path
+ * @param fileNumBytes file length
+ * @param dataBlkDelNum the deleted or corrupted number of data blocks.
+ * @param parityBlkDelNum the deleted or corrupted number of parity blocks.
+ * @param deleteBlockFile whether block file is deleted or corrupted.
+ * true is to delete the block file.
+ * false is to corrupt the content of the block file.
+ * @throws IOException
+ */
+ public static void testReadWithBlockCorrupted(MiniDFSCluster cluster,
+ DistributedFileSystem dfs, String src, int fileNumBytes,
+ int dataBlkDelNum, int parityBlkDelNum,
+ boolean deleteBlockFile) throws IOException {
+ LOG.info("testReadWithBlockCorrupted: file = " + src
+ + ", dataBlkDelNum = " + dataBlkDelNum
+ + ", parityBlkDelNum = " + parityBlkDelNum
+ + ", deleteBlockFile? " + deleteBlockFile);
+ int recoverBlkNum = dataBlkDelNum + parityBlkDelNum;
+ Assert.assertTrue("dataBlkDelNum and parityBlkDelNum should be positive",
+ dataBlkDelNum >= 0 && parityBlkDelNum >= 0);
+ Assert.assertTrue("The sum of dataBlkDelNum and parityBlkDelNum " +
+ "should be between 1 ~ " + NUM_PARITY_UNITS, recoverBlkNum <=
+ NUM_PARITY_UNITS);
+
+ // write a file with the length of writeLen
+ Path srcPath = new Path(src);
+ final byte[] bytes = StripedFileTestUtil.generateBytes(fileNumBytes);
+ DFSTestUtil.writeFile(dfs, srcPath, bytes);
+
+ // delete or corrupt some blocks
+ corruptBlocks(cluster, dfs, srcPath, dataBlkDelNum, parityBlkDelNum,
+ deleteBlockFile);
+
+ // check the file can be read after some blocks were deleted
+ verifyRead(dfs, srcPath, fileNumBytes, bytes);
+ }
+
+ public static void corruptBlocks(MiniDFSCluster cluster,
+ DistributedFileSystem dfs, Path srcPath,
+ int dataBlkDelNum, int parityBlkDelNum, boolean deleteBlockFile)
+ throws IOException {
+ LOG.info("corruptBlocks on path {}", srcPath);
+ int recoverBlkNum = dataBlkDelNum + parityBlkDelNum;
+
+ LocatedBlocks locatedBlocks = getLocatedBlocks(dfs, srcPath);
+ LocatedStripedBlock lastBlock =
+ (LocatedStripedBlock)locatedBlocks.getLastLocatedBlock();
+
+ int[] delDataBlkIndices = StripedFileTestUtil.randomArray(0, NUM_DATA_UNITS,
+ dataBlkDelNum);
+ Assert.assertNotNull(delDataBlkIndices);
+ int[] delParityBlkIndices = StripedFileTestUtil.randomArray(NUM_DATA_UNITS,
+ NUM_DATA_UNITS + NUM_PARITY_UNITS, parityBlkDelNum);
+ Assert.assertNotNull(delParityBlkIndices);
+
+ int[] delBlkIndices = new int[recoverBlkNum];
+ System.arraycopy(delDataBlkIndices, 0,
+ delBlkIndices, 0, delDataBlkIndices.length);
+ System.arraycopy(delParityBlkIndices, 0,
+ delBlkIndices, delDataBlkIndices.length, delParityBlkIndices.length);
+
+ ExtendedBlock[] delBlocks = new ExtendedBlock[recoverBlkNum];
+ for (int i = 0; i < recoverBlkNum; i++) {
+ delBlocks[i] = StripedBlockUtil
+ .constructInternalBlock(lastBlock.getBlock(),
+ CELL_SIZE, NUM_DATA_UNITS, delBlkIndices[i]);
+ if (deleteBlockFile) {
+ // delete the block file
+ LOG.info("Deleting block file {}", delBlocks[i]);
+ cluster.corruptBlockOnDataNodesByDeletingBlockFile(delBlocks[i]);
+ } else {
+ // corrupt the block file
+ LOG.info("Corrupting block file {}", delBlocks[i]);
+ cluster.corruptBlockOnDataNodes(delBlocks[i]);
+ }
+ }
+ }
+
+ public static LocatedBlocks getLocatedBlocks(DistributedFileSystem dfs,
+ Path filePath) throws IOException {
+ return dfs.getClient().getLocatedBlocks(filePath.toString(),
+ 0, Long.MAX_VALUE);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d4035d42/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
index 057e94a..1489e48 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/StripedFileTestUtil.java
@@ -18,8 +18,6 @@
package org.apache.hadoop.hdfs;
import com.google.common.base.Joiner;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
@@ -27,12 +25,12 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.impl.BlockReaderTestUtil;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
+import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem.WebHdfsInputStream;
import org.apache.hadoop.io.IOUtils;
@@ -40,6 +38,8 @@ import org.apache.hadoop.io.erasurecode.CodecUtil;
import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.EOFException;
import java.io.IOException;
@@ -57,7 +57,8 @@ import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.Assert.assertEquals;
public class StripedFileTestUtil {
- public static final Log LOG = LogFactory.getLog(StripedFileTestUtil.class);
+ public static final Logger LOG =
+ LoggerFactory.getLogger(StripedFileTestUtil.class);
public static byte[] generateBytes(int cnt) {
byte[] bytes = new byte[cnt];
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d4035d42/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDNFailure.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDNFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDNFailure.java
new file mode 100644
index 0000000..40ac206
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDNFailure.java
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+
+import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.BLOCK_SIZE;
+import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.FILE_LENGTHS;
+import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.NUM_DATA_UNITS;
+import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.NUM_PARITY_UNITS;
+import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.initializeCluster;
+import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.tearDownCluster;
+
+/**
+ * Test online recovery with failed DNs. This test is parameterized.
+ */
+@RunWith(Parameterized.class)
+public class TestReadStripedFileWithDNFailure {
+ static final Logger LOG =
+ LoggerFactory.getLogger(TestReadStripedFileWithDNFailure.class);
+
+ private static MiniDFSCluster cluster;
+ private static DistributedFileSystem dfs;
+
+ @Rule
+ public Timeout globalTimeout = new Timeout(300000);
+
+ @BeforeClass
+ public static void setup() throws IOException {
+ cluster = initializeCluster();
+ dfs = cluster.getFileSystem();
+ }
+
+ @AfterClass
+ public static void tearDown() throws IOException {
+ tearDownCluster(cluster);
+ }
+
+ @Parameterized.Parameters
+ public static Collection<Object[]> getParameters() {
+ ArrayList<Object[]> params = new ArrayList<>();
+ for (int fileLength : FILE_LENGTHS) {
+ for (int i = 0; i < NUM_PARITY_UNITS; i++) {
+ params.add(new Object[] {fileLength, i+1});
+ }
+ }
+ return params;
+ }
+
+ private int fileLength;
+ private int dnFailureNum;
+
+ public TestReadStripedFileWithDNFailure(int fileLength, int dnFailureNum) {
+ this.fileLength = fileLength;
+ this.dnFailureNum = dnFailureNum;
+ }
+
+ /**
+ * Shutdown tolerable number of Datanode before reading.
+ * Verify the decoding works correctly.
+ */
+ @Test
+ public void testReadWithDNFailure() throws Exception {
+ try {
+ // setup a new cluster with no dead datanode
+ setup();
+ ReadStripedFileWithDecodingHelper.testReadWithDNFailure(cluster,
+ dfs, fileLength, dnFailureNum);
+ } catch (IOException ioe) {
+ String fileType = fileLength < (BLOCK_SIZE * NUM_DATA_UNITS) ?
+ "smallFile" : "largeFile";
+ LOG.error("Failed to read file with DN failure:"
+ + " fileType = " + fileType
+ + ", dnFailureNum = " + dnFailureNum);
+ } finally {
+ // tear down the cluster
+ tearDown();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d4035d42/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java
index cb1640c..2fb9212 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecoding.java
@@ -17,222 +17,58 @@
*/
package org.apache.hadoop.hdfs;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.commons.logging.impl.Log4JLogger;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.LocatedStripedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
-import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.util.StripedBlockUtil;
-import org.apache.hadoop.test.GenericTestUtils;
-import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
-import org.junit.Test;
import org.junit.Rule;
+import org.junit.Test;
import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
-public class TestReadStripedFileWithDecoding {
- static final Log LOG = LogFactory.getLog(TestReadStripedFileWithDecoding.class);
+import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.CELL_SIZE;
+import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.NUM_DATA_UNITS;
+import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.NUM_PARITY_UNITS;
+import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.findFirstDataNode;
+import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.initializeCluster;
+import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.tearDownCluster;
- static {
- ((Log4JLogger)LogFactory.getLog(BlockPlacementPolicy.class))
- .getLogger().setLevel(Level.ALL);
- GenericTestUtils.setLogLevel(BlockManager.LOG, Level.ALL);
- GenericTestUtils.setLogLevel(BlockManager.blockLog, Level.ALL);
- GenericTestUtils.setLogLevel(NameNode.stateChangeLog, Level.ALL);
- }
+public class TestReadStripedFileWithDecoding {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestReadStripedFileWithDecoding.class);
private MiniDFSCluster cluster;
- private DistributedFileSystem fs;
- private final ErasureCodingPolicy ecPolicy =
- StripedFileTestUtil.getDefaultECPolicy();
- private final short dataBlocks = (short) ecPolicy.getNumDataUnits();
- private final short parityBlocks =
- (short) ecPolicy.getNumParityUnits();
- private final int numDNs = dataBlocks + parityBlocks;
- private final int cellSize = ecPolicy.getCellSize();
- private final int stripPerBlock = 4;
- private final int blockSize = cellSize * stripPerBlock;
- private final int blockGroupSize = blockSize * dataBlocks;
- private final int smallFileLength = blockGroupSize - 123;
- private final int largeFileLength = blockGroupSize + 123;
- private final int[] fileLengths = {smallFileLength, largeFileLength};
- private final int[] dnFailureNums = getDnFailureNums();
-
- private int[] getDnFailureNums() {
- int[] dnFailureNums = new int[parityBlocks];
- for (int i = 0; i < dnFailureNums.length; i++) {
- dnFailureNums[i] = i + 1;
- }
- return dnFailureNums;
- }
+ private DistributedFileSystem dfs;
@Rule
public Timeout globalTimeout = new Timeout(300000);
@Before
public void setup() throws IOException {
- Configuration conf = new HdfsConfiguration();
- conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
- conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY, 0);
- conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY, 0);
- conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY,
- false);
- conf.set(DFSConfigKeys.DFS_NAMENODE_EC_POLICIES_ENABLED_KEY,
- StripedFileTestUtil.getDefaultECPolicy().getName());
- cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDNs).build();
- cluster.getFileSystem().getClient().setErasureCodingPolicy("/",
- StripedFileTestUtil.getDefaultECPolicy().getName());
- fs = cluster.getFileSystem();
+ cluster = initializeCluster();
+ dfs = cluster.getFileSystem();
}
@After
public void tearDown() throws IOException {
- if (cluster != null) {
- cluster.shutdown();
- cluster = null;
- }
- }
-
- /**
- * Shutdown tolerable number of Datanode before reading.
- * Verify the decoding works correctly.
- */
- @Test(timeout=300000)
- public void testReadWithDNFailure() throws Exception {
- for (int fileLength : fileLengths) {
- for (int dnFailureNum : dnFailureNums) {
- try {
- // setup a new cluster with no dead datanode
- setup();
- testReadWithDNFailure(fileLength, dnFailureNum);
- } catch (IOException ioe) {
- String fileType = fileLength < (blockSize * dataBlocks) ?
- "smallFile" : "largeFile";
- LOG.error("Failed to read file with DN failure:"
- + " fileType = "+ fileType
- + ", dnFailureNum = " + dnFailureNum);
- } finally {
- // tear down the cluster
- tearDown();
- }
- }
- }
- }
-
- /**
- * Corrupt tolerable number of block before reading.
- * Verify the decoding works correctly.
- */
- @Test(timeout=300000)
- public void testReadCorruptedData() throws IOException {
- for (int fileLength : fileLengths) {
- for (int dataDelNum = 1; dataDelNum <= parityBlocks; dataDelNum++) {
- for (int parityDelNum = 0; (dataDelNum + parityDelNum) <= parityBlocks;
- parityDelNum++) {
- String src = "/corrupted_" + dataDelNum + "_" + parityDelNum;
- testReadWithBlockCorrupted(src, fileLength,
- dataDelNum, parityDelNum, false);
- }
- }
- }
- }
-
- /**
- * Delete tolerable number of block before reading.
- * Verify the decoding works correctly.
- */
- @Test(timeout=300000)
- public void testReadCorruptedDataByDeleting() throws IOException {
- for (int fileLength : fileLengths) {
- for (int dataDelNum = 1; dataDelNum <= parityBlocks; dataDelNum++) {
- for (int parityDelNum = 0; (dataDelNum + parityDelNum) <= parityBlocks;
- parityDelNum++) {
- String src = "/deleted_" + dataDelNum + "_" + parityDelNum;
- testReadWithBlockCorrupted(src, fileLength,
- dataDelNum, parityDelNum, true);
- }
- }
- }
- }
-
- private int findFirstDataNode(Path file, long length) throws IOException {
- BlockLocation[] locs = fs.getFileBlockLocations(file, 0, length);
- String name = (locs[0].getNames())[0];
- int dnIndex = 0;
- for (DataNode dn : cluster.getDataNodes()) {
- int port = dn.getXferPort();
- if (name.contains(Integer.toString(port))) {
- return dnIndex;
- }
- dnIndex++;
- }
- return -1;
- }
-
- private void verifyRead(Path testPath, int length, byte[] expected)
- throws IOException {
- byte[] buffer = new byte[length + 100];
- StripedFileTestUtil.verifyLength(fs, testPath, length);
- StripedFileTestUtil.verifyPread(fs, testPath, length, expected, buffer);
- StripedFileTestUtil.verifyStatefulRead(fs, testPath, length, expected, buffer);
- StripedFileTestUtil.verifyStatefulRead(fs, testPath, length, expected,
- ByteBuffer.allocate(length + 100));
- StripedFileTestUtil.verifySeek(fs, testPath, length, ecPolicy,
- blockGroupSize);
- }
-
- private void testReadWithDNFailure(int fileLength, int dnFailureNum)
- throws Exception {
- String fileType = fileLength < (blockSize * dataBlocks) ?
- "smallFile" : "largeFile";
- String src = "/dnFailure_" + dnFailureNum + "_" + fileType;
- LOG.info("testReadWithDNFailure: file = " + src
- + ", fileSize = " + fileLength
- + ", dnFailureNum = " + dnFailureNum);
-
- Path testPath = new Path(src);
- final byte[] bytes = StripedFileTestUtil.generateBytes(fileLength);
- DFSTestUtil.writeFile(fs, testPath, bytes);
- StripedFileTestUtil.waitBlockGroupsReported(fs, src);
-
- // shut down the DN that holds an internal data block
- BlockLocation[] locs = fs.getFileBlockLocations(testPath, cellSize * 5,
- cellSize);
- for (int failedDnIdx = 0; failedDnIdx < dnFailureNum; failedDnIdx++) {
- String name = (locs[0].getNames())[failedDnIdx];
- for (DataNode dn : cluster.getDataNodes()) {
- int port = dn.getXferPort();
- if (name.contains(Integer.toString(port))) {
- dn.shutdown();
- }
- }
- }
-
- // check file length, pread, stateful read and seek
- verifyRead(testPath, fileLength, bytes);
+ tearDownCluster(cluster);
}
/**
@@ -245,15 +81,17 @@ public class TestReadStripedFileWithDecoding {
final Path file = new Path("/corrupted");
final int length = 10; // length of "corruption"
final byte[] bytes = StripedFileTestUtil.generateBytes(length);
- DFSTestUtil.writeFile(fs, file, bytes);
+ DFSTestUtil.writeFile(dfs, file, bytes);
// corrupt the first data block
- int dnIndex = findFirstDataNode(file, cellSize * dataBlocks);
+ int dnIndex = ReadStripedFileWithDecodingHelper.findFirstDataNode(
+ cluster, dfs, file, CELL_SIZE * NUM_DATA_UNITS);
Assert.assertNotEquals(-1, dnIndex);
- LocatedStripedBlock slb = (LocatedStripedBlock)fs.getClient()
- .getLocatedBlocks(file.toString(), 0, cellSize * dataBlocks).get(0);
+ LocatedStripedBlock slb = (LocatedStripedBlock) dfs.getClient()
+ .getLocatedBlocks(file.toString(), 0, CELL_SIZE * NUM_DATA_UNITS)
+ .get(0);
final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(slb,
- cellSize, dataBlocks, parityBlocks);
+ CELL_SIZE, NUM_DATA_UNITS, NUM_PARITY_UNITS);
// find the first block file
File storageDir = cluster.getInstanceStorageDir(dnIndex, 0);
File blkFile = MiniDFSCluster.getBlockFile(storageDir, blks[0].getBlock());
@@ -272,7 +110,7 @@ public class TestReadStripedFileWithDecoding {
try {
// do stateful read
- StripedFileTestUtil.verifyStatefulRead(fs, file, length, bytes,
+ StripedFileTestUtil.verifyStatefulRead(dfs, file, length, bytes,
ByteBuffer.allocate(1024));
// check whether the corruption has been reported to the NameNode
@@ -293,110 +131,35 @@ public class TestReadStripedFileWithDecoding {
final Path file = new Path("/invalidate");
final int length = 10;
final byte[] bytes = StripedFileTestUtil.generateBytes(length);
- DFSTestUtil.writeFile(fs, file, bytes);
+ DFSTestUtil.writeFile(dfs, file, bytes);
- int dnIndex = findFirstDataNode(file, cellSize * dataBlocks);
+ int dnIndex = findFirstDataNode(cluster, dfs, file,
+ CELL_SIZE * NUM_DATA_UNITS);
Assert.assertNotEquals(-1, dnIndex);
- LocatedStripedBlock slb = (LocatedStripedBlock)fs.getClient()
- .getLocatedBlocks(file.toString(), 0, cellSize * dataBlocks).get(0);
+ LocatedStripedBlock slb = (LocatedStripedBlock) dfs.getClient()
+ .getLocatedBlocks(file.toString(), 0, CELL_SIZE * NUM_DATA_UNITS)
+ .get(0);
final LocatedBlock[] blks = StripedBlockUtil.parseStripedBlockGroup(slb,
- cellSize, dataBlocks, parityBlocks);
+ CELL_SIZE, NUM_DATA_UNITS, NUM_PARITY_UNITS);
final Block b = blks[0].getBlock().getLocalBlock();
DataNode dn = cluster.getDataNodes().get(dnIndex);
- // disable the heartbeat from DN so that the invalidated block record is kept
- // in NameNode until heartbeat expires and NN mark the dn as dead
+ // disable the heartbeat from DN so that the invalidated block record is
+ // kept in NameNode until heartbeat expires and NN mark the dn as dead
DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
try {
// delete the file
- fs.delete(file, true);
+ dfs.delete(file, true);
// check the block is added to invalidateBlocks
final FSNamesystem fsn = cluster.getNamesystem();
final BlockManager bm = fsn.getBlockManager();
- DatanodeDescriptor dnd = NameNodeAdapter.getDatanode(fsn, dn.getDatanodeId());
+ DatanodeDescriptor dnd =
+ NameNodeAdapter.getDatanode(fsn, dn.getDatanodeId());
Assert.assertTrue(bm.containsInvalidateBlock(
blks[0].getLocations()[0], b) || dnd.containsInvalidateBlock(b));
} finally {
DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false);
}
}
-
- /**
- * Test reading a file with some blocks(data blocks or parity blocks or both)
- * deleted or corrupted.
- * @param src file path
- * @param fileLength file length
- * @param dataBlkDelNum the deleted or corrupted number of data blocks.
- * @param parityBlkDelNum the deleted or corrupted number of parity blocks.
- * @param deleteBlockFile whether block file is deleted or corrupted.
- * true is to delete the block file.
- * false is to corrupt the content of the block file.
- * @throws IOException
- */
- private void testReadWithBlockCorrupted(String src, int fileLength,
- int dataBlkDelNum, int parityBlkDelNum, boolean deleteBlockFile)
- throws IOException {
- LOG.info("testReadWithBlockCorrupted: file = " + src
- + ", dataBlkDelNum = " + dataBlkDelNum
- + ", parityBlkDelNum = " + parityBlkDelNum
- + ", deleteBlockFile? " + deleteBlockFile);
- int recoverBlkNum = dataBlkDelNum + parityBlkDelNum;
- Assert.assertTrue("dataBlkDelNum and parityBlkDelNum should be positive",
- dataBlkDelNum >= 0 && parityBlkDelNum >= 0);
- Assert.assertTrue("The sum of dataBlkDelNum and parityBlkDelNum " +
- "should be between 1 ~ " + parityBlocks, recoverBlkNum <= parityBlocks);
-
- // write a file with the length of writeLen
- Path srcPath = new Path(src);
- final byte[] bytes = StripedFileTestUtil.generateBytes(fileLength);
- DFSTestUtil.writeFile(fs, srcPath, bytes);
-
- // delete or corrupt some blocks
- corruptBlocks(srcPath, dataBlkDelNum, parityBlkDelNum, deleteBlockFile);
-
- // check the file can be read after some blocks were deleted
- verifyRead(srcPath, fileLength, bytes);
- }
-
- private void corruptBlocks(Path srcPath, int dataBlkDelNum,
- int parityBlkDelNum, boolean deleteBlockFile) throws IOException {
- int recoverBlkNum = dataBlkDelNum + parityBlkDelNum;
-
- LocatedBlocks locatedBlocks = getLocatedBlocks(srcPath);
- LocatedStripedBlock lastBlock =
- (LocatedStripedBlock)locatedBlocks.getLastLocatedBlock();
-
- int[] delDataBlkIndices = StripedFileTestUtil.randomArray(0, dataBlocks,
- dataBlkDelNum);
- Assert.assertNotNull(delDataBlkIndices);
- int[] delParityBlkIndices = StripedFileTestUtil.randomArray(dataBlocks,
- dataBlocks + parityBlocks, parityBlkDelNum);
- Assert.assertNotNull(delParityBlkIndices);
-
- int[] delBlkIndices = new int[recoverBlkNum];
- System.arraycopy(delDataBlkIndices, 0,
- delBlkIndices, 0, delDataBlkIndices.length);
- System.arraycopy(delParityBlkIndices, 0,
- delBlkIndices, delDataBlkIndices.length, delParityBlkIndices.length);
-
- ExtendedBlock[] delBlocks = new ExtendedBlock[recoverBlkNum];
- for (int i = 0; i < recoverBlkNum; i++) {
- delBlocks[i] = StripedBlockUtil
- .constructInternalBlock(lastBlock.getBlock(),
- cellSize, dataBlocks, delBlkIndices[i]);
- if (deleteBlockFile) {
- // delete the block file
- cluster.corruptBlockOnDataNodesByDeletingBlockFile(delBlocks[i]);
- } else {
- // corrupt the block file
- cluster.corruptBlockOnDataNodes(delBlocks[i]);
- }
- }
- }
-
- private LocatedBlocks getLocatedBlocks(Path filePath) throws IOException {
- return fs.getClient().getLocatedBlocks(filePath.toString(),
- 0, Long.MAX_VALUE);
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d4035d42/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecodingCorruptData.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecodingCorruptData.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecodingCorruptData.java
new file mode 100644
index 0000000..5a8fb4f
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecodingCorruptData.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.initializeCluster;
+import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.tearDownCluster;
+
+/**
+ * Test online recovery with corrupt files. This test is parameterized.
+ */
+@RunWith(Parameterized.class)
+public class TestReadStripedFileWithDecodingCorruptData {
+ static final Logger LOG =
+ LoggerFactory.getLogger(TestReadStripedFileWithDecodingCorruptData.class);
+
+ private static MiniDFSCluster cluster;
+ private static DistributedFileSystem dfs;
+
+ @Rule
+ public Timeout globalTimeout = new Timeout(300000);
+
+ @BeforeClass
+ public static void setup() throws IOException {
+ cluster = initializeCluster();
+ dfs = cluster.getFileSystem();
+ }
+
+ @AfterClass
+ public static void tearDown() throws IOException {
+ tearDownCluster(cluster);
+ }
+
+ @Parameterized.Parameters
+ public static Collection<Object[]> getParameters() {
+ return ReadStripedFileWithDecodingHelper.getParameters();
+ }
+
+ private int fileLength;
+ private int dataDelNum;
+ private int parityDelNum;
+
+ public TestReadStripedFileWithDecodingCorruptData(int fileLength, int
+ dataDelNum, int parityDelNum) {
+ this.fileLength = fileLength;
+ this.dataDelNum = dataDelNum;
+ this.parityDelNum = parityDelNum;
+ }
+
+ /**
+ * Corrupt tolerable number of block before reading.
+ * Verify the decoding works correctly.
+ */
+ @Test
+ public void testReadCorruptedData() throws IOException {
+ String src = "/corrupted_" + dataDelNum + "_" + parityDelNum;
+ ReadStripedFileWithDecodingHelper.testReadWithBlockCorrupted(cluster,
+ dfs, src, fileLength, dataDelNum, parityDelNum, false);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d4035d42/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecodingDeletedData.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecodingDeletedData.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecodingDeletedData.java
new file mode 100644
index 0000000..c267e84
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReadStripedFileWithDecodingDeletedData.java
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.Timeout;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.initializeCluster;
+import static org.apache.hadoop.hdfs.ReadStripedFileWithDecodingHelper.tearDownCluster;
+
+/**
+ * Test online recovery with files with deleted blocks. This test is
+ * parameterized.
+ */
+@RunWith(Parameterized.class)
+public class TestReadStripedFileWithDecodingDeletedData {
+ static final Logger LOG =
+ LoggerFactory.getLogger(TestReadStripedFileWithDecodingDeletedData.class);
+
+ private static MiniDFSCluster cluster;
+ private static DistributedFileSystem dfs;
+
+ @Rule
+ public Timeout globalTimeout = new Timeout(300000);
+
+ @BeforeClass
+ public static void setup() throws IOException {
+ cluster = initializeCluster();
+ dfs = cluster.getFileSystem();
+ }
+
+ @AfterClass
+ public static void tearDown() throws IOException {
+ tearDownCluster(cluster);
+ }
+
+ @Parameterized.Parameters
+ public static Collection<Object[]> getParameters() {
+ return ReadStripedFileWithDecodingHelper.getParameters();
+ }
+
+ private int fileLength;
+ private int dataDelNum;
+ private int parityDelNum;
+
+ public TestReadStripedFileWithDecodingDeletedData(int fileLength, int
+ dataDelNum, int parityDelNum) {
+ this.fileLength = fileLength;
+ this.dataDelNum = dataDelNum;
+ this.parityDelNum = parityDelNum;
+ }
+
+ /**
+ * Delete tolerable number of block before reading.
+ * Verify the decoding works correctly.
+ */
+ @Test
+ public void testReadCorruptedDataByDeleting() throws IOException {
+ String src = "/deleted_" + dataDelNum + "_" + parityDelNum;
+ ReadStripedFileWithDecodingHelper.testReadWithBlockCorrupted(cluster,
+ dfs, src, fileLength, dataDelNum, parityDelNum, true);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org