Posted to mapreduce-commits@hadoop.apache.org by to...@apache.org on 2011/04/07 02:00:35 UTC
svn commit: r1089686 - in /hadoop/mapreduce/trunk: ./
src/contrib/raid/src/test/org/apache/hadoop/raid/
Author: todd
Date: Thu Apr 7 00:00:34 2011
New Revision: 1089686
URL: http://svn.apache.org/viewvc?rev=1089686&view=rev
Log:
MAPREDUCE-2395. TestBlockFixer timing out on trunk. Contributed by Ramkumar Vadali.
Added:
hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixerBlockFixDist.java
hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixerDistConcurrency.java
hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixerGeneratedBlockDist.java
hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixerParityBlockFixDist.java
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixer.java
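
The fix moves the long-running distributed-mode tests out of TestBlockFixer and into the separate classes listed above; each new class extends TestBlockFixer and reuses its helper methods, which this change widens from private to protected. A minimal sketch of that pattern follows, using a hypothetical class and test name rather than one of the classes actually added by this commit:

    package org.apache.hadoop.raid;

    import org.junit.Test;

    // Hypothetical illustration of the pattern this commit applies: a
    // distributed-mode test lives in its own class so it runs (and can
    // time out) independently of the local-mode tests in TestBlockFixer.
    public class TestBlockFixerExampleDist extends TestBlockFixer {
      @Test
      public void testExampleDist() throws Exception {
        // Reuse the shared block-fix scenario from the base class in
        // distributed (non-local) mode, as the added subclasses do.
        implBlockFix(false);
      }
    }
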
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=1089686&r1=1089685&r2=1089686&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Thu Apr 7 00:00:34 2011
@@ -121,6 +121,9 @@ Trunk (unreleased changes)
MAPREDUCE-2348. Disable mumak tests on trunk since they currently time out
(todd)
+ MAPREDUCE-2395. TestBlockFixer timing out on trunk. (Ramkumar Vadali via
+ todd)
+
Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixer.java?rev=1089686&r1=1089685&r2=1089686&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixer.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixer.java Thu Apr 7 00:00:34 2011
@@ -135,11 +135,6 @@ public class TestBlockFixer {
}
@Test
- public void testBlockFixDist() throws Exception {
- implBlockFix(false);
- }
-
- @Test
public void testBlockFixLocal() throws Exception {
implBlockFix(true);
}
@@ -148,7 +143,7 @@ public class TestBlockFixer {
* Create a file with three stripes, corrupt a block each in two stripes,
* and wait for the file to be fixed.
*/
- private void implBlockFix(boolean local) throws Exception {
+ protected void implBlockFix(boolean local) throws Exception {
LOG.info("Test testBlockFix started.");
long blockSize = 8192L;
int stripeLength = 3;
@@ -231,7 +226,7 @@ public class TestBlockFixer {
* regenerated. Now stop RaidNode and corrupt the generated block.
* Test that corruption in the generated block can be detected by clients.
*/
- private void generatedBlockTestCommon(String testName, int blockToCorrupt,
+ protected void generatedBlockTestCommon(String testName, int blockToCorrupt,
boolean local) throws Exception {
LOG.info("Test " + testName + " started.");
long blockSize = 8192L;
@@ -330,17 +325,6 @@ public class TestBlockFixer {
* Test that corruption in the generated block can be detected by clients.
*/
@Test
- public void testGeneratedBlockDist() throws Exception {
- generatedBlockTestCommon("testGeneratedBlock", 3, false);
- }
-
- /**
- * Tests integrity of generated block.
- * Create a file and delete a block entirely. Wait for the block to be
- * regenerated. Now stop RaidNode and corrupt the generated block.
- * Test that corruption in the generated block can be detected by clients.
- */
- @Test
public void testGeneratedBlockLocal() throws Exception {
generatedBlockTestCommon("testGeneratedBlock", 3, true);
}
@@ -352,27 +336,11 @@ public class TestBlockFixer {
* Test that corruption in the generated block can be detected by clients.
*/
@Test
- public void testGeneratedLastBlockDist() throws Exception {
- generatedBlockTestCommon("testGeneratedLastBlock", 6, false);
- }
-
- /**
- * Tests integrity of generated last block.
- * Create a file and delete a block entirely. Wait for the block to be
- * regenerated. Now stop RaidNode and corrupt the generated block.
- * Test that corruption in the generated block can be detected by clients.
- */
- @Test
public void testGeneratedLastBlockLocal() throws Exception {
generatedBlockTestCommon("testGeneratedLastBlock", 6, true);
}
@Test
- public void testParityBlockFixDist() throws Exception {
- implParityBlockFix("testParityBlockFixDist", false);
- }
-
- @Test
public void testParityBlockFixLocal() throws Exception {
implParityBlockFix("testParityBlockFixLocal", true);
}
@@ -380,7 +348,7 @@ public class TestBlockFixer {
/**
* Corrupt a parity file and wait for it to get fixed.
*/
- private void implParityBlockFix(String testName, boolean local)
+ protected void implParityBlockFix(String testName, boolean local)
throws Exception {
LOG.info("Test " + testName + " started.");
long blockSize = 8192L;
@@ -462,16 +430,11 @@ public class TestBlockFixer {
}
@Test
- public void testParityHarBlockFixDist() throws Exception {
- implParityHarBlockFix("testParityHarBlockFixDist", false);
- }
-
- @Test
public void testParityHarBlockFixLocal() throws Exception {
implParityHarBlockFix("testParityHarBlockFixLocal", true);
}
- private void implParityHarBlockFix(String testName, boolean local)
+ protected void implParityHarBlockFix(String testName, boolean local)
throws Exception {
LOG.info("Test " + testName + " started.");
long blockSize = 8192L;
@@ -567,218 +530,7 @@ public class TestBlockFixer {
}
- /**
- * tests that we can have 2 concurrent jobs fixing files
- * (dist block fixer)
- */
- @Test
- public void testConcurrentJobs() throws Exception {
- LOG.info("Test testConcurrentJobs started.");
- long blockSize = 8192L;
- int stripeLength = 3;
- mySetup(stripeLength, -1); // never har
- Path file1 = new Path("/user/dhruba/raidtest/file1");
- Path file2 = new Path("/user/dhruba/raidtest/file2");
- Path destPath = new Path("/destraid/user/dhruba/raidtest");
- long crc1 = TestRaidDfs.createTestFilePartialLastBlock(fileSys, file1,
- 1, 20, blockSize);
- long crc2 = TestRaidDfs.createTestFilePartialLastBlock(fileSys, file2,
- 1, 20, blockSize);
- long file1Len = fileSys.getFileStatus(file1).getLen();
- long file2Len = fileSys.getFileStatus(file2).getLen();
- LOG.info("Test testConcurrentJobs created test files");
-
- // create an instance of the RaidNode
- Configuration localConf = new Configuration(conf);
- localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
- localConf.setInt("raid.blockfix.interval", 1000);
- localConf.set("raid.blockfix.classname",
- "org.apache.hadoop.raid.DistBlockFixer");
- localConf.setLong("raid.blockfix.filespertask", 2L);
-
- try {
- cnode = RaidNode.createRaidNode(null, localConf);
- TestRaidDfs.waitForFileRaided(LOG, fileSys, file1, destPath);
- TestRaidDfs.waitForFileRaided(LOG, fileSys, file2, destPath);
- cnode.stop(); cnode.join();
-
- FileStatus file1Stat = fileSys.getFileStatus(file1);
- FileStatus file2Stat = fileSys.getFileStatus(file2);
- DistributedFileSystem dfs = (DistributedFileSystem)fileSys;
- LocatedBlocks file1Loc =
- RaidDFSUtil.getBlockLocations(dfs, file1.toUri().getPath(),
- 0, file1Stat.getLen());
- LocatedBlocks file2Loc =
- RaidDFSUtil.getBlockLocations(dfs, file2.toUri().getPath(),
- 0, file2Stat.getLen());
-
- String[] corruptFiles = RaidDFSUtil.getCorruptFiles(dfs);
- assertEquals("no corrupt files expected", 0, corruptFiles.length);
- assertEquals("filesFixed() should return 0 before fixing files",
- 0, cnode.blockFixer.filesFixed());
-
- // corrupt file1
- int[] corruptBlockIdxs = new int[]{0, 4, 6};
- for (int idx: corruptBlockIdxs)
- corruptBlock(file1Loc.get(idx).getBlock().getBlockName());
- reportCorruptBlocks(dfs, file1, corruptBlockIdxs, blockSize);
-
- cnode = RaidNode.createRaidNode(null, localConf);
- DistBlockFixer blockFixer = (DistBlockFixer) cnode.blockFixer;
- long start = System.currentTimeMillis();
-
- while (blockFixer.jobsRunning() < 1 &&
- System.currentTimeMillis() - start < 240000) {
- LOG.info("Test testBlockFix waiting for fixing job 1 to start");
- Thread.sleep(10);
- }
- assertEquals("job 1 not running", 1, blockFixer.jobsRunning());
-
- // corrupt file2
- for (int idx: corruptBlockIdxs)
- corruptBlock(file2Loc.get(idx).getBlock().getBlockName());
- reportCorruptBlocks(dfs, file2, corruptBlockIdxs, blockSize);
-
- while (blockFixer.jobsRunning() < 2 &&
- System.currentTimeMillis() - start < 240000) {
- LOG.info("Test testBlockFix waiting for fixing job 2 to start");
- Thread.sleep(10);
- }
- assertEquals("2 jobs not running", 2, blockFixer.jobsRunning());
-
- while (blockFixer.filesFixed() < 2 &&
- System.currentTimeMillis() - start < 240000) {
- LOG.info("Test testBlockFix waiting for files to be fixed.");
- Thread.sleep(10);
- }
- assertEquals("files not fixed", 2, blockFixer.filesFixed());
-
- dfs = getDFS(conf, dfs);
-
- try {
- Thread.sleep(5*1000);
- } catch (InterruptedException ignore) {
- }
- assertTrue("file not fixed",
- TestRaidDfs.validateFile(dfs, file1, file1Len, crc1));
- assertTrue("file not fixed",
- TestRaidDfs.validateFile(dfs, file2, file2Len, crc2));
- } catch (Exception e) {
- LOG.info("Test testConcurrentJobs exception " + e +
- StringUtils.stringifyException(e));
- throw e;
- } finally {
- myTearDown();
- }
-
- }
-
- /**
- * tests that the distributed block fixer obeys
- * the limit on how many files to fix simultaneously
- */
- @Test
- public void testMaxPendingFiles() throws Exception {
- LOG.info("Test testMaxPendingFiles started.");
- long blockSize = 8192L;
- int stripeLength = 3;
- mySetup(stripeLength, -1); // never har
- Path file1 = new Path("/user/dhruba/raidtest/file1");
- Path file2 = new Path("/user/dhruba/raidtest/file2");
- Path destPath = new Path("/destraid/user/dhruba/raidtest");
- long crc1 = TestRaidDfs.createTestFilePartialLastBlock(fileSys, file1,
- 1, 20, blockSize);
- long crc2 = TestRaidDfs.createTestFilePartialLastBlock(fileSys, file2,
- 1, 20, blockSize);
- long file1Len = fileSys.getFileStatus(file1).getLen();
- long file2Len = fileSys.getFileStatus(file2).getLen();
- LOG.info("Test testMaxPendingFiles created test files");
-
- // create an instance of the RaidNode
- Configuration localConf = new Configuration(conf);
- localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
- localConf.setInt("raid.blockfix.interval", 1000);
- localConf.set("raid.blockfix.classname",
- "org.apache.hadoop.raid.DistBlockFixer");
- localConf.setLong("raid.blockfix.filespertask", 2L);
- localConf.setLong("raid.blockfix.maxpendingfiles", 1L);
-
- try {
- cnode = RaidNode.createRaidNode(null, localConf);
- TestRaidDfs.waitForFileRaided(LOG, fileSys, file1, destPath);
- TestRaidDfs.waitForFileRaided(LOG, fileSys, file2, destPath);
- cnode.stop(); cnode.join();
-
- FileStatus file1Stat = fileSys.getFileStatus(file1);
- FileStatus file2Stat = fileSys.getFileStatus(file2);
- DistributedFileSystem dfs = (DistributedFileSystem)fileSys;
- LocatedBlocks file1Loc =
- RaidDFSUtil.getBlockLocations(dfs, file1.toUri().getPath(),
- 0, file1Stat.getLen());
- LocatedBlocks file2Loc =
- RaidDFSUtil.getBlockLocations(dfs, file2.toUri().getPath(),
- 0, file2Stat.getLen());
-
- String[] corruptFiles = RaidDFSUtil.getCorruptFiles(dfs);
- assertEquals("no corrupt files expected", 0, corruptFiles.length);
- assertEquals("filesFixed() should return 0 before fixing files",
- 0, cnode.blockFixer.filesFixed());
-
- // corrupt file1
- int[] corruptBlockIdxs = new int[]{0, 4, 6};
- for (int idx: corruptBlockIdxs)
- corruptBlock(file1Loc.get(idx).getBlock().getBlockName());
- reportCorruptBlocks(dfs, file1, corruptBlockIdxs, blockSize);
- corruptFiles = RaidDFSUtil.getCorruptFiles(dfs);
-
- cnode = RaidNode.createRaidNode(null, localConf);
- DistBlockFixer blockFixer = (DistBlockFixer) cnode.blockFixer;
- long start = System.currentTimeMillis();
-
- while (blockFixer.jobsRunning() < 1 &&
- System.currentTimeMillis() - start < 240000) {
- LOG.info("Test testBlockFix waiting for fixing job 1 to start");
- Thread.sleep(10);
- }
- assertEquals("job not running", 1, blockFixer.jobsRunning());
-
- // corrupt file2
- for (int idx: corruptBlockIdxs)
- corruptBlock(file2Loc.get(idx).getBlock().getBlockName());
- reportCorruptBlocks(dfs, file2, corruptBlockIdxs, blockSize);
- corruptFiles = RaidDFSUtil.getCorruptFiles(dfs);
-
- // wait until both files are fixed
- while (blockFixer.filesFixed() < 2 &&
- System.currentTimeMillis() - start < 240000) {
- // make sure the block fixer does not start a second job while
- // the first one is still running
- assertTrue("too many jobs running", blockFixer.jobsRunning() <= 1);
- Thread.sleep(10);
- }
- assertEquals("files not fixed", 2, blockFixer.filesFixed());
-
- dfs = getDFS(conf, dfs);
-
- try {
- Thread.sleep(5*1000);
- } catch (InterruptedException ignore) {
- }
- assertTrue("file not fixed",
- TestRaidDfs.validateFile(dfs, file1, file1Len, crc1));
- assertTrue("file not fixed",
- TestRaidDfs.validateFile(dfs, file2, file2Len, crc2));
- } catch (Exception e) {
- LOG.info("Test testMaxPendingFiles exception " + e +
- StringUtils.stringifyException(e));
- throw e;
- } finally {
- myTearDown();
- }
-
- }
-
- private static DistributedFileSystem getDFS(
+ protected static DistributedFileSystem getDFS(
Configuration conf, FileSystem dfs) throws IOException {
Configuration clientConf = new Configuration(conf);
clientConf.set("fs.hdfs.impl",
@@ -789,7 +541,7 @@ public class TestBlockFixer {
return (DistributedFileSystem) FileSystem.get(dfsUri, clientConf);
}
- private void mySetup(int stripeLength, int timeBeforeHar) throws Exception {
+ protected void mySetup(int stripeLength, int timeBeforeHar) throws Exception {
new File(TEST_DIR).mkdirs(); // Make sure data directory exists
conf = new Configuration();
@@ -869,7 +621,7 @@ public class TestBlockFixer {
fileWriter.close();
}
- private void myTearDown() throws Exception {
+ protected void myTearDown() throws Exception {
if (cnode != null) { cnode.stop(); cnode.join(); }
if (mr != null) { mr.shutdown(); }
if (dfs != null) { dfs.shutdown(); }
Added: hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixerBlockFixDist.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixerBlockFixDist.java?rev=1089686&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixerBlockFixDist.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixerBlockFixDist.java Thu Apr 7 00:00:34 2011
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.raid;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestBlockFixerBlockFixDist extends TestBlockFixer {
+ @Test
+ public void testBlockFixDist() throws Exception {
+ implBlockFix(false);
+ }
+}
Added: hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixerDistConcurrency.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixerDistConcurrency.java?rev=1089686&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixerDistConcurrency.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixerDistConcurrency.java Thu Apr 7 00:00:34 2011
@@ -0,0 +1,277 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.raid;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.GregorianCalendar;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+import java.util.zip.CRC32;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FilterFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.MiniMRCluster;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.DistributedRaidFileSystem;
+import org.apache.hadoop.hdfs.RaidDFSUtil;
+import org.apache.hadoop.hdfs.TestDatanodeBlockScanner;
+import org.apache.hadoop.hdfs.TestRaidDfs;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.raid.RaidNode;
+import org.apache.hadoop.raid.RaidUtils;
+
+public class TestBlockFixerDistConcurrency extends TestBlockFixer {
+ /**
+ * tests that we can have 2 concurrent jobs fixing files
+ * (dist block fixer)
+ */
+ @Test
+ public void testConcurrentJobs() throws Exception {
+ LOG.info("Test testConcurrentJobs started.");
+ long blockSize = 8192L;
+ int stripeLength = 3;
+ mySetup(stripeLength, -1); // never har
+ Path file1 = new Path("/user/dhruba/raidtest/file1");
+ Path file2 = new Path("/user/dhruba/raidtest/file2");
+ Path destPath = new Path("/destraid/user/dhruba/raidtest");
+ long crc1 = TestRaidDfs.createTestFilePartialLastBlock(fileSys, file1,
+ 1, 20, blockSize);
+ long crc2 = TestRaidDfs.createTestFilePartialLastBlock(fileSys, file2,
+ 1, 20, blockSize);
+ long file1Len = fileSys.getFileStatus(file1).getLen();
+ long file2Len = fileSys.getFileStatus(file2).getLen();
+ LOG.info("Test testConcurrentJobs created test files");
+
+ // create an instance of the RaidNode
+ Configuration localConf = new Configuration(conf);
+ localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
+ localConf.setInt("raid.blockfix.interval", 1000);
+ localConf.set("raid.blockfix.classname",
+ "org.apache.hadoop.raid.DistBlockFixer");
+ localConf.setLong("raid.blockfix.filespertask", 2L);
+
+ try {
+ cnode = RaidNode.createRaidNode(null, localConf);
+ TestRaidDfs.waitForFileRaided(LOG, fileSys, file1, destPath);
+ TestRaidDfs.waitForFileRaided(LOG, fileSys, file2, destPath);
+ cnode.stop(); cnode.join();
+
+ FileStatus file1Stat = fileSys.getFileStatus(file1);
+ FileStatus file2Stat = fileSys.getFileStatus(file2);
+ DistributedFileSystem dfs = (DistributedFileSystem)fileSys;
+ LocatedBlocks file1Loc =
+ RaidDFSUtil.getBlockLocations(dfs, file1.toUri().getPath(),
+ 0, file1Stat.getLen());
+ LocatedBlocks file2Loc =
+ RaidDFSUtil.getBlockLocations(dfs, file2.toUri().getPath(),
+ 0, file2Stat.getLen());
+
+ String[] corruptFiles = RaidDFSUtil.getCorruptFiles(dfs);
+ assertEquals("no corrupt files expected", 0, corruptFiles.length);
+ assertEquals("filesFixed() should return 0 before fixing files",
+ 0, cnode.blockFixer.filesFixed());
+
+ // corrupt file1
+ int[] corruptBlockIdxs = new int[]{0, 4, 6};
+ for (int idx: corruptBlockIdxs)
+ corruptBlock(file1Loc.get(idx).getBlock().getBlockName());
+ reportCorruptBlocks(dfs, file1, corruptBlockIdxs, blockSize);
+
+ cnode = RaidNode.createRaidNode(null, localConf);
+ DistBlockFixer blockFixer = (DistBlockFixer) cnode.blockFixer;
+ long start = System.currentTimeMillis();
+
+ while (blockFixer.jobsRunning() < 1 &&
+ System.currentTimeMillis() - start < 240000) {
+ LOG.info("Test testBlockFix waiting for fixing job 1 to start");
+ Thread.sleep(10);
+ }
+ assertEquals("job 1 not running", 1, blockFixer.jobsRunning());
+
+ // corrupt file2
+ for (int idx: corruptBlockIdxs)
+ corruptBlock(file2Loc.get(idx).getBlock().getBlockName());
+ reportCorruptBlocks(dfs, file2, corruptBlockIdxs, blockSize);
+
+ while (blockFixer.jobsRunning() < 2 &&
+ System.currentTimeMillis() - start < 240000) {
+ LOG.info("Test testBlockFix waiting for fixing job 2 to start");
+ Thread.sleep(10);
+ }
+ assertEquals("2 jobs not running", 2, blockFixer.jobsRunning());
+
+ while (blockFixer.filesFixed() < 2 &&
+ System.currentTimeMillis() - start < 240000) {
+ LOG.info("Test testBlockFix waiting for files to be fixed.");
+ Thread.sleep(10);
+ }
+ assertEquals("files not fixed", 2, blockFixer.filesFixed());
+
+ dfs = getDFS(conf, dfs);
+
+ try {
+ Thread.sleep(5*1000);
+ } catch (InterruptedException ignore) {
+ }
+ assertTrue("file not fixed",
+ TestRaidDfs.validateFile(dfs, file1, file1Len, crc1));
+ assertTrue("file not fixed",
+ TestRaidDfs.validateFile(dfs, file2, file2Len, crc2));
+ } catch (Exception e) {
+ LOG.info("Test testConcurrentJobs exception " + e +
+ StringUtils.stringifyException(e));
+ throw e;
+ } finally {
+ myTearDown();
+ }
+
+ }
+
+ /**
+ * tests that the distributed block fixer obeys
+ * the limit on how many files to fix simultaneously
+ */
+ @Test
+ public void testMaxPendingFiles() throws Exception {
+ LOG.info("Test testMaxPendingFiles started.");
+ long blockSize = 8192L;
+ int stripeLength = 3;
+ mySetup(stripeLength, -1); // never har
+ Path file1 = new Path("/user/dhruba/raidtest/file1");
+ Path file2 = new Path("/user/dhruba/raidtest/file2");
+ Path destPath = new Path("/destraid/user/dhruba/raidtest");
+ long crc1 = TestRaidDfs.createTestFilePartialLastBlock(fileSys, file1,
+ 1, 20, blockSize);
+ long crc2 = TestRaidDfs.createTestFilePartialLastBlock(fileSys, file2,
+ 1, 20, blockSize);
+ long file1Len = fileSys.getFileStatus(file1).getLen();
+ long file2Len = fileSys.getFileStatus(file2).getLen();
+ LOG.info("Test testMaxPendingFiles created test files");
+
+ // create an instance of the RaidNode
+ Configuration localConf = new Configuration(conf);
+ localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
+ localConf.setInt("raid.blockfix.interval", 1000);
+ localConf.set("raid.blockfix.classname",
+ "org.apache.hadoop.raid.DistBlockFixer");
+ localConf.setLong("raid.blockfix.filespertask", 2L);
+ localConf.setLong("raid.blockfix.maxpendingfiles", 1L);
+
+ try {
+ cnode = RaidNode.createRaidNode(null, localConf);
+ TestRaidDfs.waitForFileRaided(LOG, fileSys, file1, destPath);
+ TestRaidDfs.waitForFileRaided(LOG, fileSys, file2, destPath);
+ cnode.stop(); cnode.join();
+
+ FileStatus file1Stat = fileSys.getFileStatus(file1);
+ FileStatus file2Stat = fileSys.getFileStatus(file2);
+ DistributedFileSystem dfs = (DistributedFileSystem)fileSys;
+ LocatedBlocks file1Loc =
+ RaidDFSUtil.getBlockLocations(dfs, file1.toUri().getPath(),
+ 0, file1Stat.getLen());
+ LocatedBlocks file2Loc =
+ RaidDFSUtil.getBlockLocations(dfs, file2.toUri().getPath(),
+ 0, file2Stat.getLen());
+
+ String[] corruptFiles = RaidDFSUtil.getCorruptFiles(dfs);
+ assertEquals("no corrupt files expected", 0, corruptFiles.length);
+ assertEquals("filesFixed() should return 0 before fixing files",
+ 0, cnode.blockFixer.filesFixed());
+
+ // corrupt file1
+ int[] corruptBlockIdxs = new int[]{0, 4, 6};
+ for (int idx: corruptBlockIdxs)
+ corruptBlock(file1Loc.get(idx).getBlock().getBlockName());
+ reportCorruptBlocks(dfs, file1, corruptBlockIdxs, blockSize);
+ corruptFiles = RaidDFSUtil.getCorruptFiles(dfs);
+
+ cnode = RaidNode.createRaidNode(null, localConf);
+ DistBlockFixer blockFixer = (DistBlockFixer) cnode.blockFixer;
+ long start = System.currentTimeMillis();
+
+ while (blockFixer.jobsRunning() < 1 &&
+ System.currentTimeMillis() - start < 240000) {
+ LOG.info("Test testBlockFix waiting for fixing job 1 to start");
+ Thread.sleep(10);
+ }
+ assertEquals("job not running", 1, blockFixer.jobsRunning());
+
+ // corrupt file2
+ for (int idx: corruptBlockIdxs)
+ corruptBlock(file2Loc.get(idx).getBlock().getBlockName());
+ reportCorruptBlocks(dfs, file2, corruptBlockIdxs, blockSize);
+ corruptFiles = RaidDFSUtil.getCorruptFiles(dfs);
+
+ // wait until both files are fixed
+ while (blockFixer.filesFixed() < 2 &&
+ System.currentTimeMillis() - start < 240000) {
+ // make sure the block fixer does not start a second job while
+ // the first one is still running
+ assertTrue("too many jobs running", blockFixer.jobsRunning() <= 1);
+ Thread.sleep(10);
+ }
+ assertEquals("files not fixed", 2, blockFixer.filesFixed());
+
+ dfs = getDFS(conf, dfs);
+
+ try {
+ Thread.sleep(5*1000);
+ } catch (InterruptedException ignore) {
+ }
+ assertTrue("file not fixed",
+ TestRaidDfs.validateFile(dfs, file1, file1Len, crc1));
+ assertTrue("file not fixed",
+ TestRaidDfs.validateFile(dfs, file2, file2Len, crc2));
+ } catch (Exception e) {
+ LOG.info("Test testMaxPendingFiles exception " + e +
+ StringUtils.stringifyException(e));
+ throw e;
+ } finally {
+ myTearDown();
+ }
+
+ }
+}
Added: hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixerGeneratedBlockDist.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixerGeneratedBlockDist.java?rev=1089686&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixerGeneratedBlockDist.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixerGeneratedBlockDist.java Thu Apr 7 00:00:34 2011
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.raid;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestBlockFixerGeneratedBlockDist extends TestBlockFixer {
+ /**
+ * Tests integrity of generated block.
+ * Create a file and delete a block entirely. Wait for the block to be
+ * regenerated. Now stop RaidNode and corrupt the generated block.
+ * Test that corruption in the generated block can be detected by clients.
+ */
+ @Test
+ public void testGeneratedBlockDist() throws Exception {
+ generatedBlockTestCommon("testGeneratedBlock", 3, false);
+ }
+
+ /**
+ * Tests integrity of generated last block.
+ * Create a file and delete a block entirely. Wait for the block to be
+ * regenerated. Now stop RaidNode and corrupt the generated block.
+ * Test that corruption in the generated block can be detected by clients.
+ */
+ @Test
+ public void testGeneratedLastBlockDist() throws Exception {
+ generatedBlockTestCommon("testGeneratedLastBlock", 6, false);
+ }
+
+}
Added: hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixerParityBlockFixDist.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixerParityBlockFixDist.java?rev=1089686&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixerParityBlockFixDist.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixerParityBlockFixDist.java Thu Apr 7 00:00:34 2011
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.raid;
+
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+public class TestBlockFixerParityBlockFixDist extends TestBlockFixer {
+ @Test
+ public void testParityBlockFixDist() throws Exception {
+ implParityBlockFix("testParityBlockFixDist", false);
+ }
+
+ @Test
+ public void testParityHarBlockFixDist() throws Exception {
+ implParityHarBlockFix("testParityHarBlockFixDist", false);
+ }
+}