Posted to hdfs-commits@hadoop.apache.org by dh...@apache.org on 2009/10/06 09:31:12 UTC
svn commit: r822153 [2/3] - in /hadoop/hdfs/trunk: ./ src/contrib/
src/contrib/raid/ src/contrib/raid/conf/ src/contrib/raid/ivy/
src/contrib/raid/lib/ src/contrib/raid/src/ src/contrib/raid/src/java/
src/contrib/raid/src/java/org/ src/contrib/raid/src...
Added: hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java?rev=822153&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java (added)
+++ hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java Tue Oct 6 07:31:11 2009
@@ -0,0 +1,1250 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.raid;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+import java.util.LinkedList;
+import java.util.Iterator;
+import java.util.Arrays;
+import java.util.Random;
+import java.util.Set;
+import java.util.HashSet;
+import java.lang.Thread;
+import java.net.InetSocketAddress;
+import org.xml.sax.SAXException;
+import javax.xml.parsers.ParserConfigurationException;
+
+import org.apache.hadoop.ipc.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.mapred.Reporter;
+
+import org.apache.hadoop.raid.protocol.PolicyInfo;
+import org.apache.hadoop.raid.protocol.PolicyList;
+import org.apache.hadoop.raid.protocol.RaidProtocol;
+import org.apache.hadoop.raid.protocol.RaidProtocol.ReturnStatus;
+
+/**
+ * A {@link RaidNode} that implements the {@link RaidProtocol} interface.
+ * It periodically applies the configured raid policies to create parity
+ * files and purges parity files whose source files no longer exist.
+ */
+public class RaidNode implements RaidProtocol {
+ public static final Log LOG = LogFactory.getLog( "org.apache.hadoop.raid.RaidNode");
+ public static final long SLEEP_TIME = 10000L; // 10 seconds
+ public static final int DEFAULT_PORT = 60000;
+ public static final int DEFAULT_STRIPE_LENGTH = 5; // default value of stripe length
+ public static final String DEFAULT_RAID_LOCATION = "/raid";
+
+ /** RPC server */
+ private Server server;
+ /** RPC server address */
+ private InetSocketAddress serverAddress = null;
+ /** only used for testing purposes */
+ private boolean stopRequested = false;
+
+ /** Configuration Manager */
+ private ConfigManager configMgr;
+
+ /** hadoop configuration */
+ private Configuration conf;
+
+ protected boolean initialized; // Are we initialized?
+ protected volatile boolean running; // Are we running?
+
+ /** Daemon thread to trigger policies */
+ Daemon triggerThread = null;
+
+ /** Daemon thread to delete obsolete parity files */
+ Daemon purgeThread = null;
+
+ /** Whether to raid files locally instead of via a distributed MapReduce job */
+ boolean isRaidLocal = false;
+
+ // statistics about RAW hdfs blocks. This counts all replicas of a block.
+ public static class Statistics {
+ long numProcessedBlocks; // total blocks encountered in namespace
+ long processedSize; // disk space occupied by all blocks
+ long remainingSize; // total disk space post RAID
+
+ long numMetaBlocks; // total blocks in metafile
+ long metaSize; // total disk space for meta files
+
+ public void clear() {
+ numProcessedBlocks = 0;
+ processedSize = 0;
+ remainingSize = 0;
+ numMetaBlocks = 0;
+ metaSize = 0;
+ }
+ public String toString() {
+ long save = processedSize - (remainingSize + metaSize);
+ long savep = 0;
+ if (processedSize > 0) {
+ savep = (save * 100)/processedSize;
+ }
+ String msg = " numProcessedBlocks = " + numProcessedBlocks +
+ " processedSize = " + processedSize +
+ " postRaidSize = " + remainingSize +
+ " numMetaBlocks = " + numMetaBlocks +
+ " metaSize = " + metaSize +
+ " %save in raw disk space = " + savep;
+ return msg;
+ }
+ }
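+
+ // Illustrative arithmetic (example values, not from the code above),
+ // counting in units of one block: a 10-block file stored at replication 3
+ // has processedSize = 30 block replicas. Raided with stripeLength 5,
+ // targetReplication 1 and metaReplication 2, it keeps remainingSize = 10
+ // block replicas and adds numMeta = 2 parity blocks at 2 replicas each,
+ // so metaSize = 4. The saving is (30 - (10 + 4)) / 30, roughly 53% of
+ // raw disk space.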
+
+ // Startup options
+ static public enum StartupOption{
+ TEST ("-test"),
+ REGULAR ("-regular");
+
+ private String name = null;
+ private StartupOption(String arg) {this.name = arg;}
+ public String getName() {return name;}
+ }
+
+ /**
+ * Start RaidNode.
+ * <p>
+ * The raid-node can be started with one of the following startup options:
+ * <ul>
+ * <li>{@link StartupOption#REGULAR REGULAR} - normal raid node startup</li>
+ * </ul>
+ * The option is passed via configuration field:
+ * <tt>fs.raidnode.startup</tt>
+ *
+ * The conf will be modified to reflect the actual ports on which
+ * the RaidNode is up and running if the user passes the port as
+ * <code>zero</code> in the conf.
+ *
+ * @param conf configuration
+ * @throws IOException
+ */
+
+ RaidNode(Configuration conf) throws IOException {
+ try {
+ initialize(conf);
+ } catch (IOException e) {
+ this.stop();
+ throw e;
+ } catch (Exception e) {
+ this.stop();
+ throw new IOException(e);
+ }
+ }
+
+ public long getProtocolVersion(String protocol,
+ long clientVersion) throws IOException {
+ if (protocol.equals(RaidProtocol.class.getName())) {
+ return RaidProtocol.versionID;
+ } else {
+ throw new IOException("Unknown protocol to name node: " + protocol);
+ }
+ }
+
+ /**
+ * Wait for service to finish.
+ * (Normally, it runs forever.)
+ */
+ public void join() {
+ try {
+ if (server != null) server.join();
+ if (triggerThread != null) triggerThread.join();
+ if (purgeThread != null) purgeThread.join();
+ } catch (InterruptedException ie) {
+ // do nothing
+ }
+ }
+
+ /**
+ * Stop all RaidNode threads and wait for all to finish.
+ */
+ public void stop() {
+ if (stopRequested) {
+ return;
+ }
+ stopRequested = true;
+ running = false;
+ if (server != null) server.stop();
+ if (triggerThread != null) triggerThread.interrupt();
+ if (purgeThread != null) purgeThread.interrupt();
+ }
+
+ private static InetSocketAddress getAddress(String address) {
+ return NetUtils.createSocketAddr(address);
+ }
+
+ public static InetSocketAddress getAddress(Configuration conf) {
+ String nodeport = conf.get("raid.server.address");
+ if (nodeport == null) {
+ nodeport = "localhost:" + DEFAULT_PORT;
+ }
+ return getAddress(nodeport);
+ }
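+
+ // For illustration only, a deployment might point clients at the RaidNode
+ // with, e.g., conf.set("raid.server.address", "raidnode.example.com:60000");
+ // the hostname here is a made-up example.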
+
+
+ private void initialize(Configuration conf)
+ throws IOException, SAXException, InterruptedException, RaidConfigurationException,
+ ClassNotFoundException, ParserConfigurationException {
+ this.conf = conf;
+ InetSocketAddress socAddr = RaidNode.getAddress(conf);
+ int handlerCount = conf.getInt("fs.raidnode.handler.count", 10);
+
+ isRaidLocal = conf.getBoolean("fs.raidnode.local", false);
+ // read in the configuration
+ configMgr = new ConfigManager(conf);
+
+ // create rpc server
+ this.server = RPC.getServer(this, socAddr.getHostName(), socAddr.getPort(),
+ handlerCount, false, conf);
+
+ // The rpc-server port can be ephemeral... ensure we have the correct info
+ this.serverAddress = this.server.getListenerAddress();
+ LOG.info("RaidNode up at: " + this.serverAddress);
+
+ initialized = true;
+ running = true;
+ this.server.start(); // start RPC server
+
+ // start the daemon thread to fire policies appropriately
+ this.triggerThread = new Daemon(new TriggerMonitor());
+ this.triggerThread.start();
+
+ // start the thread that deletes obsolete parity files
+ this.purgeThread = new Daemon(new PurgeMonitor());
+ this.purgeThread.start();
+ }
+
+ /**
+ * Implement RaidProtocol methods
+ */
+
+ /** {@inheritDoc} */
+ public PolicyList[] getAllPolicies() throws IOException {
+ Collection<PolicyList> list = configMgr.getAllPolicies();
+ return list.toArray(new PolicyList[list.size()]);
+ }
+
+ /** {@inheritDoc} */
+ public ReturnStatus recoverFile(String inStr, long corruptOffset) throws IOException {
+
+ LOG.info("Recover File for " + inStr + " for corrupt offset " + corruptOffset);
+ Path inputPath = new Path(inStr);
+ Path srcPath = inputPath.makeQualified(inputPath.getFileSystem(conf));
+ PolicyInfo info = findMatchingPolicy(srcPath);
+ if (info != null) {
+
+ // find stripe length from config
+ int stripeLength = getStripeLength(conf, info);
+
+ // create destination path prefix
+ String destPrefix = getDestinationPath(conf, info);
+ Path destPath = new Path(destPrefix.trim());
+ FileSystem fs = FileSystem.get(destPath.toUri(), conf);
+ destPath = destPath.makeQualified(fs);
+
+ unRaid(conf, srcPath, destPath, stripeLength, corruptOffset);
+ }
+ return ReturnStatus.SUCCESS;
+ }
+
+ /**
+ * Periodically checks to see which policies should be fired.
+ */
+ class TriggerMonitor implements Runnable {
+ /**
+ */
+ public void run() {
+ while (running) {
+ try {
+ doProcess();
+ } catch (Exception e) {
+ LOG.error(StringUtils.stringifyException(e));
+ } finally {
+ LOG.info("Trigger thread continuing to run...");
+ }
+ }
+ }
+
+
+ /**
+ * Keep processing policies.
+ * If the config file has changed, then reload config file and start afresh.
+ */
+ private void doProcess() throws IOException, InterruptedException {
+ PolicyList.CompareByPath lexi = new PolicyList.CompareByPath();
+
+ long prevExec = 0;
+ DistRaid dr = null;
+ while (running) {
+
+ boolean reload = configMgr.reloadConfigsIfNecessary();
+ while(!reload && now() < prevExec + configMgr.getPeriodicity()){
+ Thread.sleep(SLEEP_TIME);
+ reload = configMgr.reloadConfigsIfNecessary();
+ }
+
+ prevExec = now();
+
+ // activate all categories
+ Collection<PolicyList> all = configMgr.getAllPolicies();
+
+ // sort all policies by reverse lexicographical order. This is needed
+ // to make the nearest policy take precedence.
+ PolicyList[] sorted = all.toArray(new PolicyList[all.size()]);
+ Arrays.sort(sorted, lexi);
+
+ if (!isRaidLocal) {
+ dr = new DistRaid(conf);
+ }
+ // paths we have processed so far
+ List<String> processed = new LinkedList<String>();
+
+ for (PolicyList category : sorted) {
+ for (PolicyInfo info: category.getAll()) {
+
+ long modTimePeriod = 0;
+ short srcReplication = 0;
+ String str = info.getProperty("modTimePeriod");
+ if (str != null) {
+ modTimePeriod = Long.parseLong(str);
+ }
+ str = info.getProperty("srcReplication");
+ if (str != null) {
+ srcReplication = Short.parseShort(str);
+ }
+
+ LOG.info("Triggering Policy Filter " + info.getName() +
+ " " + info.getSrcPath());
+ List<FileStatus> filteredPaths = null;
+ try {
+ filteredPaths = selectFiles(conf, info.getSrcPath(),
+ getDestinationPath(conf, info),
+ modTimePeriod,
+ srcReplication,
+ prevExec);
+ } catch (Exception e) {
+ LOG.info("Exception while invoking filter on policy " + info.getName() +
+ " srcPath " + info.getSrcPath() +
+ " exception " + StringUtils.stringifyException(e));
+ continue;
+ }
+
+ if (filteredPaths == null || filteredPaths.size() == 0) {
+ LOG.info("No filtered paths for policy " + info.getName());
+ continue;
+ }
+
+ // If any of the filtered paths has already been accepted
+ // by a previous policy, then skip it.
+ for (Iterator<FileStatus> iter = filteredPaths.iterator(); iter.hasNext();) {
+ String fs = iter.next().getPath().toString() + "/";
+ for (String p : processed) {
+ if (p.startsWith(fs)) {
+ iter.remove();
+ break;
+ }
+ }
+ }
+
+ // Apply the action on accepted paths
+ LOG.info("Triggering Policy Action " + info.getName());
+ try {
+ if (isRaidLocal){
+ doRaid(conf, info, filteredPaths);
+ }
+ else{
+ //add paths for distributed raiding
+ dr.addRaidPaths(info, filteredPaths);
+ }
+ } catch (Exception e) {
+ LOG.info("Exception while invoking action on policy " + info.getName() +
+ " srcPath " + info.getSrcPath() +
+ " exception " + StringUtils.stringifyException(e));
+ continue;
+ }
+
+ // add these paths to processed paths
+ for (Iterator<FileStatus> iter = filteredPaths.iterator(); iter.hasNext();) {
+ String p = iter.next().getPath().toString() + "/";
+ processed.add(p);
+ }
+ }
+ }
+ processed.clear(); // free up memory references before yielding
+
+ //do the distributed raiding
+ if (!isRaidLocal) {
+ dr.doDistRaid();
+ }
+ }
+ }
+ }
+
+ /**
+ * Returns the first policy that matches the specified path.
+ * Since different policies with different purposes and destinations
+ * might be associated with the same input path, callers should be
+ * wary of this method, and the code should eventually be changed
+ * to avoid relying on it.
+ */
+ private PolicyInfo findMatchingPolicy(Path inpath) throws IOException {
+ PolicyList.CompareByPath lexi = new PolicyList.CompareByPath();
+ Collection<PolicyList> all = configMgr.getAllPolicies();
+
+ // sort all policies by reverse lexicographical order. This is needed
+ // to make the nearest policy take precedence.
+ PolicyList[] sorted = all.toArray(new PolicyList[all.size()]);
+ Arrays.sort(sorted, lexi);
+
+ // loop through all categories of policies.
+ for (PolicyList category : sorted) {
+ PolicyInfo first = category.getAll().iterator().next();
+ if (first != null) {
+ Path[] srcPaths = first.getSrcPathExpanded(); // input src paths unglobbed
+ if (srcPaths == null) {
+ continue;
+ }
+
+ for (Path src: srcPaths) {
+ if (inpath.toString().startsWith(src.toString())) {
+ // if the srcpath is a prefix of the specified path
+ // we have a match!
+ return first;
+ }
+ }
+ }
+ }
+ return null; // no matching policies
+ }
+
+ /**
+ * Returns a list of pathnames that need raiding.
+ */
+ private List<FileStatus> selectFiles(Configuration conf, Path p, String destPrefix,
+ long modTimePeriod, short srcReplication, long now) throws IOException {
+
+ List<FileStatus> returnSet = new LinkedList<FileStatus>();
+
+ // expand destination prefix path
+ Path destp = new Path(destPrefix.trim());
+ FileSystem fs = FileSystem.get(destp.toUri(), conf);
+ destp = destp.makeQualified(fs);
+
+ fs = p.getFileSystem(conf);
+ FileStatus[] gpaths = fs.globStatus(p);
+ if (gpaths != null){
+ for (FileStatus onepath: gpaths) {
+ recurse(fs, conf, destp, onepath, returnSet, modTimePeriod, srcReplication, now);
+ }
+ }
+ return returnSet;
+ }
+
+ /**
+ * Pick files that need to be RAIDed.
+ */
+ private void recurse(FileSystem srcFs,
+ Configuration conf,
+ Path destPathPrefix,
+ FileStatus src,
+ List<FileStatus> accept,
+ long modTimePeriod,
+ short srcReplication,
+ long now) throws IOException {
+ Path path = src.getPath();
+ FileStatus[] files = null;
+ try {
+ files = srcFs.listStatus(path);
+ } catch (java.io.FileNotFoundException e) {
+ // ignore the error because the file could have been deleted by a user
+ LOG.info("FileNotFound " + path + " " + StringUtils.stringifyException(e));
+ }
+
+ // If the parity file's modtime does not match the modtime of the
+ // src file, and the src file has not been modified
+ // recently, then that file is a candidate for RAID.
+
+ if (!src.isDir()) { // a file
+
+ // if the source file has two blocks or fewer, then there is no need to RAID it
+ long blockSize = src.getBlockSize();
+ if (2 * blockSize >= src.getLen()) {
+ return;
+ }
+
+ // If the replication factor of the source file is already below what is
+ // expected by RAID, then ignore it.
+ if (src.getReplication() < srcReplication) {
+ return;
+ }
+
+ // check if the destination path already exists. If it does and its modification time
+ // does not match the modTime of the source file, then recalculate RAID
+ boolean add = false;
+ try {
+ Path outpath = new Path(destPathPrefix, makeRelative(path));
+ FileSystem outFs = outpath.getFileSystem(conf);
+ FileStatus ostat = outFs.getFileStatus(outpath);
+ if (ostat.getModificationTime() != src.getModificationTime() &&
+ src.getModificationTime() + modTimePeriod < now) {
+ add = true;
+ }
+ } catch (java.io.FileNotFoundException e) {
+ add = true; // destination file does not exist
+ }
+
+ if (add) {
+ accept.add(src);
+ }
+ return;
+
+ } else if (files != null) {
+ for (FileStatus one:files) {
+ recurse(srcFs, conf, destPathPrefix, one, accept, modTimePeriod, srcReplication, now);
+ }
+ }
+ }
+
+
+ /**
+ * RAID a list of files.
+ */
+ void doRaid(Configuration conf, PolicyInfo info, List<FileStatus> paths)
+ throws IOException {
+ int targetRepl = Integer.parseInt(info.getProperty("targetReplication"));
+ int metaRepl = Integer.parseInt(info.getProperty("metaReplication"));
+ int stripeLength = getStripeLength(conf, info);
+ String destPrefix = getDestinationPath(conf, info);
+ String simulate = info.getProperty("simulate");
+ boolean doSimulate = simulate == null ? false : Boolean
+ .parseBoolean(simulate);
+
+ Statistics statistics = new Statistics();
+ int count = 0;
+
+ Path p = new Path(destPrefix.trim());
+ FileSystem fs = FileSystem.get(p.toUri(), conf);
+ p = p.makeQualified(fs);
+
+ for (FileStatus s : paths) {
+ doRaid(conf, s, p, statistics, null, doSimulate, targetRepl, metaRepl,
+ stripeLength);
+ if (count % 1000 == 0) {
+ LOG.info("RAID statistics " + statistics.toString());
+ }
+ count++;
+ }
+ LOG.info("RAID statistics " + statistics.toString());
+ }
+
+
+ /**
+ * RAID an individual file
+ */
+
+ static public void doRaid(Configuration conf, PolicyInfo info,
+ FileStatus src, Statistics statistics, Reporter reporter) throws IOException {
+ int targetRepl = Integer.parseInt(info.getProperty("targetReplication"));
+ int metaRepl = Integer.parseInt(info.getProperty("metaReplication"));
+ int stripeLength = getStripeLength(conf, info);
+ String destPrefix = getDestinationPath(conf, info);
+ String simulate = info.getProperty("simulate");
+ boolean doSimulate = simulate == null ? false : Boolean
+ .parseBoolean(simulate);
+
+ int count = 0;
+
+ Path p = new Path(destPrefix.trim());
+ FileSystem fs = FileSystem.get(p.toUri(), conf);
+ p = p.makeQualified(fs);
+
+ doRaid(conf, src, p, statistics, reporter, doSimulate, targetRepl, metaRepl,
+ stripeLength);
+ }
+
+
+ /**
+ * RAID an individual file
+ */
+ static private void doRaid(Configuration conf, FileStatus stat, Path destPath,
+ Statistics statistics, Reporter reporter, boolean doSimulate,
+ int targetRepl, int metaRepl, int stripeLength)
+ throws IOException {
+ Path p = stat.getPath();
+ FileSystem srcFs = p.getFileSystem(conf);
+
+ // extract block locations from File system
+ BlockLocation[] locations = srcFs.getFileBlockLocations(stat, 0, stat.getLen());
+
+ // if the file has fewer than 2 blocks, then nothing to do
+ if (locations.length <= 2) {
+ return;
+ }
+
+ // add up the raw disk space occupied by this file
+ long diskSpace = 0;
+ for (BlockLocation l: locations) {
+ diskSpace += (l.getLength() * stat.getReplication());
+ }
+ statistics.numProcessedBlocks += locations.length;
+ statistics.processedSize += diskSpace;
+
+ // generate parity file
+ generateParityFile(conf, stat, reporter, srcFs, destPath, locations, metaRepl, stripeLength);
+
+ // reduce the replication factor of the source file
+ if (!doSimulate) {
+ if (srcFs.setReplication(p, (short)targetRepl) == false) {
+ LOG.info("Error in reducing relication factor of file " + p + " to " + targetRepl);
+ statistics.remainingSize += diskSpace; // no change in disk space usage
+ return;
+ }
+ }
+
+ diskSpace = 0;
+ for (BlockLocation l: locations) {
+ diskSpace += (l.getLength() * targetRepl);
+ }
+ statistics.remainingSize += diskSpace;
+
+ // the metafile will have this many blocks
+ int numMeta = locations.length / stripeLength;
+ if (locations.length % stripeLength != 0) {
+ numMeta++;
+ }
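+ // e.g. (illustrative) a 12-block file with stripeLength 5 gives
+ // numMeta = ceil(12 / 5) = 3.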
+
+ // we create numMeta blocks for every file. Each metablock has metaRepl replicas.
+ // the last block of the metafile might not be completely filled up, but we
+ // ignore that for now.
+ statistics.numMetaBlocks += (numMeta * metaRepl);
+ statistics.metaSize += (numMeta * metaRepl * stat.getBlockSize());
+ }
+
+ /**
+ * Create the parity file.
+ */
+ static private void generateParityFile(Configuration conf, FileStatus stat,
+ Reporter reporter,
+ FileSystem inFs,
+ Path destPathPrefix, BlockLocation[] locations,
+ int metaRepl, int stripeLength) throws IOException {
+
+ // two buffers for generating parity
+ int bufSize = 5 * 1024 * 1024; // 5 MB
+ byte[] bufs = new byte[bufSize];
+ byte[] xor = new byte[bufSize];
+ byte zero = 0;
+
+ Path inpath = stat.getPath();
+ long blockSize = stat.getBlockSize();
+ long fileSize = stat.getLen();
+
+ // create output tmp path
+ Path outpath = new Path(destPathPrefix, makeRelative(inpath));
+ Path tmppath = new Path(outpath + ".tmp");
+ FileSystem outFs = outpath.getFileSystem(conf);
+
+ // if the parity file is already up-to-date, then nothing to do
+ try {
+ FileStatus stmp = outFs.getFileStatus(outpath);
+ if (stmp.getModificationTime() == stat.getModificationTime()) {
+ LOG.info("Parity file for " + inpath + "(" + locations.length + ") is " + outpath +
+ " already up-to-date. Nothing more to do.");
+ return;
+ }
+ } catch (IOException e) {
+ // ignore errors because the parity file might not exist yet.
+ }
+
+ LOG.info("Parity file for " + inpath + "(" + locations.length + ") is " + outpath);
+ FSDataOutputStream out = outFs.create(tmppath,
+ true,
+ conf.getInt("io.file.buffer.size", 64 * 1024),
+ (short)metaRepl,
+ blockSize);
+
+ try {
+
+ // loop once for every stripe length
+ for (int startBlock = 0; startBlock < locations.length;) {
+
+ // report progress to Map-reduce framework
+ if (reporter != null) {
+ reporter.progress();
+ }
+ int blocksLeft = locations.length - startBlock;
+ int stripe = Math.min(stripeLength, blocksLeft);
+ LOG.info(" startBlock " + startBlock + " stripe " + stripe);
+
+ // open a new file descriptor for each block in this stripe.
+ // make each fd point to the beginning of each block in this stripe.
+ FSDataInputStream[] ins = new FSDataInputStream[stripe];
+ for (int i = 0; i < stripe; i++) {
+ ins[i] = inFs.open(inpath, bufSize);
+ ins[i].seek(blockSize * (startBlock + i));
+ }
+
+ generateParity(ins,out,blockSize,bufs,xor);
+
+ // close input file handles
+ for (int i = 0; i < ins.length; i++) {
+ ins[i].close();
+ }
+
+ // increment startBlock to point to the first block to be processed
+ // in the next iteration
+ startBlock += stripe;
+ }
+ out.close();
+ out = null;
+
+ // rename tmppath to the real parity filename
+ if (!outFs.rename(tmppath, outpath)) {
+ String msg = "Unable to rename tmp file " + tmppath + " to " + outpath;
+ LOG.warn(msg);
+ throw new IOException (msg);
+ }
+ } finally {
+ // remove the tmp file if it still exists
+ outFs.delete(tmppath, false);
+ }
+
+ // set the modification time of the RAID file. This is done so that the modTime of the
+ // RAID file reflects the contents of the source file that it has RAIDed. This should
+ // also work for files that are being appended to. This is necessary because the clock
+ // on the destination namenode may not be synchronised with the timestamp of the
+ // source namenode.
+ outFs.setTimes(outpath, stat.getModificationTime(), -1);
+
+ FileStatus outstat = outFs.getFileStatus(outpath);
+ LOG.info("Source file " + inpath + " of size " + fileSize +
+ " Parity file " + outpath + " of size " + outstat.getLen() +
+ " src mtime " + stat.getModificationTime() +
+ " parity mtime " + outstat.getModificationTime());
+ }
+
+ private static int readInputUntilEnd(FSDataInputStream ins, byte[] bufs, int toRead)
+ throws IOException {
+
+ int tread = 0;
+
+ while (tread < toRead) {
+ int read = ins.read(bufs, tread, toRead - tread);
+ if (read == -1) {
+ return tread;
+ } else {
+ tread += read;
+ }
+ }
+
+ return tread;
+ }
+
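+ /**
+ * XOR the streams in <code>ins</code> together and write
+ * <code>parityBlockSize</code> bytes of parity to <code>fout</code>.
+ * A stream that ends early is treated as zero-padded, so the parity
+ * block is the XOR of every block in the stripe. This is the property
+ * that lets unRaid rebuild a missing block by XORing the surviving
+ * blocks of the stripe with the parity block.
+ */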
+ private static void generateParity(FSDataInputStream[] ins, FSDataOutputStream fout,
+ long parityBlockSize, byte[] bufs, byte[] xor) throws IOException {
+
+ int bufSize;
+ if ((bufs == null) || (bufs.length == 0)){
+ bufSize = 5 * 1024 * 1024; // 5 MB
+ bufs = new byte[bufSize];
+ } else {
+ bufSize = bufs.length;
+ }
+ if ((xor == null) || (xor.length != bufs.length)){
+ xor = new byte[bufSize];
+ }
+
+ int xorlen = 0;
+
+ // this loop processes all good blocks in selected stripe
+ long remaining = parityBlockSize;
+
+ while (remaining > 0) {
+ int toRead = (int)Math.min(remaining, bufSize);
+
+ if (ins.length > 0) {
+ xorlen = readInputUntilEnd(ins[0], xor, toRead);
+ }
+
+ // read all remaining blocks and xor them into the buffer
+ for (int i = 1; i < ins.length; i++) {
+
+ int actualRead = readInputUntilEnd(ins[i], bufs, toRead);
+
+ int j;
+ int xorlimit = (int) Math.min(xorlen,actualRead);
+ for (j = 0; j < xorlimit; j++) {
+ xor[j] ^= bufs[j];
+ }
+ if ( actualRead > xorlen ){
+ for (; j < actualRead; j++) {
+ xor[j] = bufs[j];
+ }
+ xorlen = actualRead;
+ }
+
+ }
+
+ if (xorlen < toRead) {
+ Arrays.fill(bufs, xorlen, toRead, (byte) 0);
+ }
+
+ // write this to the tmp file
+ fout.write(xor, 0, toRead);
+ remaining -= toRead;
+ }
+
+ }
+
+ /**
+ * Extract a good block from the parity block. This assumes that the corruption
+ * is in the main file and the parity file is always good.
+ */
+ public static Path unRaid(Configuration conf, Path srcPath, Path destPathPrefix,
+ int stripeLength, long corruptOffset) throws IOException {
+
+ // extract block locations, size etc from source file
+ Random rand = new Random();
+ FileSystem srcFs = srcPath.getFileSystem(conf);
+ FileStatus srcStat = srcFs.getFileStatus(srcPath);
+ BlockLocation[] locations = srcFs.getFileBlockLocations(srcStat, 0, srcStat.getLen());
+ long blockSize = srcStat.getBlockSize();
+ long fileSize = srcStat.getLen();
+ int totalBlocks = locations.length;
+
+ // find the stripe number where the corrupted offset lies
+ long snum = corruptOffset / (stripeLength * blockSize);
+ long startOffset = snum * stripeLength * blockSize;
+ long corruptBlockInStripe = (corruptOffset - startOffset)/blockSize;
+ long corruptBlockSize = Math.min(blockSize, fileSize - startOffset);
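+
+ // Worked example (illustrative numbers): with blockSize = 128MB and
+ // stripeLength = 5, a corruptOffset of 700MB falls in stripe
+ // snum = 700 / (5 * 128) = 1, so startOffset = 640MB and
+ // corruptBlockInStripe = (700 - 640) / 128 = 0.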
+
+ LOG.info("Start offset of relevent stripe = " + startOffset +
+ " corruptBlockInStripe " + corruptBlockInStripe);
+
+ // open file descriptors to read all good blocks of the file
+ FSDataInputStream[] instmp = new FSDataInputStream[stripeLength];
+ int numLength = 0;
+ for (int i = 0; i < stripeLength; i++) {
+ if (i == corruptBlockInStripe) {
+ continue; // do not open corrupt block
+ }
+ if (startOffset + i * blockSize >= fileSize) {
+ LOG.info("Stop offset of relevent stripe = " +
+ startOffset + i * blockSize);
+ break;
+ }
+ instmp[numLength] = srcFs.open(srcPath);
+ instmp[numLength].seek(startOffset + i * blockSize);
+ numLength++;
+ }
+
+ // create array of inputstream, allocate one extra slot for
+ // parity file. numLength could be smaller than stripeLength
+ // if we are processing the last partial stripe on a file.
+ numLength += 1;
+ FSDataInputStream[] ins = new FSDataInputStream[numLength];
+ for (int i = 0; i < numLength-1; i++) {
+ ins[i] = instmp[i];
+ }
+ LOG.info("Decompose a total of " + numLength + " blocks.");
+
+ // open and seek to the appropriate offset in parity file.
+ Path parityFile = new Path(destPathPrefix, makeRelative(srcPath));
+ FileSystem parityFs = parityFile.getFileSystem(conf);
+ LOG.info("Parity file for " + srcPath + " is " + parityFile);
+ ins[numLength-1] = parityFs.open(parityFile);
+ ins[numLength-1].seek(snum * blockSize);
+ LOG.info("Parity file " + parityFile +
+ " seeking to relevent block at offset " +
+ ins[numLength-1].getPos());
+
+ // create a temporary filename in the source filesystem
+ // do not overwrite an existing tmp file. Make it fail for now.
+ // We need to generate a unique name for this tmp file later on.
+ Path tmpFile = null;
+ FSDataOutputStream fout = null;
+ int retry = 5;
+ while (fout == null) {
+ try {
+ tmpFile = new Path("/tmp/dhruba/" + rand.nextInt());
+ fout = parityFs.create(tmpFile, false);
+ } catch (IOException e) {
+ if (--retry <= 0) {
+ LOG.info("Unable to create temporary file " + tmpFile +
+ " Aborting....");
+ throw e;
+ }
+ LOG.info("Unable to create temporary file " + tmpFile +
+ " Retrying....");
+ }
+ }
+ LOG.info("Created recovered block file " + tmpFile);
+
+ // buffers for generating parity bits
+ int bufSize = 5 * 1024 * 1024; // 5 MB
+ byte[] bufs = new byte[bufSize];
+ byte[] xor = new byte[bufSize];
+
+ generateParity(ins,fout,corruptBlockSize,bufs,xor);
+
+ // close all files
+ fout.close();
+ for (int i = 0; i < ins.length; i++) {
+ ins[i].close();
+ }
+
+ // Now, reopen the source file and the recovered block file
+ // and copy all relevant data to new file
+ Path recoveredPath = new Path(destPathPrefix, makeRelative(srcPath));
+ recoveredPath = new Path(recoveredPath + ".recovered");
+ LOG.info("Creating recovered file " + recoveredPath);
+
+ FSDataInputStream sin = srcFs.open(srcPath);
+ FSDataOutputStream out = parityFs.create(recoveredPath, false,
+ conf.getInt("io.file.buffer.size", 64 * 1024),
+ srcStat.getReplication(),
+ srcStat.getBlockSize());
+
+ FSDataInputStream bin = parityFs.open(tmpFile);
+ long recoveredSize = 0;
+
+ // copy all the good blocks (up to the corruption)
+ // from the source file to the output file
+ long remaining = corruptOffset / blockSize * blockSize;
+ while (remaining > 0) {
+ int toRead = (int)Math.min(remaining, bufSize);
+ sin.readFully(bufs, 0, toRead);
+ out.write(bufs, 0, toRead);
+ remaining -= toRead;
+ recoveredSize += toRead;
+ }
+ LOG.info("Copied upto " + recoveredSize + " from src file. ");
+
+ // copy recovered block to output file
+ remaining = corruptBlockSize;
+ while (recoveredSize < fileSize &&
+ remaining > 0) {
+ int toRead = (int)Math.min(remaining, bufSize);
+ bin.readFully(bufs, 0, toRead);
+ out.write(bufs, 0, toRead);
+ remaining -= toRead;
+ recoveredSize += toRead;
+ }
+ LOG.info("Copied upto " + recoveredSize + " from recovered-block file. ");
+
+ // skip bad block in src file
+ if (recoveredSize < fileSize) {
+ sin.seek(sin.getPos() + corruptBlockSize);
+ }
+
+ // copy remaining good data from src file to output file
+ while (recoveredSize < fileSize) {
+ int toRead = (int)Math.min(fileSize - recoveredSize, bufSize);
+ sin.readFully(bufs, 0, toRead);
+ out.write(bufs, 0, toRead);
+ recoveredSize += toRead;
+ }
+ out.close();
+ LOG.info("Completed writing " + recoveredSize + " bytes into " +
+ recoveredPath);
+
+ sin.close();
+ bin.close();
+
+ // delete the temporary block file that was created.
+ parityFs.delete(tmpFile, false);
+ LOG.info("Deleted temporary file " + tmpFile);
+
+ // copy the meta information from source path to the newly created
+ // recovered path
+ copyMetaInformation(parityFs, srcStat, recoveredPath);
+
+ return recoveredPath;
+ }
+
+ /**
+ * Periodically delete orphaned parity files.
+ */
+ class PurgeMonitor implements Runnable {
+ /**
+ */
+ public void run() {
+ while (running) {
+ try {
+ doPurge();
+ } catch (Exception e) {
+ LOG.error(StringUtils.stringifyException(e));
+ } finally {
+ LOG.info("Purge parity files thread continuing to run...");
+ }
+ }
+ }
+
+ /**
+ * Delete orphaned files. The reason this is done by a separate thread
+ * is to not burden the TriggerMonitor with scanning the
+ * destination directories.
+ */
+ private void doPurge() throws IOException, InterruptedException {
+ PolicyList.CompareByPath lexi = new PolicyList.CompareByPath();
+
+ long prevExec = 0;
+ while (running) {
+
+ // The config may be reloaded by the TriggerMonitor.
+ // This thread uses whatever config is currently active.
+ while(now() < prevExec + configMgr.getPeriodicity()){
+ Thread.sleep(SLEEP_TIME);
+ }
+
+ prevExec = now();
+
+ // fetch all categories
+ Collection<PolicyList> all = configMgr.getAllPolicies();
+
+ // sort all policies by reverse lexicographical order. This is
+ // needed to make the nearest policy take precedence.
+ PolicyList[] sorted = all.toArray(new PolicyList[all.size()]);
+ Arrays.sort(sorted, lexi);
+
+ // paths we have processed so far
+ Set<Path> processed = new HashSet<Path>();
+
+ for (PolicyList category : sorted) {
+ for (PolicyInfo info: category.getAll()) {
+
+ try {
+ // expand destination prefix path
+ String destinationPrefix = getDestinationPath(conf, info);
+ Path destp = new Path(destinationPrefix.trim());
+ FileSystem destFs = FileSystem.get(destp.toUri(), conf);
+ destp = destp.makeQualified(destFs);
+ destinationPrefix = destp.toUri().getPath(); // strip host:port
+ destp = new Path(destp, makeRelative(info.getSrcPath()));
+
+ // if this destination path has already been processed as part
+ // of another policy, then nothing more to do
+ if (processed.contains(destp)) {
+ LOG.info("Obsolete parity files for policy " +
+ info.getName() + " has already been procesed.");
+ continue;
+ }
+ LOG.info("Purging obsolete parity files for policy " +
+ info.getName() + " " + destp);
+
+ FileSystem srcFs = info.getSrcPath().getFileSystem(conf);
+ FileStatus stat = destFs.getFileStatus(destp);
+ if (stat != null) {
+ recursePurge(srcFs, destFs, destinationPrefix, stat);
+ }
+
+ // this destination path has already been processed
+ processed.add(destp);
+
+ } catch (Exception e) {
+ LOG.warn("Ignoring Exception while processing policy " +
+ info.getName() + " " +
+ StringUtils.stringifyException(e));
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * The destPrefix is the absolute pathname of the destinationPath
+ * specified in the policy (without the host:port)
+ */
+ private void recursePurge(FileSystem srcFs, FileSystem destFs,
+ String destPrefix, FileStatus dest)
+ throws IOException {
+
+ Path destPath = dest.getPath(); // pathname, no host:port
+ String destStr = destPath.toUri().getPath();
+ LOG.debug("Checking " + destPath + " prefix " + destPrefix);
+
+ // Verify the destPrefix is a prefix of the destPath
+ if (!destStr.startsWith(destPrefix)) {
+ LOG.error("Destination path " + destStr + " should have " +
+ destPrefix + " as its prefix.");
+ return;
+ }
+ String src = destStr.replaceFirst(destPrefix, "");
+
+ // if the source path does not exist, then delete the
+ // destination path as well
+ Path srcPath = new Path(src);
+ if (!srcFs.exists(srcPath)) {
+ boolean done = destFs.delete(destPath, false);
+ if (done) {
+ LOG.info("Purged path " + destPath );
+ } else {
+ LOG.info("Unable to purge path " + destPath );
+ }
+ return;
+ }
+ if (!dest.isDir()) {
+ return;
+ }
+ FileStatus[] files = null;
+ files = destFs.listStatus(destPath);
+ if (files != null) {
+ for (FileStatus one:files) {
+ recursePurge(srcFs, destFs, destPrefix, one);
+ }
+ }
+ }
+ }
+
+ /**
+ * If the config file has an entry for hdfs.raid.locations, then that overrides the
+ * destination path specified in the raid policy file
+ */
+ static private String getDestinationPath(Configuration conf, PolicyInfo info) {
+ String locs = conf.get("hdfs.raid.locations");
+ if (locs != null) {
+ return locs;
+ }
+ locs = info.getDestinationPath();
+ if (locs == null) {
+ return DEFAULT_RAID_LOCATION;
+ }
+ return locs;
+ }
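+
+ // For illustration only: setting
+ // <property><name>hdfs.raid.locations</name><value>/raid</value></property>
+ // in the configuration forces every policy to write its parity files under
+ // /raid, regardless of the destPath given in the policy file.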
+
+ /**
+ * If the config file has an entry for hdfs.raid.stripeLength, then use that;
+ * otherwise use the stripe length specified in the raid policy file
+ */
+ static private int getStripeLength(Configuration conf, PolicyInfo info)
+ throws IOException {
+ int len = conf.getInt("hdfs.raid.stripeLength", 0);
+ if (len != 0) {
+ return len;
+ }
+ String str = info.getProperty("stripeLength");
+ if (str == null) {
+ String msg = "hdfs.raid.stripeLength is not defined." +
+ " Using a default " + DEFAULT_STRIPE_LENGTH;
+ LOG.info(msg);
+ return DEFAULT_STRIPE_LENGTH;
+ }
+ return Integer.parseInt(str);
+ }
+
+ /**
+ * Copy the file owner, modtime, etc. from srcPath to the recovered path.
+ * It is possible that we might have to retrieve file permissions,
+ * quotas, etc. too in the future.
+ */
+ static private void copyMetaInformation(FileSystem fs, FileStatus stat,
+ Path recoveredPath)
+ throws IOException {
+ fs.setOwner(recoveredPath, stat.getOwner(), stat.getGroup());
+ fs.setPermission(recoveredPath, stat.getPermission());
+ fs.setTimes(recoveredPath, stat.getModificationTime(), stat.getAccessTime());
+ }
+
+ /**
+ * Returns current time.
+ */
+ static long now() {
+ return System.currentTimeMillis();
+ }
+
+ /**
+ * Make an absolute path relative by stripping the leading /
+ */
+ static private Path makeRelative(Path path) {
+ if (!path.isAbsolute()) {
+ return path;
+ }
+ String p = path.toUri().getPath();
+ String relative = p.substring(1);
+ return new Path(relative);
+ }
+
+ private static void printUsage() {
+ System.err.println("Usage: java RaidNode ");
+ }
+
+ private static StartupOption parseArguments(String args[]) {
+ int argsLen = (args == null) ? 0 : args.length;
+ StartupOption startOpt = StartupOption.REGULAR;
+ for(int i=0; i < argsLen; i++) {
+ String cmd = args[i]; // We have to parse command line args in future.
+ }
+ return startOpt;
+ }
+
+
+ /**
+ * Convert command line options to configuration parameters
+ */
+ private static void setStartupOption(Configuration conf, StartupOption opt) {
+ conf.set("fs.raidnode.startup", opt.toString());
+ }
+
+ /**
+ * Create an instance of the RaidNode
+ */
+ public static RaidNode createRaidNode(String argv[],
+ Configuration conf) throws IOException {
+ if (conf == null) {
+ conf = new Configuration();
+ }
+ StartupOption startOpt = parseArguments(argv);
+ if (startOpt == null) {
+ printUsage();
+ return null;
+ }
+ setStartupOption(conf, startOpt);
+ RaidNode node = new RaidNode(conf);
+ return node;
+ }
+
+
+ /**
+ */
+ public static void main(String argv[]) throws Exception {
+ try {
+ StringUtils.startupShutdownMessage(RaidNode.class, argv, LOG);
+ RaidNode raid = createRaidNode(argv, null);
+ if (raid != null) {
+ raid.join();
+ }
+ } catch (Throwable e) {
+ LOG.error(StringUtils.stringifyException(e));
+ System.exit(-1);
+ }
+ }
+
+
+
+}
Added: hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidShell.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidShell.java?rev=822153&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidShell.java (added)
+++ hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidShell.java Tue Oct 6 07:31:11 2009
@@ -0,0 +1,299 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.raid;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.concurrent.TimeUnit;
+import java.net.InetSocketAddress;
+import javax.security.auth.login.LoginException;
+
+import org.apache.hadoop.ipc.*;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryProxy;
+import org.apache.hadoop.security.UnixUserGroupInformation;
+import org.apache.hadoop.fs.Path;
+
+import org.apache.hadoop.raid.protocol.PolicyInfo;
+import org.apache.hadoop.raid.protocol.PolicyList;
+import org.apache.hadoop.raid.protocol.RaidProtocol;
+import org.apache.hadoop.raid.protocol.RaidProtocol.ReturnStatus;
+
+/**
+ * A {@link RaidShell} that allows browsing configured raid policies.
+ */
+public class RaidShell extends Configured implements Tool {
+ public static final Log LOG = LogFactory.getLog( "org.apache.hadoop.RaidShell");
+ public RaidProtocol raidnode;
+ final RaidProtocol rpcRaidnode;
+ private UnixUserGroupInformation ugi;
+ volatile boolean clientRunning = true;
+ private Configuration conf;
+
+ /**
+ * Start RaidShell.
+ * <p>
+ * The RaidShell connects to the specified RaidNode and performs basic
+ * administrative operations.
+ * @throws IOException
+ */
+ public RaidShell() throws IOException {
+ this(new Configuration());
+ }
+
+ /**
+ * The RaidShell connects to the specified RaidNode and performs basic
+ * administrative operations.
+ * @param conf The Hadoop configuration
+ * @throws IOException
+ */
+ public RaidShell(Configuration conf) throws IOException {
+ super(conf);
+ try {
+ this.ugi = UnixUserGroupInformation.login(conf, true);
+ } catch (LoginException e) {
+ throw (IOException)(new IOException().initCause(e));
+ }
+
+ this.rpcRaidnode = createRPCRaidnode(RaidNode.getAddress(conf), conf, ugi);
+ this.raidnode = createRaidnode(rpcRaidnode);
+ }
+
+ public static RaidProtocol createRaidnode(Configuration conf) throws IOException {
+ return createRaidnode(RaidNode.getAddress(conf), conf);
+ }
+
+ public static RaidProtocol createRaidnode(InetSocketAddress raidNodeAddr,
+ Configuration conf) throws IOException {
+ try {
+ return createRaidnode(createRPCRaidnode(raidNodeAddr, conf,
+ UnixUserGroupInformation.login(conf, true)));
+ } catch (LoginException e) {
+ throw (IOException)(new IOException().initCause(e));
+ }
+ }
+
+ private static RaidProtocol createRPCRaidnode(InetSocketAddress raidNodeAddr,
+ Configuration conf, UnixUserGroupInformation ugi)
+ throws IOException {
+ LOG.info("RaidShell connecting to " + raidNodeAddr);
+ return (RaidProtocol)RPC.getProxy(RaidProtocol.class,
+ RaidProtocol.versionID, raidNodeAddr, ugi, conf,
+ NetUtils.getSocketFactory(conf, RaidProtocol.class));
+ }
+
+ private static RaidProtocol createRaidnode(RaidProtocol rpcRaidnode)
+ throws IOException {
+ RetryPolicy createPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
+ 5, 5000, TimeUnit.MILLISECONDS);
+
+ Map<Class<? extends Exception>,RetryPolicy> remoteExceptionToPolicyMap =
+ new HashMap<Class<? extends Exception>, RetryPolicy>();
+
+ Map<Class<? extends Exception>,RetryPolicy> exceptionToPolicyMap =
+ new HashMap<Class<? extends Exception>, RetryPolicy>();
+ exceptionToPolicyMap.put(RemoteException.class,
+ RetryPolicies.retryByRemoteException(
+ RetryPolicies.TRY_ONCE_THEN_FAIL, remoteExceptionToPolicyMap));
+ RetryPolicy methodPolicy = RetryPolicies.retryByException(
+ RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
+ Map<String,RetryPolicy> methodNameToPolicyMap = new HashMap<String,RetryPolicy>();
+
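+ // Note that only the "create" method is mapped below, and createPolicy is
+ // never inserted into either exception map, so in effect every call
+ // follows TRY_ONCE_THEN_FAIL.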
+ methodNameToPolicyMap.put("create", methodPolicy);
+
+ return (RaidProtocol) RetryProxy.create(RaidProtocol.class,
+ rpcRaidnode, methodNameToPolicyMap);
+ }
+
+ private void checkOpen() throws IOException {
+ if (!clientRunning) {
+ IOException result = new IOException("RaidNode closed");
+ throw result;
+ }
+ }
+
+ /**
+ * Close the connection to the raidNode.
+ */
+ public synchronized void close() throws IOException {
+ if(clientRunning) {
+ clientRunning = false;
+ RPC.stopProxy(rpcRaidnode);
+ }
+ }
+
+ /**
+ * Displays format of commands.
+ */
+ private static void printUsage(String cmd) {
+ if ("-showConfig".equals(cmd)) {
+ System.err.println("Usage: java RaidShell" +
+ " [-showConfig]");
+ } else if ("-recover".equals(cmd)) {
+ System.err.println("Usage: java CronShell" +
+ " [-recover srcPath1 corruptOffset]");
+ } else {
+ System.err.println("Usage: java RaidShell");
+ System.err.println(" [-showConfig ]");
+ System.err.println(" [-help [cmd]]");
+ System.err.println(" [-recover srcPath1 corruptOffset]");
+ System.err.println();
+ ToolRunner.printGenericCommandUsage(System.err);
+ }
+ }
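+
+ // For illustration only, a recovery invocation might look like:
+ // hadoop org.apache.hadoop.raid.RaidShell -recover /user/foo/part-0000 134217728
+ // where the path and offset are made-up example values.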
+
+ /**
+ * run
+ */
+ public int run(String argv[]) throws Exception {
+
+ if (argv.length < 1) {
+ printUsage("");
+ return -1;
+ }
+
+ int exitCode = -1;
+ int i = 0;
+ String cmd = argv[i++];
+ //
+ // verify that we have enough command line parameters
+ //
+ if ("-showConfig".equals(cmd)) {
+ if (argv.length < 1) {
+ printUsage(cmd);
+ return exitCode;
+ }
+ } else if ("-recover".equals(cmd)) {
+ if (argv.length < 3) {
+ printUsage(cmd);
+ return exitCode;
+ }
+ }
+
+ try {
+ if ("-showConfig".equals(cmd)) {
+ exitCode = showConfig(cmd, argv, i);
+ } else if ("-recover".equals(cmd)) {
+ exitCode = recover(cmd, argv, i);
+ } else {
+ exitCode = -1;
+ System.err.println(cmd.substring(1) + ": Unknown command");
+ printUsage("");
+ }
+ } catch (IllegalArgumentException arge) {
+ exitCode = -1;
+ System.err.println(cmd.substring(1) + ": " + arge.getLocalizedMessage());
+ printUsage(cmd);
+ } catch (RemoteException e) {
+ //
+ // This is an error returned by the raidnode server. Print
+ // out the first line of the error message, ignore the stack trace.
+ exitCode = -1;
+ try {
+ String[] content;
+ content = e.getLocalizedMessage().split("\n");
+ System.err.println(cmd.substring(1) + ": " +
+ content[0]);
+ } catch (Exception ex) {
+ System.err.println(cmd.substring(1) + ": " +
+ ex.getLocalizedMessage());
+ }
+ } catch (IOException e) {
+ //
+ // IO exception encountered locally.
+ //
+ exitCode = -1;
+ System.err.println(cmd.substring(1) + ": " +
+ e.getLocalizedMessage());
+ } catch (Exception re) {
+ exitCode = -1;
+ System.err.println(cmd.substring(1) + ": " + re.getLocalizedMessage());
+ }
+ return exitCode;
+ }
+
+ /**
+ * Apply operation specified by 'cmd' on all parameters
+ * starting from argv[startindex].
+ */
+ private int showConfig(String cmd, String argv[], int startindex) throws IOException {
+ int exitCode = 0;
+ int i = startindex;
+ PolicyList[] all = raidnode.getAllPolicies();
+ for (PolicyList list: all) {
+ for (PolicyInfo p : list.getAll()) {
+ System.out.println(p);
+ }
+ }
+ return exitCode;
+ }
+
+ /**
+ * Recovers the specified path from the parity file
+ */
+ public int recover(String cmd, String argv[], int startindex)
+ throws IOException {
+ for (int i = startindex; i < argv.length; i = i + 2) {
+ String path = argv[i];
+ long corruptOffset = Long.parseLong(argv[i+1]);
+ LOG.info("RaidShell recoverFile for " + path + " corruptOffset " + corruptOffset);
+ raidnode.recoverFile(path, corruptOffset);
+ }
+ return 0;
+ }
+
+ /**
+ * main() runs the RaidShell as a command-line tool.
+ */
+ public static void main(String argv[]) throws Exception {
+ RaidShell shell = null;
+ try {
+ shell = new RaidShell();
+ } catch (RPC.VersionMismatch v) {
+ System.err.println("Version Mismatch between client and server" +
+ "... command aborted.");
+ System.exit(-1);
+ } catch (IOException e) {
+ System.err.println("Bad connection to RaidNode. command aborted.");
+ System.exit(-1);
+ }
+
+ int res;
+ try {
+ res = ToolRunner.run(shell, argv);
+ } finally {
+ shell.close();
+ }
+ System.exit(res);
+ }
+}
Added: hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/PolicyInfo.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/PolicyInfo.java?rev=822153&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/PolicyInfo.java (added)
+++ hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/PolicyInfo.java Tue Oct 6 07:31:11 2009
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.raid.protocol;
+
+import java.io.IOException;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.util.Date;
+import java.util.List;
+import java.util.Properties;
+import java.util.Enumeration;
+import java.lang.Math;
+import java.lang.Class;
+import java.text.SimpleDateFormat;
+import java.util.StringTokenizer;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.raid.protocol.RaidProtocol.ReturnStatus;
+
+/**
+ * Maintains information about one policy
+ */
+public class PolicyInfo implements Writable {
+ public static final Log LOG = LogFactory.getLog(
+ "org.apache.hadoop.raid.protocol.PolicyInfo");
+ protected static final SimpleDateFormat dateFormat =
+ new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+
+ private Path srcPath; // the specified src path
+ private String policyName; // name of policy
+ private String destinationPath; // A destination path for this policy
+ private String description; // A verbose description of this policy
+ private Configuration conf; // Hadoop configuration
+
+ private Properties properties; // Policy-dependent properties
+
+ private ReentrantReadWriteLock plock; // protects policy operations.
+
+ /**
+ * Create the empty object
+ */
+ public PolicyInfo() {
+ this.conf = null;
+ this.policyName = "";
+ this.description = "";
+ this.srcPath = null;
+ this.properties = new Properties();
+ this.plock = new ReentrantReadWriteLock();
+ }
+
+ /**
+ * Create the metadata that describes a policy
+ */
+ public PolicyInfo(String policyName, Configuration conf) {
+ this.conf = conf;
+ this.policyName = policyName;
+ this.description = "";
+ this.srcPath = null;
+ this.properties = new Properties();
+ this.plock = new ReentrantReadWriteLock();
+ }
+
+ /**
+ * Sets the input path on which this policy has to be applied
+ */
+ public void setSrcPath(String in) throws IOException {
+ srcPath = new Path(in);
+ srcPath = srcPath.makeQualified(srcPath.getFileSystem(conf));
+ }
+
+ /**
+ * Set the destination path of this policy.
+ */
+ public void setDestinationPath(String des) {
+ this.destinationPath = des;
+ }
+
+ /**
+ * Set the description of this policy.
+ */
+ public void setDescription(String des) {
+ this.description = des;
+ }
+
+ /**
+ * Sets an internal property.
+ * @param name property name.
+ * @param value property value.
+ */
+ public void setProperty(String name, String value) {
+ properties.setProperty(name, value);
+ }
+
+ /**
+ * Returns the value of an internal property.
+ * @param name property name.
+ */
+ public String getProperty(String name) {
+ return properties.getProperty(name);
+ }
+
+ /**
+ * Get the name of this policy.
+ */
+ public String getName() {
+ return this.policyName;
+ }
+
+ /**
+ * Get the destination path of this policy.
+ */
+ public String getDestinationPath() {
+ return this.destinationPath;
+ }
+
+ /**
+ * Get the srcPath
+ */
+ public Path getSrcPath() throws IOException {
+ return srcPath;
+ }
+
+ /**
+ * Get the expanded (unglobbed) forms of the srcPaths
+ */
+ public Path[] getSrcPathExpanded() throws IOException {
+ FileSystem fs = srcPath.getFileSystem(conf);
+
+ // globbing on srcPath
+ FileStatus[] gpaths = fs.globStatus(srcPath);
+ if (gpaths == null) {
+ return null;
+ }
+ Path[] values = new Path[gpaths.length];
+ for (int i = 0; i < gpaths.length; i++) {
+ Path p = gpaths[i].getPath();
+ values[i] = p.makeQualified(fs);
+ }
+ return values;
+ }
+
+ /**
+ * Convert this policy into a printable form
+ */
+ public String toString() {
+ StringBuffer buff = new StringBuffer();
+ buff.append("Policy Name:\t" + policyName + " --------------------\n");
+ buff.append("Source Path:\t" + srcPath + "\n");
+ buff.append("Dest Path:\t" + destinationPath + "\n");
+ for (Enumeration<?> e = properties.propertyNames(); e.hasMoreElements();) {
+ String name = (String) e.nextElement();
+ buff.append( name + ":\t" + properties.getProperty(name) + "\n");
+ }
+ if (description.length() > 0) {
+ int len = Math.min(description.length(), 80);
+ String sub = description.substring(0, len).trim();
+ sub = sub.replaceAll("\n", " ");
+ buff.append("Description:\t" + sub + "...\n");
+ }
+ return buff.toString();
+ }
+
+ //////////////////////////////////////////////////
+ // Writable
+ //////////////////////////////////////////////////
+ static { // register a ctor
+ WritableFactories.setFactory
+ (PolicyInfo.class,
+ new WritableFactory() {
+ public Writable newInstance() { return new PolicyInfo(); }
+ });
+ }
+
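+ // Wire format: srcPath, policyName, destinationPath and description as
+ // Text strings, followed by the property count and each name/value pair.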
+ public void write(DataOutput out) throws IOException {
+ Text.writeString(out, srcPath.toString());
+ Text.writeString(out, policyName);
+ Text.writeString(out, destinationPath);
+ Text.writeString(out, description);
+ out.writeInt(properties.size());
+ for (Enumeration<?> e = properties.propertyNames(); e.hasMoreElements();) {
+ String name = (String) e.nextElement();
+ Text.writeString(out, name);
+ Text.writeString(out, properties.getProperty(name));
+ }
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ this.srcPath = new Path(Text.readString(in));
+ this.policyName = Text.readString(in);
+ this.destinationPath = Text.readString(in);
+ this.description = Text.readString(in);
+ for (int n = in.readInt(); n>0; n--) {
+ String name = Text.readString(in);
+ String value = Text.readString(in);
+ properties.setProperty(name,value);
+ }
+ }
+}
Added: hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/PolicyList.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/PolicyList.java?rev=822153&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/PolicyList.java (added)
+++ hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/PolicyList.java Tue Oct 6 07:31:11 2009
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.raid.protocol;
+
+import java.io.IOException;
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.util.Collection;
+import java.util.List;
+import java.util.LinkedList;
+import java.util.Comparator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableFactories;
+import org.apache.hadoop.io.WritableFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * Maintains information about all policies that belong to a category.
+ * These policies must be applied one at a time; they cannot run
+ * simultaneously.
+ */
+public class PolicyList implements Writable {
+ public static final Log LOG = LogFactory.getLog(
+ "org.apache.hadoop.raid.protocol.PolicyList");
+
+ private List<PolicyInfo> category; // list of policies
+ private Path srcPath;
+
+ /**
+ * Create a new category of policies.
+ */
+ public PolicyList() {
+ this.category = new LinkedList<PolicyInfo>();
+ this.srcPath = null;
+ }
+
+ /**
+ * Add a new policy to this category.
+ */
+ public void add(PolicyInfo info) {
+ category.add(info);
+ }
+
+ public void setSrcPath(Configuration conf, String src) throws IOException {
+ srcPath = new Path(src);
+ srcPath = srcPath.makeQualified(srcPath.getFileSystem(conf));
+ }
+
+ public Path getSrcPath() {
+ return srcPath;
+ }
+
+ /**
+ * Returns the policies in this category
+ */
+ public Collection<PolicyInfo> getAll() {
+ return category;
+ }
+
+ /**
+ * Sort categories by their srcPath, in reverse lexicographical order.
+ */
+ public static class CompareByPath implements Comparator<PolicyList> {
+ public CompareByPath() throws IOException {
+ }
+ public int compare(PolicyList l1, PolicyList l2) {
+ return l2.getSrcPath().compareTo(l1.getSrcPath());
+ }
+ }
+
+
+ //////////////////////////////////////////////////
+ // Writable
+ //////////////////////////////////////////////////
+ static { // register a ctor
+ WritableFactories.setFactory
+ (PolicyList.class,
+ new WritableFactory() {
+ public Writable newInstance() { return new PolicyList(); }
+ });
+ }
+
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(category.size());
+ for (PolicyInfo p : category) {
+ p.write(out);
+ }
+ }
+
+ public void readFields(DataInput in) throws IOException {
+ int count = in.readInt();
+ for (int i = 0; i < count; i++) {
+ PolicyInfo p = new PolicyInfo();
+ p.readFields(in);
+ add(p);
+ }
+ }
+}
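
[Editorial note, not part of this commit: because CompareByPath reverses the natural
Path ordering, a sorted array yields the most specific srcPath first, ahead of its
ancestors. A hypothetical caller sketch; the class and method names are illustrative
only. Note the comparator's constructor declares IOException, so callers must
propagate it.]

    import java.io.IOException;
    import java.util.Arrays;
    import org.apache.hadoop.raid.protocol.PolicyList;

    public class SortCategories {
      // Sorts categories so that srcPaths appear in reverse lexicographical
      // order, i.e. deeper paths before their ancestors.
      public static void sortByPath(PolicyList[] categories) throws IOException {
        Arrays.sort(categories, new PolicyList.CompareByPath());
      }
    }
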
Added: hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/RaidProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/RaidProtocol.java?rev=822153&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/RaidProtocol.java (added)
+++ hadoop/hdfs/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/protocol/RaidProtocol.java Tue Oct 6 07:31:11 2009
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.raid.protocol;
+
+import java.util.Collection;
+import java.io.IOException;
+
+import org.apache.hadoop.ipc.VersionedProtocol;
+import org.apache.hadoop.fs.Path;
+
+/**********************************************************************
+ * RaidProtocol is used by client code, such as the
+ * {@link org.apache.hadoop.raid.RaidShell} class, to communicate
+ * with the RaidNode. User code can manipulate the configured policies.
+ *
+ **********************************************************************/
+public interface RaidProtocol extends VersionedProtocol {
+
+ /**
+ * Compared to the previous version the following changes have been introduced:
+ * Only the latest change is reflected.
+ * 1: new protocol introduced
+ */
+ public static final long versionID = 1L;
+
+ /**
+ * A set of codes returned by RPC calls.
+ */
+ public enum ReturnStatus {
+ SUCCESS (0x01),
+ FAILURE (0x02),
+ RETRY (0x03);
+ private int code;
+
+ private ReturnStatus(int code) {
+ this.code = code;
+ }
+ int getReturnStatus() {return code;}
+ }
+
+ /**
+ * Get a listing of all configured policies.
+ * @return all categories of configured policies
+ * @throws IOException
+ */
+ public PolicyList[] getAllPolicies() throws IOException;
+
+ /**
+ * Unraid the specified input path. This is called when the file
+ * is corrupted: the call moves the file to file.old and then
+ * recovers it from the RAID subsystem.
+ *
+ * @param inputPath The absolute pathname of the file to be recovered.
+ * @param corruptOffset The offset within the file where the corruption occurred.
+ * @return the status of the recovery attempt
+ */
+ public ReturnStatus recoverFile(String inputPath, long corruptOffset) throws IOException;
+
+}
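
[Editorial note, not part of this commit: a hypothetical client sketch showing how a
RaidProtocol proxy could be obtained through the stock org.apache.hadoop.ipc.RPC
machinery. The host and port below are placeholders -- the RaidNode's actual server
address comes from its configuration -- and the class name is illustrative only.]

    import java.net.InetSocketAddress;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.ipc.RPC;
    import org.apache.hadoop.raid.protocol.PolicyList;
    import org.apache.hadoop.raid.protocol.RaidProtocol;

    public class RaidClientSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        InetSocketAddress addr = new InetSocketAddress("localhost", 60000); // placeholder
        RaidProtocol raidnode = (RaidProtocol) RPC.getProxy(
            RaidProtocol.class, RaidProtocol.versionID, addr, conf);
        try {
          // list every configured policy category
          for (PolicyList category : raidnode.getAllPolicies()) {
            System.out.println(category.getSrcPath());
          }
        } finally {
          RPC.stopProxy(raidnode);
        }
      }
    }
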
Added: hadoop/hdfs/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java?rev=822153&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java (added)
+++ hadoop/hdfs/trunk/src/contrib/raid/src/test/org/apache/hadoop/hdfs/TestRaidDfs.java Tue Oct 6 07:31:11 2009
@@ -0,0 +1,290 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.GregorianCalendar;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import java.util.Random;
+import java.util.zip.CRC32;
+
+import junit.framework.TestCase;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FilterFileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.DistributedRaidFileSystem;
+import org.apache.hadoop.raid.RaidNode;
+
+public class TestRaidDfs extends TestCase {
+ final static String TEST_DIR = new File(System.getProperty("test.build.data",
+ "build/contrib/raid/test/data")).getAbsolutePath();
+ final static String CONFIG_FILE = new File(TEST_DIR,
+ "test-raid.xml").getAbsolutePath();
+ final static long RELOAD_INTERVAL = 1000;
+ final static Log LOG = LogFactory.getLog("org.apache.hadoop.hdfs.TestRaidDfs");
+ final Random rand = new Random();
+ final static int NUM_DATANODES = 3;
+
+ Configuration conf;
+ String namenode = null;
+ String hftp = null;
+ MiniDFSCluster dfs = null;
+ FileSystem fileSys = null;
+ RaidNode cnode = null;
+ String jobTrackerName = null;
+
+ private void mySetup() throws Exception {
+
+ new File(TEST_DIR).mkdirs(); // Make sure data directory exists
+ conf = new Configuration();
+
+ conf.set("raid.config.file", CONFIG_FILE);
+ conf.setBoolean("raid.config.reload", true);
+ conf.setLong("raid.config.reload.interval", RELOAD_INTERVAL);
+
+ // scan all policies once every 5 seconds
+ conf.setLong("raid.policy.rescan.interval", 5000);
+
+ // do not use map-reduce cluster for Raiding
+ conf.setBoolean("fs.raidnode.local", true);
+ conf.setInt("hdfs.raid.stripeLength", 3);
+ conf.set("hdfs.raid.locations", "/destraid");
+
+ dfs = new MiniDFSCluster(conf, NUM_DATANODES, true, null);
+ dfs.waitActive();
+ fileSys = dfs.getFileSystem();
+ namenode = fileSys.getUri().toString();
+ hftp = "hftp://localhost.localdomain:" + dfs.getNameNodePort();
+
+ FileSystem.setDefaultUri(conf, namenode);
+
+ FileWriter fileWriter = new FileWriter(CONFIG_FILE);
+ fileWriter.write("<?xml version=\"1.0\"?>\n");
+ String str = "<configuration> " +
+ "<srcPath prefix=\"/user/dhruba/raidtest\"> " +
+ "<policy name = \"RaidTest1\"> " +
+ "<destPath> /destraid</destPath> " +
+ "<property> " +
+ "<name>targetReplication</name> " +
+ "<value>1</value> " +
+ "<description>after RAIDing, decrease the replication factor of a file to this value." +
+ "</description> " +
+ "</property> " +
+ "<property> " +
+ "<name>metaReplication</name> " +
+ "<value>1</value> " +
+ "<description> replication factor of parity file" +
+ "</description> " +
+ "</property> " +
+ "<property> " +
+ "<name>modTimePeriod</name> " +
+ "<value>2000</value> " +
+ "<description> time (milliseconds) after a file is modified to make it " +
+ "a candidate for RAIDing " +
+ "</description> " +
+ "</property> " +
+ "</policy>" +
+ "</srcPath>" +
+ "</configuration>";
+ fileWriter.write(str);
+ fileWriter.close();
+ }
+
+ private void myTearDown() throws Exception {
+ if (cnode != null) { cnode.stop(); cnode.join(); }
+ if (dfs != null) { dfs.shutdown(); }
+ }
+
+ /**
+ * Test DFS Raid
+ */
+ public void testRaidDfs() throws Exception {
+ LOG.info("Test testRaidDfs started.");
+ long blockSize = 8192L;
+ int stripeLength = 3;
+ mySetup();
+ Path file1 = new Path("/user/dhruba/raidtest/file1");
+ Path destPath = new Path("/destraid/user/dhruba/raidtest");
+ long crc1 = createOldFile(fileSys, file1, 1, 7, blockSize);
+ LOG.info("Test testPathFilter created test files");
+
+ // create an instance of the RaidNode
+ cnode = RaidNode.createRaidNode(null, conf);
+
+ try {
+ FileStatus[] listPaths = null;
+
+ // wait till file is raided
+ while (listPaths == null || listPaths.length != 1) {
+ LOG.info("Test testPathFilter waiting for files to be raided.");
+ try {
+ listPaths = fileSys.listStatus(destPath);
+ } catch (FileNotFoundException e) {
+ //ignore
+ }
+ Thread.sleep(1000); // keep waiting
+ }
+ assertEquals(1, listPaths.length); // all files raided
+ LOG.info("Files raided so far : " + listPaths[0].getPath());
+
+ // extract block locations from the file system. Wait till the file is closed.
+ LocatedBlocks locations = null;
+ DistributedFileSystem dfs = (DistributedFileSystem) fileSys;
+ while (true) {
+ locations = dfs.getClient().getNamenode().getBlockLocations(file1.toString(),
+ 0, listPaths[0].getLen());
+ if (!locations.isUnderConstruction()) {
+ break;
+ }
+ Thread.sleep(1000);
+ }
+
+ // filter all filesystem calls from client
+ Configuration clientConf = new Configuration(conf);
+ clientConf.set("fs.hdfs.impl", "org.apache.hadoop.dfs.DistributedRaidFileSystem");
+ DistributedRaidFileSystem raidfs = new DistributedRaidFileSystem(dfs);
+ raidfs.initialize(dfs.getUri(), clientConf);
+
+ // corrupt first block of file
+ LOG.info("Corrupt first block of file");
+ corruptBlock(file1, locations.get(0).getBlock());
+ validateFile(raidfs, file1, file1, crc1);
+
+ } catch (Exception e) {
+ LOG.info("testPathFilter Exception " + e + StringUtils.stringifyException(e));
+ throw e;
+ } finally {
+ myTearDown();
+ }
+ LOG.info("Test testPathFilter completed.");
+ }
+
+ //
+ // creates a file and populates it with random data. Returns its crc.
+ //
+ private long createOldFile(FileSystem fileSys, Path name, int repl, int numBlocks, long blocksize)
+ throws IOException {
+ CRC32 crc = new CRC32();
+ FSDataOutputStream stm = fileSys.create(name, true,
+ fileSys.getConf().getInt("io.file.buffer.size", 4096),
+ (short)repl, blocksize);
+ // fill random data into file
+ final byte[] b = new byte[(int)blocksize];
+ for (int i = 0; i < numBlocks; i++) {
+ rand.nextBytes(b);
+ stm.write(b);
+ crc.update(b);
+ }
+ stm.close();
+ return crc.getValue();
+ }
+
+ //
+ // validates that the two files have the same length and that name2 matches the crc.
+ //
+ private void validateFile(FileSystem fileSys, Path name1, Path name2, long crc)
+ throws IOException {
+
+ FileStatus stat1 = fileSys.getFileStatus(name1);
+ FileStatus stat2 = fileSys.getFileStatus(name2);
+ assertTrue(" Length of file " + name1 + " is " + stat1.getLen() +
+ " is different from length of file " + name1 + " " + stat2.getLen(),
+ stat1.getLen() == stat2.getLen());
+
+ CRC32 newcrc = new CRC32();
+ FSDataInputStream stm = fileSys.open(name2);
+ final byte[] b = new byte[4192];
+ int num;
+ while ((num = stm.read(b)) > 0) {
+ newcrc.update(b, 0, num);
+ }
+ stm.close();
+ LOG.info(" Newcrc " + newcrc.getValue() + " old crc " + crc);
+ if (newcrc.getValue() != crc) {
+ fail("CRC mismatch of files " + name1 + " with file " + name2);
+ }
+ }
+
+ /*
+ * Returns the data directories for the i-th datanode.
+ */
+ private File[] getDataNodeDirs(int i) throws IOException {
+ File base_dir = new File(System.getProperty("test.build.data"), "dfs/");
+ File data_dir = new File(base_dir, "data");
+ File dir1 = new File(data_dir, "data"+(2*i+1));
+ File dir2 = new File(data_dir, "data"+(2*i+2));
+ if (dir1.isDirectory() && dir2.isDirectory()) {
+ File[] dir = new File[2];
+ dir[0] = new File(dir1, "current");
+ dir[1] = new File(dir2, "current");
+ return dir;
+ }
+ return new File[0];
+ }
+
+ //
+ // Corrupt the specified block of the file by deleting its replica files.
+ //
+ void corruptBlock(Path file, Block blockNum) throws IOException {
+ long id = blockNum.getBlockId();
+
+ // Deliberately delete this block's data files from every datanode.
+ //
+ for (int i = 0; i < NUM_DATANODES; i++) {
+ File[] dirs = getDataNodeDirs(i);
+
+ for (int j = 0; j < dirs.length; j++) {
+ File[] blocks = dirs[j].listFiles();
+ assertTrue("Blocks do not exist in data-dir", (blocks != null) && (blocks.length >= 0));
+ for (int idx = 0; idx < blocks.length; idx++) {
+ if (blocks[idx].getName().startsWith("blk_" + id) &&
+ !blocks[idx].getName().endsWith(".meta")) {
+ blocks[idx].delete();
+ LOG.info("Deleted block " + blocks[idx]);
+ }
+ }
+ }
+ }
+ }
+
+}
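
[Editorial note, not part of this commit: the policy file that mySetup() writes as a
single concatenated string is easier to read pretty-printed. This is the same content
as the string literal above, only reflowed.]

    <?xml version="1.0"?>
    <configuration>
      <srcPath prefix="/user/dhruba/raidtest">
        <policy name="RaidTest1">
          <destPath>/destraid</destPath>
          <property>
            <name>targetReplication</name>
            <value>1</value>
            <description>after RAIDing, decrease the replication factor of a file to this value.</description>
          </property>
          <property>
            <name>metaReplication</name>
            <value>1</value>
            <description>replication factor of parity file</description>
          </property>
          <property>
            <name>modTimePeriod</name>
            <value>2000</value>
            <description>time (milliseconds) after a file is modified to make it a candidate for RAIDing</description>
          </property>
        </policy>
      </srcPath>
    </configuration>
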