Posted to mapreduce-commits@hadoop.apache.org by sc...@apache.org on 2010/11/12 01:17:23 UTC
svn commit: r1034221 [1/2] - in /hadoop/mapreduce/trunk: ./
src/contrib/raid/src/java/org/apache/hadoop/hdfs/
src/contrib/raid/src/java/org/apache/hadoop/raid/
src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/
src/contrib/raid/src/test/org/apa...
Author: schen
Date: Fri Nov 12 00:17:22 2010
New Revision: 1034221
URL: http://svn.apache.org/viewvc?rev=1034221&view=rev
Log:
MAPREDUCE-2169. Integrated Reed-Solomon code with RaidNode. (Ramkumar Vadali via schen)
Added:
hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ReedSolomonDecoder.java
hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ReedSolomonEncoder.java
hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestReedSolomonDecoder.java
hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestReedSolomonEncoder.java
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/DistributedRaidFileSystem.java
hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/BlockFixer.java
hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ConfigManager.java
hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/DistRaid.java
hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/Encoder.java
hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidFilter.java
hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidUtils.java
hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ReedSolomonCode.java
hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/XOREncoder.java
hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/PolicyInfo.java
hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java
hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestBlockFixer.java
hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestDirectoryTraversal.java
hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidFilter.java
hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java
hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidNode.java
hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidPurge.java
hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidShell.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=1034221&r1=1034220&r2=1034221&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Nov 12 00:17:22 2010
@@ -21,6 +21,9 @@ Trunk (unreleased changes)
MAPREDUCE-1970. Reed-Solomon code implementation for HDFS RAID.
(Scott Chen via dhruba)
+ MAPREDUCE-2169. Integrated Reed-Solomon code with RaidNode. (Ramkumar
+ Vadali via schen)
+
IMPROVEMENTS
MAPREDUCE-2141. Add an "extra data" field to Task for use by Mesos. (matei)
Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/DistributedRaidFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/DistributedRaidFileSystem.java?rev=1034221&r1=1034220&r2=1034221&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/DistributedRaidFileSystem.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/hdfs/DistributedRaidFileSystem.java Fri Nov 12 00:17:22 2010
@@ -20,26 +20,33 @@ package org.apache.hadoop.hdfs;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.DataInput;
+import java.io.PrintStream;
import java.net.URI;
import java.net.URISyntaxException;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;
+import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.hdfs.BlockMissingException;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.raid.Decoder;
import org.apache.hadoop.raid.RaidNode;
+import org.apache.hadoop.raid.ReedSolomonDecoder;
import org.apache.hadoop.raid.XORDecoder;
-import org.apache.hadoop.hdfs.BlockMissingException;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.raid.protocol.PolicyInfo.ErasureCodeType;
/**
* This is an implementation of the Hadoop RAID Filesystem. This FileSystem
@@ -51,7 +58,7 @@ import org.apache.hadoop.hdfs.Distribute
public class DistributedRaidFileSystem extends FilterFileSystem {
// these are alternate locations that can be used for read-only access
- Path[] alternates;
+ DecodeInfo[] alternates;
Configuration conf;
int stripeLength;
@@ -64,6 +71,30 @@ public class DistributedRaidFileSystem e
stripeLength = 0;
}
+ // Information required for decoding a source file
+ static private class DecodeInfo {
+ final Path destPath;
+ final ErasureCodeType type;
+ final Configuration conf;
+ final int stripeLength;
+ private DecodeInfo(Configuration conf, ErasureCodeType type, Path destPath) {
+ this.conf = conf;
+ this.type = type;
+ this.destPath = destPath;
+ this.stripeLength = RaidNode.getStripeLength(conf);
+ }
+
+ Decoder createDecoder() {
+ if (this.type == ErasureCodeType.XOR) {
+ return new XORDecoder(conf, stripeLength);
+ } else if (this.type == ErasureCodeType.RS) {
+ return new ReedSolomonDecoder(conf, stripeLength,
+ RaidNode.rsParityLength(conf));
+ }
+ return null;
+ }
+ }
+
/* Initialize a Raid FileSystem
*/
public void initialize(URI name, Configuration conf) throws IOException {
@@ -78,23 +109,6 @@ public class DistributedRaidFileSystem e
this.fs = (FileSystem)ReflectionUtils.newInstance(clazz, null);
super.initialize(name, conf);
- String alt = conf.get("hdfs.raid.locations");
-
- // If no alternates are specified, then behave absolutely same as
- // the original file system.
- if (alt == null || alt.length() == 0) {
- LOG.info("hdfs.raid.locations not defined. Using defaults...");
- alt = RaidNode.DEFAULT_RAID_LOCATION;
- }
-
- // fs.alternate.filesystem.prefix can be of the form:
- // "hdfs://host:port/myPrefixPath, file:///localPrefix,hftp://host1:port1/"
- String[] strs = alt.split(",");
- if (strs == null || strs.length == 0) {
- LOG.info("hdfs.raid.locations badly defined. Ignoring...");
- return;
- }
-
// find stripe length configured
stripeLength = RaidNode.getStripeLength(conf);
if (stripeLength == 0) {
@@ -103,12 +117,12 @@ public class DistributedRaidFileSystem e
return;
}
- // create a reference to all underlying alternate path prefix
- alternates = new Path[strs.length];
- for (int i = 0; i < strs.length; i++) {
- alternates[i] = new Path(strs[i].trim());
- alternates[i] = alternates[i].makeQualified(fs);
- }
+ // Put XOR and RS in alternates
+ alternates= new DecodeInfo[2];
+ Path xorPath = RaidNode.xorDestinationPath(conf, fs);
+ alternates[0] = new DecodeInfo(conf, ErasureCodeType.XOR, xorPath);
+ Path rsPath = RaidNode.rsDestinationPath(conf, fs);
+ alternates[1] = new DecodeInfo(conf, ErasureCodeType.RS, rsPath);
}
/*
@@ -153,13 +167,13 @@ public class DistributedRaidFileSystem e
private int nextLocation;
private DistributedRaidFileSystem lfs;
private Path path;
- private final Path[] alternates;
+ private final DecodeInfo[] alternates;
private final int buffersize;
private final Configuration conf;
private final int stripeLength;
- ExtFsInputStream(Configuration conf, DistributedRaidFileSystem lfs, Path[] alternates,
- Path path, int stripeLength, int buffersize)
+ ExtFsInputStream(Configuration conf, DistributedRaidFileSystem lfs,
+ DecodeInfo[] alternates, Path path, int stripeLength, int buffersize)
throws IOException {
this.underLyingStream = lfs.fs.open(path, buffersize);
this.path = path;
@@ -345,10 +359,13 @@ public class DistributedRaidFileSystem e
clientConf.set("fs.hdfs.impl", clazz.getName());
// Disable caching so that a previously cached RaidDfs is not used.
clientConf.setBoolean("fs.hdfs.impl.disable.cache", true);
- Decoder decoder =
- new XORDecoder(clientConf, RaidNode.getStripeLength(clientConf));
- Path npath = RaidNode.unRaid(clientConf, path, alternates[idx],
- decoder, stripeLength, corruptOffset);
+ Path npath = RaidNode.unRaid(clientConf, path,
+ alternates[idx].destPath,
+ alternates[idx].createDecoder(),
+ stripeLength, corruptOffset);
+ if (npath == null)
+ continue;
+
FileSystem fs1 = getUnderlyingFileSystem(conf);
fs1.initialize(npath.toUri(), conf);
LOG.info("Opening alternate path " + npath + " at offset " + curpos);
@@ -392,7 +409,7 @@ public class DistributedRaidFileSystem e
* @throws IOException
*/
public ExtFSDataInputStream(Configuration conf, DistributedRaidFileSystem lfs,
- Path[] alternates, Path p, int stripeLength, int buffersize) throws IOException {
+ DecodeInfo[] alternates, Path p, int stripeLength, int buffersize) throws IOException {
super(new ExtFsInputStream(conf, lfs, alternates, p, stripeLength, buffersize));
}
}
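The DecodeInfo change above replaces the flat list of alternate parity locations with self-describing entries: each entry carries the parity destination path for one erasure code and knows how to construct the matching decoder. On a read error, ExtFsInputStream now walks the alternates in order (XOR first, then RS) and tries to recover through each code in turn. A minimal sketch of that recovery loop, with illustrative variable names:

  // Sketch only; mirrors the loop in ExtFsInputStream above.
  for (DecodeInfo alternate : alternates) {
    Path recovered = RaidNode.unRaid(clientConf, path,
        alternate.destPath,          // parity location for this code type
        alternate.createDecoder(),   // XORDecoder or ReedSolomonDecoder
        stripeLength, corruptOffset);
    if (recovered == null) {
      continue;                      // no usable parity of this type; try the next
    }
    // open 'recovered' on the underlying file system and resume reading
    break;
  }

Note the new null check on the unRaid result: with two codes in play, a failed recovery through one code now falls through to the other instead of aborting.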
Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/BlockFixer.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/BlockFixer.java?rev=1034221&r1=1034220&r2=1034221&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/BlockFixer.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/BlockFixer.java Fri Nov 12 00:17:22 2010
@@ -18,7 +18,6 @@
package org.apache.hadoop.raid;
-import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
@@ -60,6 +59,7 @@ import org.apache.hadoop.io.Text;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileStatus;
@@ -71,36 +71,73 @@ import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.raid.RaidNode;
import org.apache.hadoop.raid.RaidUtils;
+import org.apache.hadoop.raid.protocol.PolicyInfo.ErasureCodeType;
+
/**
- * This class fixes source file blocks using the parity file,
- * and parity file blocks using the source file.
- * It periodically fetches the list of corrupt files from the namenode,
- * and figures out the location of the bad block by reading through
- * the corrupt file.
+ * Contains the core functionality of the block fixer.
+ *
+ * raid.blockfix.interval - interval between checks for corrupt files
+ *
+ * raid.blockfix.history.interval - interval before fixing the same file again
+ *
+ * raid.blockfix.read.timeout - socket read timeout
+ *
+ * raid.blockfix.write.timeout - socket write timeout
*/
-public class BlockFixer implements Runnable {
+public class BlockFixer extends Configured implements Runnable {
public static final Log LOG = LogFactory.getLog(
"org.apache.hadoop.raid.BlockFixer");
+
+ public static final String BLOCKFIX_INTERVAL = "raid.blockfix.interval";
+ public static final String BLOCKFIX_HISTORY_INTERVAL =
+ "raid.blockfix.history.interval";
+ public static final String BLOCKFIX_READ_TIMEOUT =
+ "raid.blockfix.read.timeout";
+ public static final String BLOCKFIX_WRITE_TIMEOUT =
+ "raid.blockfix.write.timeout";
+
+ public static final long DEFAULT_BLOCKFIX_INTERVAL = 60 * 1000; // 1 min
+ public static final long DEFAULT_BLOCKFIX_HISTORY_INTERVAL =
+ 60 * 60 * 1000; // 60 mins
+
private java.util.HashMap<String, java.util.Date> history;
- private int blockFixInterval = 60*1000; // 1min
private long numFilesFixed = 0;
- private Configuration conf;
private String xorPrefix;
- private XOREncoder xorEncoder;
- private XORDecoder xorDecoder;
+ private String rsPrefix;
+ private Encoder xorEncoder;
+ private Decoder xorDecoder;
+ private Encoder rsEncoder;
+ private Decoder rsDecoder;
+
+ // interval between checks for corrupt files
+ protected long blockFixInterval = DEFAULT_BLOCKFIX_INTERVAL;
+
+ // interval before fixing same file again
+ protected long historyInterval = DEFAULT_BLOCKFIX_HISTORY_INTERVAL;
+
+ public volatile boolean running = true;
- boolean running = true;
public BlockFixer(Configuration conf) throws IOException {
- this.conf = conf;
+ super(conf);
history = new java.util.HashMap<String, java.util.Date>();
- blockFixInterval = conf.getInt("raid.blockfix.interval",
- blockFixInterval);
- xorPrefix = RaidNode.getDestinationPath(conf).toUri().getPath();
- int stripeLength = RaidNode.getStripeLength(conf);
- xorEncoder = new XOREncoder(conf, stripeLength);
- xorDecoder = new XORDecoder(conf, stripeLength);
+ blockFixInterval = getConf().getInt(BLOCKFIX_INTERVAL,
+ (int) blockFixInterval);
+ xorPrefix = RaidNode.xorDestinationPath(getConf()).toUri().getPath();
+ if (!xorPrefix.endsWith(Path.SEPARATOR)) {
+ xorPrefix += Path.SEPARATOR;
+ }
+ int stripeLength = RaidNode.getStripeLength(getConf());
+ xorEncoder = new XOREncoder(getConf(), stripeLength);
+ xorDecoder = new XORDecoder(getConf(), stripeLength);
+ rsPrefix = RaidNode.rsDestinationPath(getConf()).toUri().getPath();
+ if (!rsPrefix.endsWith(Path.SEPARATOR)) {
+ rsPrefix += Path.SEPARATOR;
+ }
+ int parityLength = RaidNode.rsParityLength(getConf());
+ rsEncoder = new ReedSolomonEncoder(getConf(), stripeLength, parityLength);
+ rsDecoder = new ReedSolomonDecoder(getConf(), stripeLength, parityLength);
}
public void run() {
@@ -154,6 +191,7 @@ public class BlockFixer implements Runna
void fixFile(Path srcPath) throws IOException {
+
if (RaidNode.isParityHarPartFile(srcPath)) {
processCorruptParityHarPartFile(srcPath);
return;
@@ -165,24 +203,30 @@ public class BlockFixer implements Runna
return;
}
- // The corrupted file is a source file
+ // The corrupted file is a ReedSolomon parity file
+ if (isRsParityFile(srcPath)) {
+ processCorruptParityFile(srcPath, rsEncoder);
+ return;
+ }
- // Do we have a parity file for this file?
- RaidNode.ParityFilePair ppair = null;
+ // The corrupted file is a source file
+ RaidNode.ParityFilePair ppair =
+ RaidNode.xorParityForSource(srcPath, getConf());
Decoder decoder = null;
- Path destPath = null;
- try {
- destPath = RaidNode.getDestinationPath(conf);
- ppair = RaidNode.getParityFile(destPath, srcPath, conf);
+ if (ppair != null) {
+ decoder = xorDecoder;
+ } else {
+ ppair = RaidNode.rsParityForSource(srcPath, getConf());
if (ppair != null) {
- decoder = xorDecoder;
+ decoder = rsDecoder;
}
- } catch (FileNotFoundException e) {
}
+
// If we have a parity file, process the file and fix it.
if (ppair != null) {
- processCorruptFile(srcPath, destPath, decoder);
+ processCorruptFile(srcPath, ppair, decoder);
}
+
}
/**
@@ -193,8 +237,8 @@ public class BlockFixer implements Runna
*/
void purgeHistory() {
// Default history interval is 1 hour.
- long historyInterval = conf.getLong(
- "raid.blockfix.history.interval", 3600*1000);
+ long historyInterval = getConf().getLong(
+ BLOCKFIX_HISTORY_INTERVAL, 3600*1000);
java.util.Date cutOff = new java.util.Date(
System.currentTimeMillis()-historyInterval);
List<String> toRemove = new java.util.ArrayList<String>();
@@ -217,14 +261,14 @@ public class BlockFixer implements Runna
List<Path> getCorruptFiles() throws IOException {
DistributedFileSystem dfs = getDFS(new Path("/"));
- String[] nnCorruptFiles = RaidDFSUtil.getCorruptFiles(conf);
+ String[] nnCorruptFiles = RaidDFSUtil.getCorruptFiles(getConf());
List<Path> corruptFiles = new LinkedList<Path>();
for (String file: nnCorruptFiles) {
if (!history.containsKey(file)) {
corruptFiles.add(new Path(file));
}
}
- RaidUtils.filterTrash(conf, corruptFiles);
+ RaidUtils.filterTrash(getConf(), corruptFiles);
return corruptFiles;
}
@@ -235,11 +279,11 @@ public class BlockFixer implements Runna
// TODO: We should first fix the files that lose more blocks
Comparator<Path> comp = new Comparator<Path>() {
public int compare(Path p1, Path p2) {
- if (isXorParityFile(p2)) {
+ if (isXorParityFile(p2) || isRsParityFile(p2)) {
// If p2 is a parity file, p1 is smaller.
return -1;
}
- if (isXorParityFile(p1)) {
+ if (isXorParityFile(p1) || isRsParityFile(p1)) {
// If p1 is a parity file, p2 is smaller.
return 1;
}
@@ -250,21 +294,13 @@ public class BlockFixer implements Runna
Collections.sort(files, comp);
}
-
- /**
- * Returns a DistributedFileSystem hosting the path supplied.
- */
- private DistributedFileSystem getDFS(Path p) throws IOException {
- return (DistributedFileSystem) p.getFileSystem(conf);
- }
-
/**
* Reads through a corrupt source file fixing corrupt blocks on the way.
* @param srcPath Path identifying the corrupt file.
* @throws IOException
*/
- void processCorruptFile(Path srcPath, Path destPath, Decoder decoder)
- throws IOException {
+ void processCorruptFile(Path srcPath, RaidNode.ParityFilePair parityPair,
+ Decoder decoder) throws IOException {
LOG.info("Processing corrupt file " + srcPath);
DistributedFileSystem srcFs = getDFS(srcPath);
@@ -290,9 +326,6 @@ public class BlockFixer implements Runna
localBlockFile.deleteOnExit();
try {
- RaidNode.ParityFilePair parityPair = RaidNode.getParityFile(
- destPath, srcPath, conf);
-
decoder.recoverBlockToFile(srcFs, srcPath, parityPair.getFileSystem(),
parityPair.getPath(), blockSize, corruptOffset, localBlockFile,
blockContentsSize);
@@ -313,6 +346,9 @@ public class BlockFixer implements Runna
numFilesFixed++;
}
+ /**
+ * Checks whether a file is an XOR parity file.
+ */
boolean isXorParityFile(Path p) {
String pathStr = p.toUri().getPath();
if (pathStr.contains(RaidNode.HAR_SUFFIX)) {
@@ -322,7 +358,25 @@ public class BlockFixer implements Runna
}
/**
- * Reads through a parity file, fixing corrupt blocks on the way.
+ * Checks whether a file is an RS parity file.
+ */
+ boolean isRsParityFile(Path p) {
+ String pathStr = p.toUri().getPath();
+ if (pathStr.contains(RaidNode.HAR_SUFFIX)) {
+ return false;
+ }
+ return pathStr.startsWith(rsPrefix);
+ }
+
+ /**
+ * Returns a DistributedFileSystem hosting the path supplied.
+ */
+ protected DistributedFileSystem getDFS(Path p) throws IOException {
+ return (DistributedFileSystem) p.getFileSystem(getConf());
+ }
+
+ /**
+ * Fixes corrupt blocks in a parity file.
* This function uses the corresponding source file to regenerate parity
* file blocks.
*/
@@ -469,6 +523,8 @@ public class BlockFixer implements Runna
Encoder encoder;
if (isXorParityFile(parityFile)) {
encoder = xorEncoder;
+ } else if (isRsParityFile(parityFile)) {
+ encoder = rsEncoder;
} else {
String msg = "Could not figure out parity file correctly";
LOG.warn(msg);
@@ -605,7 +661,7 @@ public class BlockFixer implements Runna
DataInputStream blockMetadata = null;
try {
blockContents = new FileInputStream(localBlockFile);
- blockMetadata = computeMetadata(conf, blockContents);
+ blockMetadata = computeMetadata(getConf(), blockContents);
blockContents.close();
// Reopen
blockContents = new FileInputStream(localBlockFile);
@@ -638,13 +694,13 @@ public class BlockFixer implements Runna
InetSocketAddress target = NetUtils.createSocketAddr(datanode.name);
Socket sock = SocketChannel.open().socket();
- int readTimeout = conf.getInt(
- "raid.blockfix.read.timeout", HdfsConstants.READ_TIMEOUT);
+ int readTimeout = getConf().getInt(BLOCKFIX_READ_TIMEOUT,
+ HdfsConstants.READ_TIMEOUT);
NetUtils.connect(sock, target, readTimeout);
sock.setSoTimeout(readTimeout);
- int writeTimeout = conf.getInt(
- "raid.blockfix.write.timeout", HdfsConstants.WRITE_TIMEOUT);
+ int writeTimeout = getConf().getInt(BLOCKFIX_WRITE_TIMEOUT,
+ HdfsConstants.WRITE_TIMEOUT);
OutputStream baseStream = NetUtils.getOutputStream(sock, writeTimeout);
DataOutputStream out = new DataOutputStream(
@@ -685,11 +741,18 @@ public class BlockFixer implements Runna
}
}
+ /**
+ * Returns the source file corresponding to a parity file.
+ */
Path sourcePathFromParityPath(Path parityPath) {
String parityPathStr = parityPath.toUri().getPath();
if (parityPathStr.startsWith(xorPrefix)) {
// Remove the prefix to get the source file.
- String src = parityPathStr.replaceFirst(xorPrefix, "");
+ String src = parityPathStr.replaceFirst(xorPrefix, "/");
+ return new Path(src);
+ } else if (parityPathStr.startsWith(rsPrefix)) {
+ // Remove the prefix to get the source file.
+ String src = parityPathStr.replaceFirst(rsPrefix, "/");
return new Path(src);
}
return null;
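The timeout and interval settings that used to be inline strings are now named constants with documented defaults, which makes them easier to tune. A short sketch of overriding them, assuming millisecond values as in the defaults above:

  // Sketch: tuning the block fixer through the new keys (milliseconds).
  Configuration conf = new Configuration();
  conf.setInt(BlockFixer.BLOCKFIX_INTERVAL, 5 * 60 * 1000);           // scan every 5 minutes
  conf.setLong(BlockFixer.BLOCKFIX_HISTORY_INTERVAL, 60 * 60 * 1000); // refix a file after 1 hour
  BlockFixer fixer = new BlockFixer(conf);  // constructor declares IOException
  new Thread(fixer).start();                // BlockFixer implements Runnable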
Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ConfigManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ConfigManager.java?rev=1034221&r1=1034220&r2=1034221&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ConfigManager.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ConfigManager.java Fri Nov 12 00:17:22 2010
@@ -22,7 +22,9 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
@@ -146,8 +148,9 @@ class ConfigManager {
*
<configuration>
<srcPath prefix="hdfs://hadoop.myhost.com:9000/user/warehouse/u_full/*">
- <destPath> hdfs://dfsarch.data.facebook.com:9000/archive/</destPath>
<policy name = RaidScanWeekly>
+ <destPath> hdfs://dfsname.myhost.com:9000/archive/</destPath>
+ <parentPolicy> RaidScanMonthly</parentPolicy>
<property>
<name>targetReplication</name>
<value>2</value>
@@ -220,6 +223,8 @@ class ConfigManager {
"top-level element not <configuration>");
NodeList elements = root.getChildNodes();
+ Map<String, PolicyInfo> existingPolicies =
+ new HashMap<String, PolicyInfo>();
// loop through all the configured source paths.
for (int i = 0; i < elements.getLength(); i++) {
Node node = elements.item(i);
@@ -231,15 +236,15 @@ class ConfigManager {
if ("srcPath".equalsIgnoreCase(elementTagName)) {
String srcPathPrefix = element.getAttribute("prefix");
- if (srcPathPrefix == null || srcPathPrefix.length() == 0) {
- throw new RaidConfigurationException("Bad configuration file: " +
- "srcPathPrefix not set.");
+ PolicyList policyList = null;
+ if (srcPathPrefix != null && srcPathPrefix.length() != 0) {
+ // An empty srcPath has no effect, but its policies are still processed.
+ // This allows us to define "abstract" policies.
+ policyList = new PolicyList();
+ all.add(policyList);
+ policyList.setSrcPath(conf, srcPathPrefix);
}
- PolicyList policyList = new PolicyList();
- all.add(policyList);
- policyList.setSrcPath(conf, srcPathPrefix);
-
// loop through all the policies for this source path
NodeList policies = element.getChildNodes();
for (int j = 0; j < policies.getLength(); j++) {
@@ -253,12 +258,13 @@ class ConfigManager {
"Expecting <policy> for srcPath " + srcPathPrefix);
}
String policyName = policy.getAttribute("name");
- PolicyInfo pinfo = new PolicyInfo(policyName, conf);
- pinfo.setSrcPath(srcPathPrefix);
- policyList.add(pinfo);
-
+ PolicyInfo curr = new PolicyInfo(policyName, conf);
+ if (srcPathPrefix != null && srcPathPrefix.length() > 0) {
+ curr.setSrcPath(srcPathPrefix);
+ }
// loop through all the properties of this policy
NodeList properties = policy.getChildNodes();
+ PolicyInfo parent = null;
for (int k = 0; k < properties.getLength(); k++) {
Node node2 = properties.item(k);
if (!(node2 instanceof Element)) {
@@ -266,13 +272,16 @@ class ConfigManager {
}
Element property = (Element)node2;
String propertyName = property.getTagName();
- if ("destPath".equalsIgnoreCase(propertyName)) {
+ if ("erasureCode".equalsIgnoreCase(propertyName)) {
String text = ((Text)property.getFirstChild()).getData().trim();
- LOG.info(policyName + ".destPath = " + text);
- pinfo.setDestinationPath(text);
+ LOG.info(policyName + ".erasureCode = " + text);
+ curr.setErasureCode(text);
} else if ("description".equalsIgnoreCase(propertyName)) {
String text = ((Text)property.getFirstChild()).getData().trim();
- pinfo.setDescription(text);
+ curr.setDescription(text);
+ } else if ("parentPolicy".equalsIgnoreCase(propertyName)) {
+ String text = ((Text)property.getFirstChild()).getData().trim();
+ parent = existingPolicies.get(text);
} else if ("property".equalsIgnoreCase(propertyName)) {
NodeList nl = property.getChildNodes();
String pname=null,pvalue=null;
@@ -291,7 +300,7 @@ class ConfigManager {
}
if (pname != null && pvalue != null) {
LOG.info(policyName + "." + pname + " = " + pvalue);
- pinfo.setProperty(pname,pvalue);
+ curr.setProperty(pname,pvalue);
}
} else {
LOG.info("Found bad property " + propertyName +
@@ -300,6 +309,20 @@ class ConfigManager {
". Ignoring.");
}
} // done with all properties of this policy
+
+ PolicyInfo pinfo;
+ if (parent != null) {
+ pinfo = new PolicyInfo(policyName, conf);
+ pinfo.copyFrom(parent);
+ pinfo.copyFrom(curr);
+ } else {
+ pinfo = curr;
+ }
+ if (policyList != null) {
+ policyList.add(pinfo);
+ }
+ existingPolicies.put(policyName, pinfo);
+
} // done with all policies for this srcpath
}
} // done with all srcPaths
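The new parentPolicy element lets a policy inherit from one declared earlier in the same configuration file: the loader copies the parent's properties into a fresh PolicyInfo and then overlays the child's own, so the child wins on conflict. Combined with the relaxed srcPath handling, a prefix-less "abstract" policy can hold shared properties for several concrete children. A sketch of the merge semantics, with hypothetical property values:

  // Sketch: parentPolicy resolution, per the copyFrom order above.
  // Assume "RaidScanMonthly" was declared with targetReplication=2, metaReplication=2.
  PolicyInfo parent = existingPolicies.get("RaidScanMonthly");
  PolicyInfo child = new PolicyInfo("RaidScanWeekly", conf);
  child.copyFrom(parent); // inherits targetReplication=2, metaReplication=2
  child.copyFrom(curr);   // curr declared targetReplication=3; overrides the parent
  // child now has targetReplication=3 and the inherited metaReplication=2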
Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/DistRaid.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/DistRaid.java?rev=1034221&r1=1034220&r2=1034221&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/DistRaid.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/DistRaid.java Fri Nov 12 00:17:22 2010
@@ -335,9 +335,6 @@ public class DistRaid {
LOG.info("Job Complete(Failed): " + jobID);
}
raidPolicyPathPairList.clear();
- Counters ctrs = runningJob.getCounters();
- long filesRaided = ctrs.findCounter(Counter.FILES_SUCCEEDED).getValue();
- long filesFailed = ctrs.findCounter(Counter.FILES_FAILED).getValue();
return true;
} else {
String report = (" job " + jobID +
Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/Encoder.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/Encoder.java?rev=1034221&r1=1034220&r2=1034221&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/Encoder.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/Encoder.java Fri Nov 12 00:17:22 2010
@@ -107,7 +107,8 @@ public abstract class Encoder {
* @param srcFile The source file.
* @param parityFile The parity file to be generated.
*/
- public void encodeFile(FileSystem fs, Path srcFile, Path parityFile,
+ public void encodeFile(
+ FileSystem fs, Path srcFile, FileSystem parityFs, Path parityFile,
short parityRepl, Progressable reporter) throws IOException {
FileStatus srcStat = fs.getFileStatus(srcFile);
long srcSize = srcStat.getLen();
@@ -116,10 +117,13 @@ public abstract class Encoder {
configureBuffers(blockSize);
// Create a tmp file to which we will write first.
- Path parityTmp = new Path(conf.get("fs.raid.tmpdir", "/tmp/raid") +
- parityFile.toUri().getPath() +
- "." + rand.nextLong() + ".tmp");
- FSDataOutputStream out = fs.create(
+ Path tmpDir = getParityTempPath();
+ if (!parityFs.mkdirs(tmpDir)) {
+ throw new IOException("Could not create tmp dir " + tmpDir);
+ }
+ Path parityTmp = new Path(tmpDir,
+ parityFile.getName() + rand.nextLong());
+ FSDataOutputStream out = parityFs.create(
parityTmp,
true,
conf.getInt("io.file.buffer.size", 64 * 1024),
@@ -133,11 +137,11 @@ public abstract class Encoder {
LOG.info("Wrote temp parity file " + parityTmp);
// delete destination if exists
- if (fs.exists(parityFile)){
- fs.delete(parityFile, false);
+ if (parityFs.exists(parityFile)){
+ parityFs.delete(parityFile, false);
}
- fs.mkdirs(parityFile.getParent());
- if (!fs.rename(parityTmp, parityFile)) {
+ parityFs.mkdirs(parityFile.getParent());
+ if (!parityFs.rename(parityTmp, parityFile)) {
String msg = "Unable to rename file " + parityTmp + " to " + parityFile;
throw new IOException (msg);
}
@@ -146,7 +150,7 @@ public abstract class Encoder {
if (out != null) {
out.close();
}
- fs.delete(parityTmp, false);
+ parityFs.delete(parityTmp, false);
}
}
@@ -225,7 +229,7 @@ public abstract class Encoder {
LOG.info("Starting recovery by using source stripe " +
srcFile + ":" + stripeStart);
// Read the data from the blocks and write to the parity file.
- encodeStripe(blocks, stripeStart, blockSize, outs,
+ encodeStripe(blocks, stripeStart, blockSize, outs,
new RaidUtils.DummyProgressable());
}
@@ -339,4 +343,9 @@ public abstract class Encoder {
long blockSize,
OutputStream[] outs,
Progressable reporter) throws IOException;
+
+ /**
+ * Return the temp path for the parity file
+ */
+ protected abstract Path getParityTempPath();
}
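encodeFile now writes parity through a temporary file on the parity file system and renames it into place, so a partially written parity file is never visible at the destination; on failure the temporary is deleted in the finally block. A condensed sketch of the idiom, with stripe encoding and error handling elided:

  // Sketch: write-then-rename, as in encodeFile above.
  Path tmpDir = getParityTempPath();               // per-code temp directory
  parityFs.mkdirs(tmpDir);
  Path parityTmp = new Path(tmpDir, parityFile.getName() + rand.nextLong());
  FSDataOutputStream out = parityFs.create(parityTmp, true, bufferSize,
      parityRepl, blockSize);
  // ... encode stripes into 'out', then close it ...
  parityFs.delete(parityFile, false);              // drop any stale parity
  parityFs.mkdirs(parityFile.getParent());
  if (!parityFs.rename(parityTmp, parityFile)) {
    throw new IOException("Unable to rename " + parityTmp + " to " + parityFile);
  }

Passing the parity file system explicitly (parityFs) also allows parity to live on a different FileSystem than the source.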
Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidFilter.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidFilter.java?rev=1034221&r1=1034220&r2=1034221&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidFilter.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidFilter.java Fri Nov 12 00:17:22 2010
@@ -218,4 +218,42 @@ public class RaidFilter {
return len;
}
}
+
+ static class PreferenceFilter extends Configured
+ implements DirectoryTraversal.FileFilter {
+ Path firstChoicePrefix;
+ DirectoryTraversal.FileFilter secondChoiceFilter;
+
+ PreferenceFilter(Configuration conf,
+ Path firstChoicePrefix, Path secondChoicePrefix,
+ int targetRepl, long startTime, long modTimePeriod) {
+ super(conf);
+ this.firstChoicePrefix = firstChoicePrefix;
+ this.secondChoiceFilter = new TimeBasedFilter(conf,
+ secondChoicePrefix, targetRepl, startTime, modTimePeriod);
+ }
+
+ PreferenceFilter(Configuration conf,
+ Path firstChoicePrefix, Path secondChoicePrefix,
+ PolicyInfo info, List<PolicyInfo> allPolicies, long startTime,
+ Statistics stats) {
+ super(conf);
+ this.firstChoicePrefix = firstChoicePrefix;
+ this.secondChoiceFilter = new TimeBasedFilter(
+ conf, secondChoicePrefix, info, allPolicies, startTime, stats);
+ }
+
+ public boolean check(FileStatus f) throws IOException {
+ Object firstChoicePPair =
+ RaidNode.getParityFile(firstChoicePrefix, f.getPath(), getConf());
+ if (firstChoicePPair == null) {
+ // The decision is up to the second choice filter.
+ return secondChoiceFilter.check(f);
+ } else {
+ // There is already a parity file under the first choice path.
+ // We don't want to choose this file.
+ return false;
+ }
+ }
+ }
}
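PreferenceFilter rejects a file outright when parity already exists under the first-choice prefix and otherwise defers to a TimeBasedFilter over the second-choice prefix. RaidNode uses it (see createRaidFileFilter later in this commit) so that XOR policies skip files that already have Reed-Solomon parity. A sketch of the wiring, using the destination-path helpers added to RaidNode:

  // Sketch: prefer RS parity over XOR, as in RaidNode.createRaidFileFilter.
  DirectoryTraversal.FileFilter filter = new RaidFilter.PreferenceFilter(
      conf,
      RaidNode.rsDestinationPath(conf),   // first choice: skip files with RS parity
      RaidNode.xorDestinationPath(conf),  // second choice: time-based XOR check
      info, allPolicies, startTime, stats);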
Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java?rev=1034221&r1=1034220&r2=1034221&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java Fri Nov 12 00:17:22 2010
@@ -20,12 +20,12 @@ package org.apache.hadoop.raid;
import java.io.IOException;
import java.io.FileNotFoundException;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.ArrayList;
import java.util.List;
import java.util.LinkedList;
-import java.util.Iterator;
import java.util.Arrays;
import java.util.Map;
import java.util.Random;
@@ -63,6 +63,7 @@ import org.apache.hadoop.util.Progressab
import org.apache.hadoop.raid.protocol.PolicyInfo;
import org.apache.hadoop.raid.protocol.PolicyList;
import org.apache.hadoop.raid.protocol.RaidProtocol;
+import org.apache.hadoop.raid.protocol.PolicyInfo.ErasureCodeType;
/**
* A base class that implements {@link RaidProtocol}.
@@ -77,19 +78,35 @@ public abstract class RaidNode implement
Configuration.addDefaultResource("mapred-default.xml");
Configuration.addDefaultResource("mapred-site.xml");
}
-
public static final Log LOG = LogFactory.getLog( "org.apache.hadoop.raid.RaidNode");
public static final long SLEEP_TIME = 10000L; // 10 seconds
public static final int DEFAULT_PORT = 60000;
- public static final int DEFAULT_STRIPE_LENGTH = 5; // default value of stripe length
+ // Default stripe length = 5, parity length for RS code = 3
+ public static final int DEFAULT_STRIPE_LENGTH = 5;
+ public static final int RS_PARITY_LENGTH_DEFAULT = 3;
+
+ public static final String RS_PARITY_LENGTH_KEY = "hdfs.raidrs.paritylength";
public static final String STRIPE_LENGTH_KEY = "hdfs.raid.stripeLength";
+
public static final String DEFAULT_RAID_LOCATION = "/raid";
public static final String RAID_LOCATION_KEY = "hdfs.raid.locations";
+ public static final String DEFAULT_RAID_TMP_LOCATION = "/tmp/raid";
+ public static final String RAID_TMP_LOCATION_KEY = "fs.raid.tmpdir";
+ public static final String DEFAULT_RAID_HAR_TMP_LOCATION = "/tmp/raid_har";
+ public static final String RAID_HAR_TMP_LOCATION_KEY = "fs.raid.hartmpdir";
+
+ public static final String DEFAULT_RAIDRS_LOCATION = "/raidrs";
+ public static final String RAIDRS_LOCATION_KEY = "hdfs.raidrs.locations";
+ public static final String DEFAULT_RAIDRS_TMP_LOCATION = "/tmp/raidrs";
+ public static final String RAIDRS_TMP_LOCATION_KEY = "fs.raidrs.tmpdir";
+ public static final String DEFAULT_RAIDRS_HAR_TMP_LOCATION = "/tmp/raidrs_har";
+ public static final String RAIDRS_HAR_TMP_LOCATION_KEY = "fs.raidrs.hartmpdir";
+
public static final String HAR_SUFFIX = "_raid.har";
public static final Pattern PARITY_HAR_PARTFILE_PATTERN =
Pattern.compile(".*" + HAR_SUFFIX + "/part-.*");
-
- public static final String RAIDNODE_CLASSNAME_KEY = "raid.classname";
+
+ public static final String RAIDNODE_CLASSNAME_KEY = "raid.classname";
/** RPC server */
private Server server;
@@ -131,11 +148,11 @@ public abstract class RaidNode implement
long metaSize; // total disk space for meta files
public void clear() {
- numProcessedBlocks = 0;
- processedSize = 0;
- remainingSize = 0;
- numMetaBlocks = 0;
- metaSize = 0;
+ numProcessedBlocks = 0;
+ processedSize = 0;
+ remainingSize = 0;
+ numMetaBlocks = 0;
+ metaSize = 0;
}
public String toString() {
long save = processedSize - (remainingSize + metaSize);
@@ -310,13 +327,24 @@ public abstract class RaidNode implement
// find stripe length from config
int stripeLength = getStripeLength(conf);
- Path destPref = getDestinationPath(conf);
+ // first try decode using XOR code
+ Path destPref = xorDestinationPath(conf);
Decoder decoder = new XORDecoder(conf, RaidNode.getStripeLength(conf));
Path unraided = unRaid(conf, srcPath, destPref, decoder,
stripeLength, corruptOffset);
if (unraided != null) {
return unraided.toString();
}
+
+ // try decode using ReedSolomon code
+ destPref = rsDestinationPath(conf);
+ decoder = new ReedSolomonDecoder(conf, RaidNode.getStripeLength(conf),
+ RaidNode.rsParityLength(conf));
+ unraided = unRaid(conf, srcPath, destPref, decoder,
+ stripeLength, corruptOffset);
+ if (unraided != null) {
+ return unraided.toString();
+ }
return null;
}
@@ -329,7 +357,6 @@ public abstract class RaidNode implement
* Periodically checks to see which policies should be fired.
*/
class TriggerMonitor implements Runnable {
-
class ScanState {
long fullScanStartTime;
DirectoryTraversal pendingTraversal;
@@ -393,7 +420,7 @@ public abstract class RaidNode implement
*/
private List<FileStatus> selectFiles(
PolicyInfo info, ArrayList<PolicyInfo> allPolicies) throws IOException {
- Path destPrefix = getDestinationPath(conf);
+ Path destPrefix = getDestinationPath(info.getErasureCode(), conf);
String policyName = info.getName();
Path srcPath = info.getSrcPath();
long modTimePeriod = Long.parseLong(info.getProperty("modTimePeriod"));
@@ -480,7 +507,6 @@ public abstract class RaidNode implement
}
}
}
-
for (PolicyInfo info: allPolicies) {
if (!scanStateMap.containsKey(info.getName())) {
scanStateMap.put(info.getName(), new ScanState());
@@ -526,8 +552,19 @@ public abstract class RaidNode implement
long startTime, PolicyInfo info, List<PolicyInfo> allPolicies,
RaidFilter.Statistics stats)
throws IOException {
- return new RaidFilter.TimeBasedFilter(conf, getDestinationPath(conf),
- info, allPolicies, startTime, stats);
+ switch (info.getErasureCode()) {
+ case XOR:
+ // Return a preference-based filter that prefers RS parity files
+ // over XOR parity files.
+ return new RaidFilter.PreferenceFilter(
+ conf, rsDestinationPath(conf), xorDestinationPath(conf),
+ info, allPolicies, startTime, stats);
+ case RS:
+ return new RaidFilter.TimeBasedFilter(conf, rsDestinationPath(conf),
+ info, allPolicies, startTime, stats);
+ default:
+ return null;
+ }
}
}
@@ -623,13 +660,31 @@ public abstract class RaidNode implement
return null; // NULL if no parity file
}
-
- private ParityFilePair getParityFile(Path destPathPrefix, Path srcPath) throws IOException {
-
+
+ static ParityFilePair xorParityForSource(Path srcPath, Configuration conf)
+ throws IOException {
+ try {
+ Path destPath = xorDestinationPath(conf);
+ return getParityFile(destPath, srcPath, conf);
+ } catch (FileNotFoundException e) {
+ }
+ return null;
+ }
+
+ static ParityFilePair rsParityForSource(Path srcPath, Configuration conf)
+ throws IOException {
+ try {
+ Path destPath = rsDestinationPath(conf);
+ return getParityFile(destPath, srcPath, conf);
+ } catch (FileNotFoundException e) {
+ }
+ return null;
+ }
+
+ private ParityFilePair getParityFile(Path destPathPrefix, Path srcPath)
+ throws IOException {
return getParityFile(destPathPrefix, srcPath, conf);
-
}
-
/**
* RAID a list of files.
@@ -639,7 +694,7 @@ public abstract class RaidNode implement
int targetRepl = Integer.parseInt(info.getProperty("targetReplication"));
int metaRepl = Integer.parseInt(info.getProperty("metaReplication"));
int stripeLength = getStripeLength(conf);
- Path destPref = getDestinationPath(conf);
+ Path destPref = getDestinationPath(info.getErasureCode(), conf);
String simulate = info.getProperty("simulate");
boolean doSimulate = simulate == null ? false : Boolean
.parseBoolean(simulate);
@@ -648,8 +703,9 @@ public abstract class RaidNode implement
int count = 0;
for (FileStatus s : paths) {
- doRaid(conf, s, destPref, statistics, new RaidUtils.DummyProgressable(),
- doSimulate, targetRepl, metaRepl, stripeLength);
+ doRaid(conf, s, destPref, info.getErasureCode(), statistics,
+ new RaidUtils.DummyProgressable(), doSimulate, targetRepl, metaRepl,
+ stripeLength);
if (count % 1000 == 0) {
LOG.info("RAID statistics " + statistics.toString());
}
@@ -664,25 +720,28 @@ public abstract class RaidNode implement
*/
static public void doRaid(Configuration conf, PolicyInfo info,
- FileStatus src, Statistics statistics, Progressable reporter) throws IOException {
+ FileStatus src, Statistics statistics, Progressable reporter)
+ throws IOException {
int targetRepl = Integer.parseInt(info.getProperty("targetReplication"));
int metaRepl = Integer.parseInt(info.getProperty("metaReplication"));
int stripeLength = getStripeLength(conf);
- Path destPref = getDestinationPath(conf);
+
+ Path destPref = getDestinationPath(info.getErasureCode(), conf);
String simulate = info.getProperty("simulate");
boolean doSimulate = simulate == null ? false : Boolean
.parseBoolean(simulate);
- doRaid(conf, src, destPref, statistics, reporter, doSimulate,
- targetRepl, metaRepl, stripeLength);
+ doRaid(conf, src, destPref, info.getErasureCode(), statistics,
+ reporter, doSimulate, targetRepl, metaRepl, stripeLength);
}
/**
* RAID an individual file
*/
static private void doRaid(Configuration conf, FileStatus stat, Path destPath,
- Statistics statistics, Progressable reporter, boolean doSimulate,
- int targetRepl, int metaRepl, int stripeLength)
+ PolicyInfo.ErasureCodeType code, Statistics statistics,
+ Progressable reporter, boolean doSimulate,
+ int targetRepl, int metaRepl, int stripeLength)
throws IOException {
Path p = stat.getPath();
FileSystem srcFs = p.getFileSystem(conf);
@@ -704,7 +763,8 @@ public abstract class RaidNode implement
statistics.processedSize += diskSpace;
// generate parity file
- generateParityFile(conf, stat, reporter, srcFs, destPath, locations, metaRepl, stripeLength);
+ generateParityFile(conf, stat, reporter, srcFs, destPath, code,
+ locations, metaRepl, stripeLength);
// reduce the replication factor of the source file
if (!doSimulate) {
@@ -740,7 +800,9 @@ public abstract class RaidNode implement
static private void generateParityFile(Configuration conf, FileStatus stat,
Progressable reporter,
FileSystem inFs,
- Path destPathPrefix, BlockLocation[] locations,
+ Path destPathPrefix,
+ ErasureCodeType code,
+ BlockLocation[] locations,
int metaRepl, int stripeLength) throws IOException {
Path inpath = stat.getPath();
@@ -751,16 +813,16 @@ public abstract class RaidNode implement
try {
FileStatus stmp = outFs.getFileStatus(outpath);
if (stmp.getModificationTime() == stat.getModificationTime()) {
- LOG.info("Parity file for " + inpath + "(" + locations.length + ") is " + outpath +
- " already upto-date. Nothing more to do.");
+ LOG.info("Parity file for " + inpath + "(" + locations.length +
+ ") is " + outpath + " already upto-date. Nothing more to do.");
return;
}
} catch (IOException e) {
// ignore errors because the raid file might not exist yet.
}
- XOREncoder encoder = new XOREncoder(conf, stripeLength);
- encoder.encodeFile(inFs, inpath, outpath, (short)metaRepl, reporter);
+ Encoder encoder = encoderForCode(conf, code);
+ encoder.encodeFile(inFs, inpath, outFs, outpath, (short)metaRepl, reporter);
// set the modification time of the RAID file. This is done so that the modTime of the
// RAID file reflects that contents of the source file that it has RAIDed. This should
@@ -792,8 +854,8 @@ public abstract class RaidNode implement
return null;
}
- final Path recoveryDestination =
- new Path(conf.get("fs.raid.tmpdir", "/tmp/raid"));
+ final Path recoveryDestination = new Path(
+ RaidNode.unraidTmpDirectory(conf));
FileSystem destFs = recoveryDestination.getFileSystem(conf);
final Path recoveredPrefix =
destFs.makeQualified(new Path(recoveryDestination, makeRelative(srcPath)));
@@ -827,6 +889,43 @@ public abstract class RaidNode implement
}
/**
+ * Traverse the parity destination directory, removing directories that
+ * no longer exist in the source.
+ * @throws IOException
+ */
+ private void purgeDirectories(FileSystem fs, Path root) throws IOException {
+ String prefix = root.toUri().getPath();
+ List<FileStatus> startPaths = new LinkedList<FileStatus>();
+ try {
+ startPaths.add(fs.getFileStatus(root));
+ } catch (FileNotFoundException e) {
+ return;
+ }
+ DirectoryTraversal dt = new DirectoryTraversal(fs, startPaths);
+ FileStatus dir = dt.getNextDirectory();
+ for (; dir != null; dir = dt.getNextDirectory()) {
+ Path dirPath = dir.getPath();
+ if (dirPath.toUri().getPath().endsWith(HAR_SUFFIX)) {
+ continue;
+ }
+ String dirStr = dirPath.toUri().getPath();
+ if (!dirStr.startsWith(prefix)) {
+ continue;
+ }
+ String src = dirStr.replaceFirst(prefix, "");
+ if (src.length() == 0) continue;
+ Path srcPath = new Path(src);
+ if (!fs.exists(srcPath)) {
+ LOG.info("Purging directory " + dirPath);
+ boolean done = fs.delete(dirPath, true);
+ if (!done) {
+ LOG.error("Could not purge " + dirPath);
+ }
+ }
+ }
+ }
+
+ /**
* Delete orphaned files. The reason this is done by a separate thread
* is to not burden the TriggerMonitor with scanning the
* destination directories.
@@ -844,13 +943,23 @@ public abstract class RaidNode implement
LOG.info("Started purge scan");
prevExec = now();
+ // expand destination prefix path
+ Path destPref = xorDestinationPath(conf);
+ FileSystem destFs = destPref.getFileSystem(conf);
+ purgeDirectories(destFs, destPref);
+
+ destPref = rsDestinationPath(conf);
+ destFs = destPref.getFileSystem(conf);
+ purgeDirectories(destFs, destPref);
+
+ // fetch all categories
for (PolicyList category : configMgr.getAllPolicies()) {
for (PolicyInfo info: category.getAll()) {
try {
// expand destination prefix path
- Path destPref = getDestinationPath(conf);
- FileSystem destFs = destPref.getFileSystem(conf);
+ destPref = getDestinationPath(info.getErasureCode(), conf);
+ destFs = destPref.getFileSystem(conf);
//get srcPaths
Path[] srcPaths = info.getSrcPathExpanded();
@@ -870,7 +979,8 @@ public abstract class RaidNode implement
if (stat != null) {
LOG.info("Purging obsolete parity files for policy " +
info.getName() + " " + destPath);
- recursePurge(srcFs, destFs, destPref.toUri().getPath(), stat);
+ recursePurge(info.getErasureCode(), srcFs, destFs,
+ destPref.toUri().getPath(), stat);
}
}
@@ -889,7 +999,8 @@ public abstract class RaidNode implement
* The destPrefix is the absolute pathname of the destinationPath
* specified in the policy (without the host:port)
*/
- private void recursePurge(FileSystem srcFs, FileSystem destFs,
+ void recursePurge(ErasureCodeType code,
+ FileSystem srcFs, FileSystem destFs,
String destPrefix, FileStatus dest)
throws IOException {
@@ -901,7 +1012,7 @@ public abstract class RaidNode implement
if (dest.isDirectory() && destStr.endsWith(HAR_SUFFIX)) {
try {
int harUsedPercent =
- usefulHar(srcFs, destFs, destPath, destPrefix, conf);
+ usefulHar(code, srcFs, destFs, destPath, destPrefix, conf);
LOG.info("Useful percentage of " + destStr + " " + harUsedPercent);
// Delete the har if its usefulness reaches a threshold.
if (harUsedPercent <= conf.getInt("raid.har.usage.threshold", 0)) {
@@ -940,7 +1051,7 @@ public abstract class RaidNode implement
}
if (files != null) {
for (FileStatus one:files) {
- recursePurge(srcFs, destFs, destPrefix, one);
+ recursePurge(code, srcFs, destFs, destPrefix, one);
}
}
// If the directory is empty now, it will be purged the next time this
@@ -950,12 +1061,40 @@ public abstract class RaidNode implement
String src = destStr.replaceFirst(destPrefix, "");
- // if the source path does not exist or the parity file has been HARed,
- // then delete the parity file
Path srcPath = new Path(src);
- Path dstPath = (new Path(destPrefix.trim())).makeQualified(destFs);
- if (!srcFs.exists(srcPath) ||
- !destPath.equals(getParityFile(dstPath,srcPath).getPath())) {
+ boolean shouldDelete = false;
+
+ if (!srcFs.exists(srcPath)) {
+ shouldDelete = true;
+ } else {
+ try {
+ // If there is a RS parity file, the XOR parity can be deleted.
+ if (code == ErasureCodeType.XOR) {
+ ParityFilePair ppair = getParityFile(
+ getDestinationPath(ErasureCodeType.RS, conf), srcPath, conf);
+ if (ppair != null) {
+ shouldDelete = true;
+ }
+ }
+ if (!shouldDelete) {
+ Path dstPath = (new Path(destPrefix.trim())).makeQualified(destFs);
+ ParityFilePair ppair = getParityFile(dstPath,srcPath);
+ // If the parity file is not the appropriate one for the source or
+ // the parityFs is not the same as this file's filesystem
+ // (it is a HAR), this file can be deleted.
+ if ( ppair == null ||
+ !destFs.equals(ppair.getFileSystem()) ||
+ !destPath.equals(ppair.getPath())) {
+ shouldDelete = true;
+ }
+ }
+ } catch (IOException e) {
+ LOG.warn("Error during purging " + src + " " +
+ StringUtils.stringifyException(e));
+ }
+ }
+
+ if (shouldDelete) {
boolean done = destFs.delete(destPath, false);
if (done) {
LOG.info("Purged file " + destPath );
@@ -971,6 +1110,7 @@ public abstract class RaidNode implement
// total number of files in the har.
//
protected static int usefulHar(
+ ErasureCodeType code,
FileSystem srcFs, FileSystem destFs,
Path harPath, String destPrefix, Configuration conf) throws IOException {
@@ -996,6 +1136,15 @@ public abstract class RaidNode implement
continue;
}
String src = parityStr.substring(prefixToReplace.length());
+ if (code == ErasureCodeType.XOR) {
+ ParityFilePair ppair = getParityFile(
+ getDestinationPath(ErasureCodeType.RS, conf), new Path(src), conf);
+ if (ppair != null) {
+ // There is a valid RS parity file, so the XOR one is useless.
+ numUseless++;
+ continue;
+ }
+ }
try {
FileStatus srcStatus = srcFs.getFileStatus(new Path(src));
if (srcStatus == null) {
@@ -1026,18 +1175,16 @@ public abstract class RaidNode implement
LOG.info("Started archive scan");
prevExec = now();
+ // fetch all categories
for (PolicyList category : configMgr.getAllPolicies()) {
for (PolicyInfo info: category.getAll()) {
+ String tmpHarPath = tmpHarPathForCode(conf, info.getErasureCode());
String str = info.getProperty("time_before_har");
- String tmpHarPath = info.getProperty("har_tmp_dir");
- if (tmpHarPath == null) {
- tmpHarPath = "/tmp/raid_har";
- }
if (str != null) {
try {
long cutoff = now() - ( Long.parseLong(str) * 24L * 3600000L );
- Path destPref = getDestinationPath(conf);
+ Path destPref = getDestinationPath(info.getErasureCode(), conf);
FileSystem destFs = destPref.getFileSystem(conf);
//get srcPaths
@@ -1074,7 +1221,7 @@ public abstract class RaidNode implement
return;
}
- private void recurseHar(PolicyInfo info, FileSystem destFs, FileStatus dest,
+ void recurseHar(PolicyInfo info, FileSystem destFs, FileStatus dest,
String destPrefix, FileSystem srcFs, long cutoff, String tmpHarPath)
throws IOException {
@@ -1198,10 +1345,71 @@ public abstract class RaidNode implement
}
/**
- * Return the path prefix that stores the parity files
+ * Return the temp directory used when un-raiding corrupt files
*/
- static Path getDestinationPath(Configuration conf)
- throws IOException {
+ public static String unraidTmpDirectory(Configuration conf) {
+ return conf.get(RAID_TMP_LOCATION_KEY, DEFAULT_RAID_TMP_LOCATION);
+ }
+
+ /**
+ * Return the temp path for ReedSolomonEncoder parity files
+ */
+ public static String rsTempPrefix(Configuration conf) {
+ return conf.get(RAIDRS_TMP_LOCATION_KEY, DEFAULT_RAIDRS_TMP_LOCATION);
+ }
+
+ /**
+ * Return the temp path for XOR parity HAR archives
+ */
+ public static String xorHarTempPrefix(Configuration conf) {
+ return conf.get(RAID_HAR_TMP_LOCATION_KEY, DEFAULT_RAID_HAR_TMP_LOCATION);
+ }
+
+ /**
+ * Return the temp path for Reed-Solomon parity HAR archives
+ */
+ public static String rsHarTempPrefix(Configuration conf) {
+ return conf.get(RAIDRS_HAR_TMP_LOCATION_KEY,
+ DEFAULT_RAIDRS_HAR_TMP_LOCATION);
+ }
+
+ /**
+ * Return the destination path for ReedSolomon parity files
+ */
+ public static Path rsDestinationPath(Configuration conf, FileSystem fs) {
+ String loc = conf.get(RAIDRS_LOCATION_KEY, DEFAULT_RAIDRS_LOCATION);
+ Path p = new Path(loc.trim());
+ p = p.makeQualified(fs);
+ return p;
+ }
+
+ /**
+ * Return the destination path for ReedSolomon parity files
+ */
+ public static Path rsDestinationPath(Configuration conf)
+ throws IOException {
+ String loc = conf.get(RAIDRS_LOCATION_KEY, DEFAULT_RAIDRS_LOCATION);
+ Path p = new Path(loc.trim());
+ FileSystem fs = FileSystem.get(p.toUri(), conf);
+ p = p.makeQualified(fs);
+ return p;
+ }
+
+ /**
+ * Return the destination path for XOR parity files
+ */
+ public static Path xorDestinationPath(Configuration conf, FileSystem fs) {
+ String loc = conf.get(RAID_LOCATION_KEY, DEFAULT_RAID_LOCATION);
+ Path p = new Path(loc.trim());
+ p = p.makeQualified(fs);
+ return p;
+ }
+
+ /**
+ * Return the destination path for XOR parity files
+ */
+ public static Path xorDestinationPath(Configuration conf)
+ throws IOException {
String loc = conf.get(RAID_LOCATION_KEY, DEFAULT_RAID_LOCATION);
Path p = new Path(loc.trim());
FileSystem fs = FileSystem.get(p.toUri(), conf);
@@ -1210,12 +1418,57 @@ public abstract class RaidNode implement
}
/**
+ * Return the path prefix that stores the parity files
+ */
+ static Path getDestinationPath(ErasureCodeType code, Configuration conf)
+ throws IOException {
+ switch (code) {
+ case XOR:
+ return xorDestinationPath(conf);
+ case RS:
+ return rsDestinationPath(conf);
+ default:
+ return null;
+ }
+ }
+
+ static Encoder encoderForCode(Configuration conf, ErasureCodeType code) {
+ int stripeLength = getStripeLength(conf);
+ switch (code) {
+ case XOR:
+ return new XOREncoder(conf, stripeLength);
+ case RS:
+ return new ReedSolomonEncoder(conf, stripeLength, rsParityLength(conf));
+ default:
+ return null;
+ }
+ }
+
+ static String tmpHarPathForCode(Configuration conf, ErasureCodeType code) {
+ switch (code) {
+ case XOR:
+ return xorHarTempPrefix(conf);
+ case RS:
+ return rsHarTempPrefix(conf);
+ default:
+ return null;
+ }
+ }
+
+ /**
* Obtain stripe length from configuration
*/
public static int getStripeLength(Configuration conf) {
return conf.getInt(STRIPE_LENGTH_KEY, DEFAULT_STRIPE_LENGTH);
}
+ /**
+ * Obtain the Reed-Solomon parity length from configuration
+ */
+ public static int rsParityLength(Configuration conf) {
+ return conf.getInt(RS_PARITY_LENGTH_KEY, RS_PARITY_LENGTH_DEFAULT);
+ }
+
static boolean isParityHarPartFile(Path p) {
Matcher m = PARITY_HAR_PARTFILE_PATTERN.matcher(p.toUri().getPath());
return m.matches();
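The Reed-Solomon path gets a configuration namespace parallel to the XOR one: a destination under hdfs.raidrs.locations, temp directories under fs.raidrs.*, and a parity length under hdfs.raidrs.paritylength. A sketch that sets the new keys explicitly (these values match the defaults above):

  // Sketch: explicit values for the keys introduced above.
  Configuration conf = new Configuration();
  conf.setInt(RaidNode.STRIPE_LENGTH_KEY, 5);        // hdfs.raid.stripeLength
  conf.setInt(RaidNode.RS_PARITY_LENGTH_KEY, 3);     // hdfs.raidrs.paritylength
  conf.set(RaidNode.RAID_LOCATION_KEY, "/raid");     // XOR parity destination
  conf.set(RaidNode.RAIDRS_LOCATION_KEY, "/raidrs"); // RS parity destination
  Path rsDest = RaidNode.rsDestinationPath(conf);    // qualified against the default FS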
Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidUtils.java?rev=1034221&r1=1034220&r2=1034221&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidUtils.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidUtils.java Fri Nov 12 00:17:22 2010
@@ -39,7 +39,7 @@ public class RaidUtils {
*
* We could have used Reporter.NULL here but that would introduce
* a dependency on mapreduce.
- */
+ */
public static class DummyProgressable implements Progressable {
/**
* Do nothing.
Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ReedSolomonCode.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ReedSolomonCode.java?rev=1034221&r1=1034220&r2=1034221&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ReedSolomonCode.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ReedSolomonCode.java Fri Nov 12 00:17:22 2010
@@ -22,9 +22,9 @@ public class ReedSolomonCode implements
private final int stripeSize;
private final int paritySize;
- private final int[] generatingRoots;
private final int[] generatingPolynomial;
private final int PRIMITIVE_ROOT = 2;
+ private final int[] primitivePower;
private final GaloisField GF = new GaloisField();
private int[] errSignature;
private final int[] paritySymbolLocations;
@@ -40,14 +40,17 @@ public class ReedSolomonCode implements
for (int i = 0; i < paritySize; i++) {
paritySymbolLocations[i] = i;
}
- this.generatingRoots = new int[paritySize];
-
+
+ this.primitivePower = new int[stripeSize + paritySize];
+ // compute powers of the primitive root
+ for (int i = 0; i < stripeSize + paritySize; i++) {
+ primitivePower[i] = GF.power(PRIMITIVE_ROOT, i);
+ }
// compute generating polynomial
int[] gen = {1};
int[] poly = new int[2];
for (int i = 0; i < paritySize; i++) {
- generatingRoots[i] = GF.power(PRIMITIVE_ROOT, i);
- poly[0] = generatingRoots[i];
+ poly[0] = primitivePower[i];
poly[1] = 1;
gen = GF.multiply(gen, poly);
}
@@ -80,8 +83,8 @@ public class ReedSolomonCode implements
data[erasedLocation[i]] = 0;
}
for (int i = 0; i < erasedLocation.length; i++) {
- errSignature[i] = GF.power(PRIMITIVE_ROOT, erasedLocation[i]);
- erasedValue[i] = GF.substitute(data, generatingRoots[i]);
+ errSignature[i] = primitivePower[erasedLocation[i]];
+ erasedValue[i] = GF.substitute(data, primitivePower[i]);
}
GF.solveVandermondeSystem(errSignature, erasedValue, erasedLocation.length);
}
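
The change above replaces generatingRoots (paritySize entries) with
primitivePower (stripeSize + paritySize entries), so decode() can look up
the root for any codeword location instead of recomputing GF.power() each
time. A self-contained sketch of the same precomputation; the contrib
GaloisField's field size is not visible in this diff, so GF(2^8) with the
standard primitive polynomial 0x11D is assumed here purely for illustration:

  public class PrimitivePowerSketch {
    // Multiply in GF(2^8), primitive polynomial x^8+x^4+x^3+x^2+1 (0x11D).
    static int gfMul(int a, int b) {
      int p = 0;
      while (b != 0) {
        if ((b & 1) != 0) p ^= a;
        a <<= 1;
        if ((a & 0x100) != 0) a ^= 0x11D;  // reduce modulo the polynomial
        b >>= 1;
      }
      return p;
    }

    public static void main(String[] args) {
      final int PRIMITIVE_ROOT = 2;
      int stripeSize = 10, paritySize = 4;   // illustrative parameters
      int[] primitivePower = new int[stripeSize + paritySize];
      primitivePower[0] = 1;
      for (int i = 1; i < primitivePower.length; i++) {
        primitivePower[i] = gfMul(primitivePower[i - 1], PRIMITIVE_ROOT);
      }
      // primitivePower[i] == PRIMITIVE_ROOT^i: 1, 2, 4, 8, 16, ...
      for (int p : primitivePower) System.out.print(p + " ");
    }
  }
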
Added: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ReedSolomonDecoder.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ReedSolomonDecoder.java?rev=1034221&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ReedSolomonDecoder.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ReedSolomonDecoder.java Fri Nov 12 00:17:22 2010
@@ -0,0 +1,226 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.raid;
+
+import java.io.OutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.BlockMissingException;
+
+public class ReedSolomonDecoder extends Decoder {
+ public static final Log LOG = LogFactory.getLog(
+ "org.apache.hadoop.raid.ReedSolomonDecoder");
+ private ErasureCode reedSolomonCode;
+
+ public ReedSolomonDecoder(
+ Configuration conf, int stripeSize, int paritySize) {
+ super(conf, stripeSize, paritySize);
+ this.reedSolomonCode = new ReedSolomonCode(stripeSize, paritySize);
+ }
+
+ @Override
+ protected void fixErasedBlock(
+ FileSystem fs, Path srcFile,
+ FileSystem parityFs, Path parityFile,
+ long blockSize, long errorOffset, long bytesToSkip, long limit,
+ OutputStream out) throws IOException {
+ FSDataInputStream[] inputs = new FSDataInputStream[stripeSize + paritySize];
+ int[] erasedLocations = buildInputs(fs, srcFile, parityFs, parityFile,
+ errorOffset, inputs);
+    int blockIdxInStripe = (int)((errorOffset / blockSize) % stripeSize);
+ int erasedLocationToFix = paritySize + blockIdxInStripe;
+ writeFixedBlock(inputs, erasedLocations, erasedLocationToFix,
+ bytesToSkip, limit, out);
+ }
+
+ protected int[] buildInputs(FileSystem fs, Path srcFile,
+ FileSystem parityFs, Path parityFile,
+ long errorOffset, FSDataInputStream[] inputs)
+ throws IOException {
+ LOG.info("Building inputs to recover block starting at " + errorOffset);
+ FileStatus srcStat = fs.getFileStatus(srcFile);
+ long blockSize = srcStat.getBlockSize();
+    long blockIdx = errorOffset / blockSize;
+ long stripeIdx = blockIdx / stripeSize;
+ LOG.info("FileSize = " + srcStat.getLen() + ", blockSize = " + blockSize +
+ ", blockIdx = " + blockIdx + ", stripeIdx = " + stripeIdx);
+ ArrayList<Integer> erasedLocations = new ArrayList<Integer>();
+ // First open streams to the parity blocks.
+ for (int i = 0; i < paritySize; i++) {
+ long offset = blockSize * (stripeIdx * paritySize + i);
+ FSDataInputStream in = parityFs.open(
+ parityFile, conf.getInt("io.file.buffer.size", 64 * 1024));
+ in.seek(offset);
+ LOG.info("Adding " + parityFile + ":" + offset + " as input " + i);
+ inputs[i] = in;
+ }
+ // Now open streams to the data blocks.
+ for (int i = paritySize; i < paritySize + stripeSize; i++) {
+ long offset = blockSize * (stripeIdx * stripeSize + i - paritySize);
+ if (offset == errorOffset) {
+ LOG.info(srcFile + ":" + offset +
+ " is known to have error, adding zeros as input " + i);
+ inputs[i] = new FSDataInputStream(new RaidUtils.ZeroInputStream(
+ offset + blockSize));
+ erasedLocations.add(i);
+ } else if (offset > srcStat.getLen()) {
+ LOG.info(srcFile + ":" + offset +
+ " is past file size, adding zeros as input " + i);
+ inputs[i] = new FSDataInputStream(new RaidUtils.ZeroInputStream(
+ offset + blockSize));
+ } else {
+ FSDataInputStream in = fs.open(
+ srcFile, conf.getInt("io.file.buffer.size", 64 * 1024));
+ in.seek(offset);
+ LOG.info("Adding " + srcFile + ":" + offset + " as input " + i);
+ inputs[i] = in;
+ }
+ }
+ if (erasedLocations.size() > paritySize) {
+ String msg = "Too many erased locations: " + erasedLocations.size();
+ LOG.error(msg);
+ throw new IOException(msg);
+ }
+ int[] locs = new int[erasedLocations.size()];
+ for (int i = 0; i < locs.length; i++) {
+ locs[i] = erasedLocations.get(i);
+ }
+ return locs;
+ }
+
+ /**
+ * Decode the inputs provided and write to the output.
+ * @param inputs array of inputs.
+ * @param erasedLocations indexes in the inputs which are known to be erased.
+ * @param erasedLocationToFix index in the inputs which needs to be fixed.
+ * @param skipBytes number of bytes to skip before writing to output.
+ * @param limit maximum number of bytes to be written/skipped.
+ * @param out the output.
+ * @throws IOException
+ */
+ void writeFixedBlock(
+ FSDataInputStream[] inputs,
+ int[] erasedLocations,
+ int erasedLocationToFix,
+ long skipBytes,
+ long limit,
+ OutputStream out) throws IOException {
+
+ LOG.info("Need to write " + (limit - skipBytes) +
+ " bytes for erased location index " + erasedLocationToFix);
+ int[] tmp = new int[inputs.length];
+ int[] decoded = new int[erasedLocations.length];
+ long toDiscard = skipBytes;
+ // Loop while the number of skipped + written bytes is less than the max.
+ for (long written = 0; skipBytes + written < limit; ) {
+ erasedLocations = readFromInputs(inputs, erasedLocations, limit);
+ if (decoded.length != erasedLocations.length) {
+ decoded = new int[erasedLocations.length];
+ }
+
+ int toWrite = (int)Math.min((long)bufSize, limit - (skipBytes + written));
+ if (toDiscard >= toWrite) {
+ toDiscard -= toWrite;
+ continue;
+ }
+
+      // Decode bufSize bytes of data.
+ for (int i = 0; i < bufSize; i++) {
+ performDecode(readBufs, writeBufs, i, tmp, erasedLocations, decoded);
+ }
+
+ for (int i = 0; i < erasedLocations.length; i++) {
+ if (erasedLocations[i] == erasedLocationToFix) {
+ toWrite -= toDiscard;
+ out.write(writeBufs[i], (int)toDiscard, toWrite);
+ toDiscard = 0;
+ written += toWrite;
+ LOG.debug("Wrote " + toWrite + " bytes for erased location index " +
+ erasedLocationToFix);
+ break;
+ }
+ }
+ }
+ }
+
+ int[] readFromInputs(
+ FSDataInputStream[] inputs,
+ int[] erasedLocations,
+ long limit) throws IOException {
+    // For every input, read bufSize bytes of data.
+ for (int i = 0; i < inputs.length; i++) {
+ long curPos = inputs[i].getPos();
+ try {
+ RaidUtils.readTillEnd(inputs[i], readBufs[i], true);
+ continue;
+ } catch (BlockMissingException e) {
+ LOG.error("Encountered BlockMissingException in stream " + i);
+ } catch (ChecksumException e) {
+ LOG.error("Encountered ChecksumException in stream " + i);
+ }
+
+ // Found a new erased location.
+ if (erasedLocations.length == paritySize) {
+ String msg = "Too many read errors";
+ LOG.error(msg);
+ throw new IOException(msg);
+ }
+
+ // Add this stream to the set of erased locations.
+ int[] newErasedLocations = new int[erasedLocations.length + 1];
+ for (int j = 0; j < erasedLocations.length; j++) {
+ newErasedLocations[j] = erasedLocations[j];
+ }
+ newErasedLocations[newErasedLocations.length - 1] = i;
+ erasedLocations = newErasedLocations;
+
+ LOG.info("Using zeros for stream " + i);
+ inputs[i] = new FSDataInputStream(
+ new RaidUtils.ZeroInputStream(curPos + limit));
+ inputs[i].seek(curPos);
+ RaidUtils.readTillEnd(inputs[i], readBufs[i], true);
+ }
+ return erasedLocations;
+ }
+
+ void performDecode(byte[][] readBufs, byte[][] writeBufs,
+ int idx, int[] inputs,
+ int[] erasedLocations, int[] decoded) {
+ for (int i = 0; i < decoded.length; i++) {
+ decoded[i] = 0;
+ }
+ for (int i = 0; i < inputs.length; i++) {
+ inputs[i] = readBufs[i][idx] & 0x000000FF;
+ }
+ reedSolomonCode.decode(inputs, erasedLocations, decoded);
+ for (int i = 0; i < decoded.length; i++) {
+ writeBufs[i][idx] = (byte)decoded[i];
+ }
+ }
+
+}
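
To make the indexing above concrete: buildInputs() arranges the streams with
the paritySize parity blocks at indexes 0..paritySize-1 and the stripe's data
blocks after them, which is why fixErasedBlock() computes
erasedLocationToFix = paritySize + blockIdxInStripe. A worked sketch of that
arithmetic, using the block size from the tests below and illustrative
stripe parameters:

  public class ErasedIndexSketch {
    public static void main(String[] args) {
      long blockSize = 8192L;              // matches the tests below
      int stripeSize = 3, paritySize = 3;  // illustrative values
      long errorOffset = 4 * blockSize;    // the fifth block is corrupt

      long blockIdx = errorOffset / blockSize;               // 4
      long stripeIdx = blockIdx / stripeSize;                // stripe 1
      int blockIdxInStripe = (int) (blockIdx % stripeSize);  // 1
      int erasedLocationToFix = paritySize + blockIdxInStripe;

      // Parity stream i for this stripe is opened at
      // blockSize * (stripeIdx * paritySize + i), as in buildInputs().
      long parityOffset0 = blockSize * (stripeIdx * paritySize);

      System.out.println(erasedLocationToFix);  // 4 (second data input)
      System.out.println(parityOffset0);        // 24576
    }
  }
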
Added: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ReedSolomonEncoder.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ReedSolomonEncoder.java?rev=1034221&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ReedSolomonEncoder.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/ReedSolomonEncoder.java Fri Nov 12 00:17:22 2010
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.raid;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.util.Progressable;
+
+public class ReedSolomonEncoder extends Encoder {
+ public static final Log LOG = LogFactory.getLog(
+ "org.apache.hadoop.raid.ReedSolomonEncoder");
+ private ErasureCode reedSolomonCode;
+
+ public ReedSolomonEncoder(
+ Configuration conf, int stripeSize, int paritySize) {
+ super(conf, stripeSize, paritySize);
+ this.reedSolomonCode = new ReedSolomonCode(stripeSize, paritySize);
+ }
+
+ protected void encodeStripe(
+ InputStream[] blocks,
+ long stripeStartOffset,
+ long blockSize,
+ OutputStream[] outs,
+ Progressable reporter) throws IOException {
+
+ int[] data = new int[stripeSize];
+ int[] code = new int[paritySize];
+
+ for (long encoded = 0; encoded < blockSize; encoded += bufSize) {
+      // Read bufSize bytes from each block.
+ for (int i = 0; i < blocks.length; i++) {
+ RaidUtils.readTillEnd(blocks[i], readBufs[i], true);
+ }
+
+ // Encode the data read.
+ for (int j = 0; j < bufSize; j++) {
+ performEncode(readBufs, writeBufs, j, data, code);
+ }
+
+ // Now that we have some data to write, send it to the temp files.
+ for (int i = 0; i < paritySize; i++) {
+ outs[i].write(writeBufs[i], 0, bufSize);
+ }
+
+ if (reporter != null) {
+ reporter.progress();
+ }
+ }
+ }
+
+ void performEncode(byte[][] readBufs, byte[][] writeBufs, int idx,
+ int[] data, int[] code) {
+ for (int i = 0; i < paritySize; i++) {
+ code[i] = 0;
+ }
+ for (int i = 0; i < stripeSize; i++) {
+ data[i] = readBufs[i][idx] & 0x000000FF;
+ }
+ reedSolomonCode.encode(data, code);
+ for (int i = 0; i < paritySize; i++) {
+ writeBufs[i][idx] = (byte)code[i];
+ }
+ }
+
+ @Override
+ public Path getParityTempPath() {
+ return new Path(RaidNode.rsTempPrefix(conf));
+ }
+
+}
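
A sketch of the symbol-level round trip that performEncode() above and the
decoder's performDecode() implement one byte position at a time. It drives
ReedSolomonCode directly and assumes, as the decoder's input layout suggests,
that decode() expects a codeword with the parity symbols in locations
0..paritySize-1 followed by the data symbols, and that it returns the
recovered symbols for the listed erased locations:

  import org.apache.hadoop.raid.ReedSolomonCode;

  public class RsRoundTripSketch {
    public static void main(String[] args) {
      int stripeSize = 10, paritySize = 4;  // illustrative parameters
      ReedSolomonCode rs = new ReedSolomonCode(stripeSize, paritySize);

      int[] data = new int[stripeSize];
      for (int i = 0; i < stripeSize; i++) {
        data[i] = i + 1;                    // one byte value per data block
      }
      int[] parity = new int[paritySize];
      rs.encode(data, parity);

      // Codeword layout: parity first, then data.
      int[] codeword = new int[stripeSize + paritySize];
      System.arraycopy(parity, 0, codeword, 0, paritySize);
      System.arraycopy(data, 0, codeword, paritySize, stripeSize);

      // Erase the third data symbol (location paritySize + 2).
      int[] erased = { paritySize + 2 };
      codeword[erased[0]] = 0;              // decode() zeroes erased slots too
      int[] recovered = new int[erased.length];
      rs.decode(codeword, erased, recovered);
      System.out.println(recovered[0]);     // expected: data[2] == 3
    }
  }
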
Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/XOREncoder.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/XOREncoder.java?rev=1034221&r1=1034220&r2=1034221&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/XOREncoder.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/XOREncoder.java Fri Nov 12 00:17:22 2010
@@ -55,4 +55,9 @@ public class XOREncoder extends Encoder
parityIn.close();
}
}
+
+ @Override
+ public Path getParityTempPath() {
+ return new Path(RaidNode.unraidTmpDirectory(conf));
+ }
}
Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/PolicyInfo.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/PolicyInfo.java?rev=1034221&r1=1034220&r2=1034221&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/PolicyInfo.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/PolicyInfo.java Fri Nov 12 00:17:22 2010
@@ -49,14 +49,26 @@ public class PolicyInfo implements Writa
private Path srcPath; // the specified src path
private String policyName; // name of policy
- private String destinationPath; // A destination path for this policy
+ private ErasureCodeType codeType;// the erasure code used
private String description; // A verbose description of this policy
private Configuration conf; // Hadoop configuration
private Properties properties; // Policy-dependent properties
private ReentrantReadWriteLock plock; // protects policy operations.
-
+ public static enum ErasureCodeType {
+ XOR, RS;
+ public static ErasureCodeType fromString(String s) {
+ if (XOR.toString().equalsIgnoreCase(s)) {
+ return XOR;
+ }
+ if (RS.toString().equalsIgnoreCase(s)) {
+ return RS;
+ }
+ return null;
+ }
+ }
+
/**
* Create the empty object
*/
@@ -82,6 +94,31 @@ public class PolicyInfo implements Writa
}
/**
+ * Copy fields from another PolicyInfo
+ */
+ public void copyFrom(PolicyInfo other) {
+ if (other.conf != null) {
+ this.conf = other.conf;
+ }
+ if (other.policyName != null && other.policyName.length() > 0) {
+ this.policyName = other.policyName;
+ }
+ if (other.description != null && other.description.length() > 0) {
+ this.description = other.description;
+ }
+ if (other.codeType != null) {
+ this.codeType = other.codeType;
+ }
+ if (other.srcPath != null) {
+ this.srcPath = other.srcPath;
+ }
+ for (Object key : other.properties.keySet()) {
+ String skey = (String) key;
+ this.properties.setProperty(skey, other.properties.getProperty(skey));
+ }
+ }
+
+ /**
* Sets the input path on which this policy has to be applied
*/
public void setSrcPath(String in) throws IOException {
@@ -90,10 +127,10 @@ public class PolicyInfo implements Writa
}
/**
- * Set the destination path of this policy.
+ * Set the erasure code type used in this policy
*/
- public void setDestinationPath(String des) {
- this.destinationPath = des;
+ public void setErasureCode(String code) {
+ this.codeType = ErasureCodeType.fromString(code);
}
/**
@@ -130,8 +167,8 @@ public class PolicyInfo implements Writa
/**
* Get the destination path of this policy.
*/
- public String getDestinationPath() {
- return this.destinationPath;
+ public ErasureCodeType getErasureCode() {
+ return this.codeType;
}
/**
@@ -167,7 +204,7 @@ public class PolicyInfo implements Writa
StringBuffer buff = new StringBuffer();
buff.append("Policy Name:\t" + policyName + " --------------------\n");
buff.append("Source Path:\t" + srcPath + "\n");
- buff.append("Dest Path:\t" + destinationPath + "\n");
+ buff.append("Erasure Code:\t" + codeType + "\n");
for (Enumeration<?> e = properties.propertyNames(); e.hasMoreElements();) {
String name = (String) e.nextElement();
buff.append( name + ":\t" + properties.getProperty(name) + "\n");
@@ -195,7 +232,7 @@ public class PolicyInfo implements Writa
public void write(DataOutput out) throws IOException {
Text.writeString(out, srcPath.toString());
Text.writeString(out, policyName);
- Text.writeString(out, destinationPath);
+ Text.writeString(out, codeType.toString());
Text.writeString(out, description);
out.writeInt(properties.size());
for (Enumeration<?> e = properties.propertyNames(); e.hasMoreElements();) {
@@ -208,7 +245,7 @@ public class PolicyInfo implements Writa
public void readFields(DataInput in) throws IOException {
this.srcPath = new Path(Text.readString(in));
this.policyName = Text.readString(in);
- this.destinationPath = Text.readString(in);
+ this.codeType = ErasureCodeType.fromString(Text.readString(in));
this.description = Text.readString(in);
for (int n = in.readInt(); n>0; n--) {
String name = Text.readString(in);
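
With destinationPath replaced by codeType, a policy now names its erasure
code rather than a parity destination; the code-specific destination comes
from the configuration keys handled in RaidNode above. A sketch of the new
API, assuming the "empty object" constructor documented above takes no
arguments (only setSrcPath, setErasureCode and getErasureCode are visible in
this diff):

  import java.io.IOException;
  import org.apache.hadoop.raid.protocol.PolicyInfo;

  public class PolicySketch {
    public static void main(String[] args) throws IOException {
      PolicyInfo info = new PolicyInfo();   // assumed no-arg constructor
      info.setSrcPath("/user/dhruba/raidtest");
      info.setErasureCode("rs");            // parsed case-insensitively;
                                            // unknown names map to null
      System.out.println(info.getErasureCode());  // RS
    }
  }
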
Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java?rev=1034221&r1=1034220&r2=1034221&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java Fri Nov 12 00:17:22 2010
@@ -73,7 +73,8 @@ public class TestRaidDfs extends TestCas
RaidNode cnode = null;
String jobTrackerName = null;
- private void mySetup() throws Exception {
+ private void mySetup(String erasureCode, int stripeLength,
+ int rsParityLength) throws Exception {
new File(TEST_DIR).mkdirs(); // Make sure data directory exists
conf = new Configuration();
@@ -81,6 +82,7 @@ public class TestRaidDfs extends TestCas
conf.set("raid.config.file", CONFIG_FILE);
conf.setBoolean("raid.config.reload", true);
conf.setLong("raid.config.reload.interval", RELOAD_INTERVAL);
+ conf.setInt(RaidNode.RS_PARITY_LENGTH_KEY, rsParityLength);
// scan all policies once every 5 second
conf.setLong("raid.policy.rescan.interval", 5000);
@@ -92,8 +94,9 @@ public class TestRaidDfs extends TestCas
conf.set("raid.classname", "org.apache.hadoop.raid.LocalRaidNode");
conf.set("raid.server.address", "localhost:0");
- conf.setInt("hdfs.raid.stripeLength", 3);
- conf.set("hdfs.raid.locations", "/destraid");
+ conf.setInt("hdfs.raid.stripeLength", stripeLength);
+ conf.set("xor".equals(erasureCode) ? RaidNode.RAID_LOCATION_KEY :
+ RaidNode.RAIDRS_LOCATION_KEY, "/destraid");
dfs = new MiniDFSCluster(conf, NUM_DATANODES, true, null);
dfs.waitActive();
@@ -108,7 +111,7 @@ public class TestRaidDfs extends TestCas
String str = "<configuration> " +
"<srcPath prefix=\"/user/dhruba/raidtest\"> " +
"<policy name = \"RaidTest1\"> " +
- "<destPath> /destraid</destPath> " +
+ "<erasureCode>" + erasureCode + "</erasureCode> " +
"<property> " +
"<name>targetReplication</name> " +
"<value>1</value> " +
@@ -231,47 +234,28 @@ public class TestRaidDfs extends TestCas
}
/**
- * Create a file, corrupt a block in it and ensure that the file can be
- * read through DistributedRaidFileSystem.
+   * Create a file, corrupt several blocks in it, and ensure that the file
+   * can be read through DistributedRaidFileSystem using Reed-Solomon coding.
*/
- public void testRaidDfs() throws Exception {
+ public void testRaidDfsRs() throws Exception {
LOG.info("Test testRaidDfs started.");
long blockSize = 8192L;
int numBlocks = 8;
- int repl = 1;
- mySetup();
+ int stripeLength = 3;
+ mySetup("rs", stripeLength, 3);
// Create an instance of the RaidNode
Configuration localConf = new Configuration(conf);
localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
cnode = RaidNode.createRaidNode(null, localConf);
-
- Path file = new Path("/user/dhruba/raidtest/file");
Path destPath = new Path("/destraid/user/dhruba/raidtest");
- int[][] corrupt = {{0}, {4}, {7}}; // first, last and middle block
+ int[][] corrupt = {{1, 2, 3}, {1, 4, 7}, {3, 6, 7}};
try {
- long crc = createTestFilePartialLastBlock(fileSys, file, repl,
- numBlocks, blockSize);
- long length = fileSys.getFileStatus(file).getLen();
- waitForFileRaided(LOG, fileSys, file, destPath);
- LocatedBlocks locations = getBlockLocations(file);
-
for (int i = 0; i < corrupt.length; i++) {
- int blockNumToCorrupt = corrupt[i][0];
- LOG.info("Corrupt block " + blockNumToCorrupt + " of file");
- corruptBlock(file, locations.get(blockNumToCorrupt).getBlock(),
- NUM_DATANODES, false);
- validateFile(getRaidFS(), file, length, crc);
- }
-
- // Corrupt one more block. This is expected to fail.
- LOG.info("Corrupt one more block of file");
- corruptBlock(file, locations.get(1).getBlock(), NUM_DATANODES, false);
- try {
- validateFile(getRaidFS(), file, length, crc);
- fail("Expected exception ChecksumException not thrown!");
- } catch (org.apache.hadoop.fs.ChecksumException e) {
+ Path file = new Path("/user/dhruba/raidtest/file" + i);
+ corruptBlockAndValidate(
+            file, destPath, corrupt[i], blockSize, numBlocks);
}
} catch (Exception e) {
LOG.info("testRaidDfs Exception " + e +
@@ -288,7 +272,7 @@ public class TestRaidDfs extends TestCas
* Test DistributedRaidFileSystem.readFully()
*/
public void testReadFully() throws Exception {
- mySetup();
+ mySetup("xor", 3, 1);
try {
Path file = new Path("/user/raid/raidtest/file1");
@@ -328,7 +312,7 @@ public class TestRaidDfs extends TestCas
long blockSize = 8192L;
int numBlocks = 8;
int repl = 1;
- mySetup();
+ mySetup("xor", 3, 1);
Path file = new Path("/user/dhruba/raidtest/file");
Path destPath = new Path("/destraid/user/dhruba/raidtest");
@@ -351,6 +335,41 @@ public class TestRaidDfs extends TestCas
myTearDown();
}
}
+ /**
+ * Create a file, corrupt a block in it and ensure that the file can be
+   * read through DistributedRaidFileSystem using the XOR code.
+ */
+ public void testRaidDfsXor() throws Exception {
+ LOG.info("Test testRaidDfs started.");
+
+ long blockSize = 8192L;
+ int numBlocks = 8;
+ int stripeLength = 3;
+ mySetup("xor", stripeLength, 1);
+
+ // Create an instance of the RaidNode
+ Configuration localConf = new Configuration(conf);
+ localConf.set(RaidNode.RAID_LOCATION_KEY, "/destraid");
+ cnode = RaidNode.createRaidNode(null, localConf);
+
+ Path destPath = new Path("/destraid/user/dhruba/raidtest");
+ int[][] corrupt = {{0}, {4}, {7}}; // first, last and middle block
+ try {
+ for (int i = 0; i < corrupt.length; i++) {
+ Path file = new Path("/user/dhruba/raidtest/" + i);
+ corruptBlockAndValidate(
+            file, destPath, corrupt[i], blockSize, numBlocks);
+ }
+ } catch (Exception e) {
+ LOG.info("testRaidDfs Exception " + e +
+ StringUtils.stringifyException(e));
+ throw e;
+ } finally {
+ if (cnode != null) { cnode.stop(); cnode.join(); }
+ myTearDown();
+ }
+ LOG.info("Test testRaidDfs completed.");
+ }
//
// creates a file and populate it with random data. Returns its crc.