You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by sc...@apache.org on 2011/01/13 01:31:41 UTC
svn commit: r1058374 - in /hadoop/mapreduce/trunk: ./
src/contrib/raid/src/java/org/apache/hadoop/hdfs/
src/contrib/raid/src/java/org/apache/hadoop/raid/
src/contrib/raid/src/test/org/apache/hadoop/hdfs/
Author: schen
Date: Thu Jan 13 00:31:41 2011
New Revision: 1058374
URL: http://svn.apache.org/viewvc?rev=1058374&view=rev
Log:
MAPREDUCE-2248. DistributedRaidFileSystem should unraid only the corrupt block
(Ramkumar Vadali via schen)
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/DistributedRaidFileSystem.java
hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=1058374&r1=1058373&r2=1058374&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Thu Jan 13 00:31:41 2011
@@ -473,6 +473,9 @@ Release 0.22.0 - Unreleased
classes themselves are already deprecated. This removes an Eclipse error.
(tomwhite via nigel)
+ MAPREDUCE-2248. DistributedRaidFileSystem should unraid only the corrupt
+ block (Ramkumar Vadali via schen)
+
Release 0.21.1 - Unreleased
NEW FEATURES
Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/DistributedRaidFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/DistributedRaidFileSystem.java?rev=1058374&r1=1058373&r2=1058374&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/DistributedRaidFileSystem.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/DistributedRaidFileSystem.java Thu Jan 13 00:31:41 2011
@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.permission.F
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.FSDataInputStream;
@@ -155,18 +156,40 @@ public class DistributedRaidFileSystem e
* from alternate locations if it encoumters read errors in the primary location.
*/
private static class ExtFSDataInputStream extends FSDataInputStream {
+
+ private static class UnderlyingBlock {
+ // File that holds this block. Need not be the same as outer file.
+ public Path path;
+ // Offset within path where this block starts.
+ public long actualFileOffset;
+ // Offset within the outer file where this block starts.
+ public long originalFileOffset;
+ // Length of the block (length <= blk sz of outer file).
+ public long length;
+ public UnderlyingBlock(Path path, long actualFileOffset,
+ long originalFileOffset, long length) {
+ this.path = path;
+ this.actualFileOffset = actualFileOffset;
+ this.originalFileOffset = originalFileOffset;
+ this.length = length;
+ }
+ }
+
/**
* Create an input stream that wraps all the reads/positions/seeking.
*/
private static class ExtFsInputStream extends FSInputStream {
- //The underlying data input stream that the
- // underlying filesystem will return.
- private FSDataInputStream underLyingStream;
+ // Extents of "good" underlying data that can be read.
+ private UnderlyingBlock[] underlyingBlocks;
+ private long currentOffset;
+ private FSDataInputStream currentStream;
+ private UnderlyingBlock currentBlock;
private byte[] oneBytebuff = new byte[1];
private int nextLocation;
private DistributedRaidFileSystem lfs;
private Path path;
+ private FileStatus stat;
private final DecodeInfo[] alternates;
private final int buffersize;
private final Configuration conf;
@@ -175,86 +198,133 @@ public class DistributedRaidFileSystem e
ExtFsInputStream(Configuration conf, DistributedRaidFileSystem lfs,
DecodeInfo[] alternates, Path path, int stripeLength, int buffersize)
throws IOException {
- this.underLyingStream = lfs.fs.open(path, buffersize);
this.path = path;
this.nextLocation = 0;
+ // Construct array of blocks in file.
+ this.stat = lfs.getFileStatus(path);
+ long numBlocks = (this.stat.getLen() % this.stat.getBlockSize() == 0) ?
+ this.stat.getLen() / this.stat.getBlockSize() :
+ 1 + this.stat.getLen() / this.stat.getBlockSize();
+ this.underlyingBlocks = new UnderlyingBlock[(int)numBlocks];
+ for (int i = 0; i < numBlocks; i++) {
+ long actualFileOffset = i * stat.getBlockSize();
+ long originalFileOffset = i * stat.getBlockSize();
+ long length = Math.min(
+ stat.getBlockSize(), stat.getLen() - originalFileOffset);
+ this.underlyingBlocks[i] = new UnderlyingBlock(
+ path, actualFileOffset, originalFileOffset, length);
+ }
+ this.currentOffset = 0;
+ this.currentBlock = null;
this.alternates = alternates;
this.buffersize = buffersize;
this.conf = conf;
this.lfs = lfs;
this.stripeLength = stripeLength;
+ // Open a stream to the first block.
+ openCurrentStream();
}
-
+
+ private void closeCurrentStream() throws IOException {
+ if (currentStream != null) {
+ currentStream.close();
+ currentStream = null;
+ }
+ }
+
+ /**
+ * Open a stream to the file containing the current block
+ * and seek to the appropriate offset
+ */
+ private void openCurrentStream() throws IOException {
+ int blockIdx = (int)(currentOffset/stat.getBlockSize());
+ UnderlyingBlock block = underlyingBlocks[blockIdx];
+ // If the current path is the same as we want.
+ if (currentBlock == block ||
+ currentBlock != null && currentBlock.path == block.path) {
+ // If we have a valid stream, nothing to do.
+ if (currentStream != null) {
+ currentBlock = block;
+ return;
+ }
+ } else {
+ closeCurrentStream();
+ }
+ currentBlock = block;
+ currentStream = lfs.fs.open(currentBlock.path, buffersize);
+ long offset = block.actualFileOffset +
+ (currentOffset - block.originalFileOffset);
+ currentStream.seek(offset);
+ }
+
+ /**
+ * Returns the number of bytes available in the current block.
+ */
+ private int blockAvailable() {
+ return (int) (currentBlock.length -
+ (currentOffset - currentBlock.originalFileOffset));
+ }
+
@Override
public synchronized int available() throws IOException {
- int value = underLyingStream.available();
+ // Application should not assume that any bytes are buffered here.
nextLocation = 0;
- return value;
+ return Math.min(blockAvailable(), currentStream.available());
}
-
+
@Override
public synchronized void close() throws IOException {
- underLyingStream.close();
+ closeCurrentStream();
super.close();
}
-
+
+ @Override
+ public boolean markSupported() { return false; }
+
@Override
public void mark(int readLimit) {
- underLyingStream.mark(readLimit);
+ // Mark and reset are not supported.
nextLocation = 0;
}
-
+
@Override
public void reset() throws IOException {
- underLyingStream.reset();
+ // Mark and reset are not supported.
nextLocation = 0;
}
-
+
@Override
public synchronized int read() throws IOException {
- long pos = underLyingStream.getPos();
- while (true) {
- try {
- int value = underLyingStream.read();
- nextLocation = 0;
- return value;
- } catch (BlockMissingException e) {
- setAlternateLocations(e, pos);
- } catch (ChecksumException e) {
- setAlternateLocations(e, pos);
- }
+ int value = read(oneBytebuff);
+ if (value < 0) {
+ return value;
+ } else {
+ return oneBytebuff[0];
}
}
-
+
@Override
public synchronized int read(byte[] b) throws IOException {
- long pos = underLyingStream.getPos();
- while (true) {
- try{
- int value = underLyingStream.read(b);
- nextLocation = 0;
- return value;
- } catch (BlockMissingException e) {
- setAlternateLocations(e, pos);
- } catch (ChecksumException e) {
- setAlternateLocations(e, pos);
- }
- }
+ int value = read(b, 0, b.length);
+ nextLocation = 0;
+ return value;
}
@Override
public synchronized int read(byte[] b, int offset, int len)
throws IOException {
- long pos = underLyingStream.getPos();
while (true) {
+ openCurrentStream();
try{
- int value = underLyingStream.read(b, offset, len);
+ int limit = Math.min(blockAvailable(), len);
+ int value = currentStream.read(b, offset, limit);
+ currentOffset += value;
nextLocation = 0;
return value;
} catch (BlockMissingException e) {
- setAlternateLocations(e, pos);
+ setAlternateLocations(e, currentOffset);
} catch (ChecksumException e) {
- setAlternateLocations(e, pos);
+ setAlternateLocations(e, currentOffset);
}
}
}
@@ -262,43 +332,49 @@ public class DistributedRaidFileSystem e
@Override
public synchronized int read(long position, byte[] b, int offset, int len)
throws IOException {
- long pos = underLyingStream.getPos();
- while (true) {
- try {
- int value = underLyingStream.read(position, b, offset, len);
- nextLocation = 0;
- return value;
- } catch (BlockMissingException e) {
- setAlternateLocations(e, pos);
- } catch (ChecksumException e) {
- setAlternateLocations(e, pos);
- }
+ long oldPos = currentOffset;
+ seek(position);
+ try {
+ return read(b, offset, len);
+ } finally {
+ seek(oldPos);
}
}
@Override
public synchronized long skip(long n) throws IOException {
- long value = underLyingStream.skip(n);
+ long skipped = 0;
+ while (skipped < n) {
+ int val = read();
+ if (val < 0) {
+ break;
+ }
+ skipped++;
+ }
nextLocation = 0;
- return value;
+ return skipped;
}
@Override
public synchronized long getPos() throws IOException {
- long value = underLyingStream.getPos();
nextLocation = 0;
- return value;
+ return currentOffset;
}
@Override
public synchronized void seek(long pos) throws IOException {
- underLyingStream.seek(pos);
+ if (pos != currentOffset) {
+ closeCurrentStream();
+ currentOffset = pos;
+ openCurrentStream();
+ }
nextLocation = 0;
}
@Override
public boolean seekToNewSource(long targetPos) throws IOException {
- boolean value = underLyingStream.seekToNewSource(targetPos);
+ seek(targetPos);
+ boolean value = currentStream.seekToNewSource(currentStream.getPos());
nextLocation = 0;
return value;
}
@@ -309,49 +385,53 @@ public class DistributedRaidFileSystem e
@Override
public void readFully(long pos, byte[] b, int offset, int length)
throws IOException {
- long post = underLyingStream.getPos();
- while (true) {
- try {
- underLyingStream.readFully(pos, b, offset, length);
- nextLocation = 0;
- return;
- } catch (BlockMissingException e) {
- setAlternateLocations(e, post);
- } catch (ChecksumException e) {
- setAlternateLocations(e, pos);
+ long oldPos = currentOffset;
+ seek(pos);
+ try {
+ while (true) {
+ // This loop retries reading until successful. Unrecoverable errors
+ // cause exceptions.
+ // currentOffset is changed by read().
+ try {
+ while (length > 0) {
+ int n = read(b, offset, length);
+ if (n < 0) {
+ throw new IOException("Premature EOF");
+ }
+ offset += n;
+ length -= n;
+ }
+ nextLocation = 0;
+ return;
+ } catch (BlockMissingException e) {
+ setAlternateLocations(e, currentOffset);
+ } catch (ChecksumException e) {
+ setAlternateLocations(e, currentOffset);
+ }
}
+ } finally {
+ seek(oldPos);
}
}
-
+
@Override
public void readFully(long pos, byte[] b) throws IOException {
- long post = underLyingStream.getPos();
- while (true) {
- try {
- underLyingStream.readFully(pos, b);
- nextLocation = 0;
- return;
- } catch (BlockMissingException e) {
- setAlternateLocations(e, post);
- } catch (ChecksumException e) {
- setAlternateLocations(e, pos);
- }
- }
+ readFully(pos, b, 0, b.length);
+ nextLocation = 0;
}
/**
- * Extract good file from RAID
- * @param curpos curexp the current exception
- * @param curpos the position of the current operation to be retried
+ * Extract good block from RAID
* @throws IOException if all alternate locations are exhausted
*/
- private void setAlternateLocations(IOException curexp, long curpos)
+ private void setAlternateLocations(IOException curexp, long offset)
throws IOException {
while (alternates != null && nextLocation < alternates.length) {
try {
int idx = nextLocation++;
- long corruptOffset = underLyingStream.getPos();
-
+ // Start offset of block.
+ long corruptOffset =
+ (offset / stat.getBlockSize()) * stat.getBlockSize();
// Make sure we use DFS and not DistributedRaidFileSystem for unRaid.
Configuration clientConf = new Configuration(conf);
Class<?> clazz = conf.getClass("fs.raid.underlyingfs.impl",
@@ -359,22 +439,21 @@ public class DistributedRaidFileSystem e
clientConf.set("fs.hdfs.impl", clazz.getName());
// Disable caching so that a previously cached RaidDfs is not used.
clientConf.setBoolean("fs.hdfs.impl.disable.cache", true);
- Path npath = RaidNode.unRaid(clientConf, path,
+ Path npath = RaidNode.unRaidCorruptBlock(clientConf, path,
alternates[idx].destPath,
alternates[idx].createDecoder(),
stripeLength, corruptOffset);
if (npath == null)
continue;
- FileSystem fs1 = getUnderlyingFileSystem(conf);
- fs1.initialize(npath.toUri(), conf);
- LOG.info("Opening alternate path " + npath + " at offset " + curpos);
- FSDataInputStream fd = fs1.open(npath, buffersize);
- fd.seek(curpos);
- underLyingStream.close();
- underLyingStream = fd;
- lfs.fs = fs1;
- path = npath;
+ closeCurrentStream();
+ LOG.info("Using block at offset " + corruptOffset + " from " +
+ npath);
+ currentBlock.path = npath;
+ currentBlock.actualFileOffset = 0; // Single block in file.
+ // Dont change currentOffset, in case the user had done a seek?
+ openCurrentStream();
+
return;
} catch (Exception e) {
LOG.info("Error in using alternate path " + path + ". " + e +
Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java?rev=1058374&r1=1058373&r2=1058374&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java Thu Jan 13 00:31:41 2011
@@ -738,7 +738,7 @@ public abstract class RaidNode implement
/**
* RAID an individual file
*/
- static private void doRaid(Configuration conf, FileStatus stat, Path destPath,
+ static public void doRaid(Configuration conf, FileStatus stat, Path destPath,
PolicyInfo.ErasureCodeType code, Statistics statistics,
Progressable reporter, boolean doSimulate,
int targetRepl, int metaRepl, int stripeLength)
@@ -870,6 +870,35 @@ public abstract class RaidNode implement
return recoveredPath;
}
+ public static Path unRaidCorruptBlock(Configuration conf, Path srcPath,
+ Path destPathPrefix, Decoder decoder, int stripeLength,
+ long corruptOffset) throws IOException {
+ // Test if parity file exists
+ ParityFilePair ppair = getParityFile(destPathPrefix, srcPath, conf);
+ if (ppair == null) {
+ LOG.error("Could not find parity file for " + srcPath);
+ return null;
+ }
+
+ final Path recoveryDestination = new Path(RaidNode.xorTempPrefix(conf));
+ FileSystem destFs = recoveryDestination.getFileSystem(conf);
+ final Path recoveredPrefix =
+ destFs.makeQualified(new Path(recoveryDestination, makeRelative(srcPath)));
+ final Path recoveredBlock =
+ new Path(recoveredPrefix + "." + new Random().nextLong() + ".recovered");
+ LOG.info("Creating recovered Block " + recoveredBlock);
+
+ FileSystem srcFs = srcPath.getFileSystem(conf);
+ FileStatus stat = srcFs.getFileStatus(srcPath);
+ long limit = Math.min(stat.getBlockSize(), stat.getLen() - corruptOffset);
+ java.io.OutputStream out = ppair.getFileSystem().create(recoveredBlock);
+ decoder.fixErasedBlock(srcFs, srcPath,
+ ppair.getFileSystem(), ppair.getPath(),
+ stat.getBlockSize(), corruptOffset, 0, limit, out);
+ out.close();
+ return recoveredBlock;
+ }
+
/**
* Periodically delete orphaned parity files.
*/
Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java?rev=1058374&r1=1058373&r2=1058374&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java Thu Jan 13 00:31:41 2011
@@ -54,13 +54,12 @@ import org.apache.hadoop.hdfs.protocol.B
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.DistributedRaidFileSystem;
import org.apache.hadoop.raid.RaidNode;
-import org.apache.hadoop.raid.protocol.PolicyInfo;
+import org.apache.hadoop.raid.RaidUtils;
+import org.apache.hadoop.raid.protocol.PolicyInfo.ErasureCodeType;
public class TestRaidDfs extends TestCase {
final static String TEST_DIR = new File(System.getProperty("test.build.data",
"build/contrib/raid/test/data")).getAbsolutePath();
- final static String CONFIG_FILE = new File(TEST_DIR,
- "test-raid.xml").getAbsolutePath();
final static long RELOAD_INTERVAL = 1000;
final static Log LOG = LogFactory.getLog("org.apache.hadoop.raid.TestRaidDfs");
final static int NUM_DATANODES = 3;
@@ -70,18 +69,16 @@ public class TestRaidDfs extends TestCas
String hftp = null;
MiniDFSCluster dfs = null;
FileSystem fileSys = null;
- RaidNode cnode = null;
String jobTrackerName = null;
+ ErasureCodeType code;
+ int stripeLength;
- private void mySetup(String erasureCode, int stripeLength,
- int rsParityLength) throws Exception {
+ private void mySetup(
+ String erasureCode, int rsParityLength) throws Exception {
new File(TEST_DIR).mkdirs(); // Make sure data directory exists
conf = new Configuration();
- conf.set("raid.config.file", CONFIG_FILE);
- conf.setBoolean("raid.config.reload", true);
- conf.setLong("raid.config.reload.interval", RELOAD_INTERVAL);
conf.setInt(RaidNode.RS_PARITY_LENGTH_KEY, rsParityLength);
// scan all policies once every 5 second
@@ -105,41 +102,9 @@ public class TestRaidDfs extends TestCas
hftp = "hftp://localhost.localdomain:" + dfs.getNameNodePort();
FileSystem.setDefaultUri(conf, namenode);
-
- FileWriter fileWriter = new FileWriter(CONFIG_FILE);
- fileWriter.write("<?xml version=\"1.0\"?>\n");
- String str = "<configuration> " +
- "<srcPath prefix=\"/user/dhruba/raidtest\"> " +
- "<policy name = \"RaidTest1\"> " +
- "<erasureCode>" + erasureCode + "</erasureCode> " +
- "<property> " +
- "<name>targetReplication</name> " +
- "<value>1</value> " +
- "<description>after RAIDing, decrease the replication factor of a file to this value." +
- "</description> " +
- "</property> " +
- "<property> " +
- "<name>metaReplication</name> " +
- "<value>1</value> " +
- "<description> replication factor of parity file" +
- "</description> " +
- "</property> " +
- "<property> " +
- "<name>modTimePeriod</name> " +
- "<value>2000</value> " +
- "<description> time (milliseconds) after a file is modified to make it " +
- "a candidate for RAIDing " +
- "</description> " +
- "</property> " +
- "</policy>" +
- "</srcPath>" +
- "</configuration>";
- fileWriter.write(str);
- fileWriter.close();
}
private void myTearDown() throws Exception {
- if (cnode != null) { cnode.stop(); cnode.join(); }
if (dfs != null) { dfs.shutdown(); }
}
@@ -218,7 +183,9 @@ public class TestRaidDfs extends TestCas
numBlocks, blockSize);
long length = fileSys.getFileStatus(srcFile).getLen();
- waitForFileRaided(LOG, fileSys, srcFile, destPath);
+ RaidNode.doRaid(conf, fileSys.getFileStatus(srcFile),
+ destPath, code, new RaidNode.Statistics(), new RaidUtils.DummyProgressable(),
+ false, repl, repl, stripeLength);
// Delete first block of file
for (int blockNumToCorrupt : listBlockNumToCorrupt) {
@@ -240,29 +207,25 @@ public class TestRaidDfs extends TestCas
public void testRaidDfsRs() throws Exception {
LOG.info("Test testRaidDfs started.");
+ code = ErasureCodeType.RS;
long blockSize = 8192L;
int numBlocks = 8;
- int stripeLength = 3;
- mySetup("rs", stripeLength, 3);
+ stripeLength = 3;
+ mySetup("rs", 3);
- // Create an instance of the RaidNode
- Configuration localConf = new Configuration(conf);
- localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
- cnode = RaidNode.createRaidNode(null, localConf);
Path destPath = new Path("/destraid/user/dhruba/raidtest");
int[][] corrupt = {{1, 2, 3}, {1, 4, 7}, {3, 6, 7}};
try {
for (int i = 0; i < corrupt.length; i++) {
Path file = new Path("/user/dhruba/raidtest/file" + i);
corruptBlockAndValidate(
- file, destPath, corrupt[0], blockSize, numBlocks);
+ file, new Path("/destraid"), corrupt[i], blockSize, numBlocks);
}
} catch (Exception e) {
LOG.info("testRaidDfs Exception " + e +
StringUtils.stringifyException(e));
throw e;
} finally {
- if (cnode != null) { cnode.stop(); cnode.join(); }
myTearDown();
}
LOG.info("Test testRaidDfs completed.");
@@ -272,31 +235,40 @@ public class TestRaidDfs extends TestCas
* Test DistributedRaidFileSystem.readFully()
*/
public void testReadFully() throws Exception {
- mySetup("xor", 3, 1);
+ code = ErasureCodeType.XOR;
+ stripeLength = 3;
+ mySetup("xor", 1);
try {
Path file = new Path("/user/raid/raidtest/file1");
- createTestFile(fileSys, file, 1, 7, 8192L);
-
- // filter all filesystem calls from client
- Configuration clientConf = new Configuration(conf);
- clientConf.set("fs.hdfs.impl",
- "org.apache.hadoop.hdfs.DistributedRaidFileSystem");
- clientConf.set("fs.raid.underlyingfs.impl",
- "org.apache.hadoop.hdfs.DistributedFileSystem");
- URI dfsUri = dfs.getFileSystem().getUri();
- FileSystem.closeAll();
- FileSystem raidfs = FileSystem.get(dfsUri, clientConf);
+ long crc = createTestFile(fileSys, file, 1, 8, 8192L);
+ FileStatus stat = fileSys.getFileStatus(file);
+ LOG.info("Created " + file + ", crc=" + crc + ", len=" + stat.getLen());
- FileStatus stat = raidfs.getFileStatus(file);
byte[] filebytes = new byte[(int)stat.getLen()];
+ // Test that readFully returns the correct CRC when there are no errors.
+ DistributedRaidFileSystem raidfs = getRaidFS();
FSDataInputStream stm = raidfs.open(file);
- // Test that readFully returns.
- stm.readFully(filebytes, 0, (int)stat.getLen());
-
+ stm.readFully(0, filebytes);
+ assertEquals(crc, bufferCRC(filebytes));
+ stm.close();
+
+ // Generate parity.
+ RaidNode.doRaid(conf, fileSys.getFileStatus(file),
+ new Path("/destraid"), code, new RaidNode.Statistics(),
+ new RaidUtils.DummyProgressable(),
+ false, 1, 1, stripeLength);
+ int[] corrupt = {0, 4, 7}; // first, last and middle block
+ for (int blockIdx : corrupt) {
+ LOG.info("Corrupt block " + blockIdx + " of file " + file);
+ LocatedBlocks locations = getBlockLocations(file);
+ corruptBlock(file, locations.get(blockIdx).getBlock(),
+ NUM_DATANODES, true);
+ }
+ // Test that readFully returns the correct CRC when there are errors.
stm = raidfs.open(file);
- // Test that readFully returns.
- stm.readFully(filebytes);
+ stm.readFully(0, filebytes);
+ assertEquals(crc, bufferCRC(filebytes));
} finally {
myTearDown();
}
@@ -309,24 +281,23 @@ public class TestRaidDfs extends TestCas
public void testAccessTime() throws Exception {
LOG.info("Test testAccessTime started.");
+ code = ErasureCodeType.XOR;
long blockSize = 8192L;
int numBlocks = 8;
int repl = 1;
- mySetup("xor", 3, 1);
+ stripeLength = 3;
+ mySetup("xor", 1);
Path file = new Path("/user/dhruba/raidtest/file");
Path destPath = new Path("/destraid/user/dhruba/raidtest");
createTestFilePartialLastBlock(fileSys, file, repl, numBlocks, blockSize);
FileStatus stat = fileSys.getFileStatus(file);
- int[][] corrupt = {{0}, {4}, {7}}; // first, last and middle block
try {
- // Create an instance of the RaidNode
- Configuration localConf = new Configuration(conf);
- localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
- cnode = RaidNode.createRaidNode(null, localConf);
+ RaidNode.doRaid(conf, fileSys.getFileStatus(file),
+ new Path("/destraid"), code, new RaidNode.Statistics(),
+ new RaidUtils.DummyProgressable(), false, repl, repl, stripeLength);
- waitForFileRaided(LOG, fileSys, file, destPath);
FileStatus newStat = fileSys.getFileStatus(file);
assertEquals(stat.getModificationTime(), newStat.getModificationTime());
@@ -335,6 +306,7 @@ public class TestRaidDfs extends TestCas
myTearDown();
}
}
+
/**
* Create a file, corrupt a block in it and ensure that the file can be
* read through DistributedRaidFileSystem by XOR code.
@@ -342,15 +314,11 @@ public class TestRaidDfs extends TestCas
public void testRaidDfsXor() throws Exception {
LOG.info("Test testRaidDfs started.");
+ code = ErasureCodeType.XOR;
long blockSize = 8192L;
int numBlocks = 8;
- int stripeLength = 3;
- mySetup("xor", stripeLength, 1);
-
- // Create an instance of the RaidNode
- Configuration localConf = new Configuration(conf);
- localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
- cnode = RaidNode.createRaidNode(null, localConf);
+ stripeLength = 3;
+ mySetup("xor", 1);
Path destPath = new Path("/destraid/user/dhruba/raidtest");
int[][] corrupt = {{0}, {4}, {7}}; // first, last and middle block
@@ -358,14 +326,13 @@ public class TestRaidDfs extends TestCas
for (int i = 0; i < corrupt.length; i++) {
Path file = new Path("/user/dhruba/raidtest/" + i);
corruptBlockAndValidate(
- file, destPath, corrupt[0], blockSize, numBlocks);
+ file, new Path("/destraid"), corrupt[i], blockSize, numBlocks);
}
} catch (Exception e) {
LOG.info("testRaidDfs Exception " + e +
StringUtils.stringifyException(e));
throw e;
} finally {
- if (cnode != null) { cnode.stop(); cnode.join(); }
myTearDown();
}
LOG.info("Test testRaidDfs completed.");
@@ -421,6 +388,13 @@ public class TestRaidDfs extends TestCas
stm.close();
return crc.getValue();
}
+
+ static long bufferCRC(byte[] buf) {
+ CRC32 crc = new CRC32();
+ crc.update(buf, 0, buf.length);
+ return crc.getValue();
+ }
+
//
// validates that file matches the crc.
//