Posted to mapreduce-commits@hadoop.apache.org by dh...@apache.org on 2010/02/17 00:40:42 UTC
svn commit: r910774 - in /hadoop/mapreduce/trunk: CHANGES.txt
src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java
Author: dhruba
Date: Tue Feb 16 23:40:42 2010
New Revision: 910774
URL: http://svn.apache.org/viewvc?rev=910774&view=rev
Log:
MAPREDUCE-1491. The parity files created by the RAID are combined
using Hadoop Archive Files (HAR). (Rodrigo Schmidt via dhruba)
Added:
hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
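For context on the change below: the new HarMonitor thread drives the HadoopArchives tool through ToolRunner, mirroring the argument layout built in RaidNode.singleHar() in the diff that follows. A minimal standalone sketch of that invocation; the namenode URI, directory names, and class name are hypothetical, chosen only for illustration:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.tools.HadoopArchives;
    import org.apache.hadoop.util.ToolRunner;

    public class HarInvocationSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Mirrors singleHar():
        //   -archiveName <dir>_raid.har -p <parent> <dir relative to parent> <tmp dir>
        String[] harArgs = {
          "-archiveName", "raidtest_raid.har", // archive named after the parity directory
          "-p", "hdfs://namenode:9000/",       // parent that source paths are relative to
          "destraid/user/test/raidtest",       // parity directory to archive, relative to -p
          "/raid_har"                          // temporary output dir (har_tmp_dir property)
        };
        int ret = ToolRunner.run(new HadoopArchives(conf), harArgs);
        System.exit(ret);
      }
    }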
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=910774&r1=910773&r2=910774&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Tue Feb 16 23:40:42 2010
@@ -345,8 +345,6 @@
instantiation and initialization of the DistributedRaidFileSystem.
(Rodrigo Schmidt via dhruba)
- cdouglas)
-
MAPREDUCE-1476. Fix the M/R framework to not call commit for special
tasks like job setup/cleanup and task cleanup.
(Amareshwari Sriramadasu via yhemanth)
@@ -357,6 +355,9 @@
MAPREDUCE-1444. Sqoop ConnManager instances can leak Statement objects.
(Aaron Kimball via tomwhite)
+
+ MAPREDUCE-1491. The parity files created by the RAID are combined
+ using Hadoop Archive Files (HAR). (Rodrigo Schmidt via dhruba)
Release 0.21.0 - Unreleased
Modified: hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java?rev=910774&r1=910773&r2=910774&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/java/org/apache/hadoop/raid/RaidNode.java Tue Feb 16 23:40:42 2010
@@ -30,6 +30,8 @@
import java.util.HashSet;
import java.lang.Thread;
import java.net.InetSocketAddress;
+import java.net.URI;
+
import org.xml.sax.SAXException;
import javax.xml.parsers.ParserConfigurationException;
@@ -37,8 +39,10 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.tools.HadoopArchives;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.ToolRunner;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileStatus;
@@ -51,7 +55,6 @@
import org.apache.hadoop.raid.protocol.PolicyInfo;
import org.apache.hadoop.raid.protocol.PolicyList;
import org.apache.hadoop.raid.protocol.RaidProtocol;
-import org.apache.hadoop.raid.protocol.RaidProtocol.ReturnStatus;
/**
* A {@link RaidNode} that implements
@@ -70,7 +73,8 @@
public static final int DEFAULT_PORT = 60000;
public static final int DEFAULT_STRIPE_LENGTH = 5; // default value of stripe length
public static final String DEFAULT_RAID_LOCATION = "/raid";
-
+ public static final String HAR_SUFFIX = "_raid.har";
+
/** RPC server */
private Server server;
/** RPC server address */
@@ -92,6 +96,9 @@
/** Daemon thread to delete obsolete parity files */
Daemon purgeThread = null;
+
+ /** Daemon thread to har raid directories */
+ Daemon harThread = null;
/** Do distributed raiding */
boolean isRaidLocal = false;
@@ -248,8 +255,13 @@
// start the thread that deletes obsolete parity files
this.purgeThread = new Daemon(new PurgeMonitor());
this.purgeThread.start();
+
+ // start the thread that creates HAR files
+ this.harThread = new Daemon(new HarMonitor());
+ this.harThread.start();
}
+
/**
* Implement RaidProtocol methods
*/
@@ -453,6 +465,66 @@
return null; // no matching policies
}
+ /**
+ * Returns the Path to the parity file of a given file
+ *
+ * @param destPathPrefix Destination prefix defined by some policy
+ * @param srcPath Path to the original source file
+ * @param create Boolean value telling whether a new parity file should be created
+ * @return Path object representing the parity file of the source
+ * @throws IOException
+ */
+ static private Path getParityFile(Path destPathPrefix, Path srcPath, boolean create, Configuration conf) throws IOException {
+ Path srcParent = srcPath.getParent();
+
+ FileSystem fsDest = destPathPrefix.getFileSystem(conf);
+
+ Path outDir = destPathPrefix;
+ if (srcParent != null) {
+ if (srcParent.getParent() == null) {
+ outDir = destPathPrefix;
+ } else {
+ outDir = new Path(destPathPrefix, makeRelative(srcParent));
+ }
+ }
+
+ String harDirName = srcParent.getName() + HAR_SUFFIX;
+ Path HarPath = new Path(outDir,harDirName);
+ Path outPath = new Path(destPathPrefix, makeRelative(srcPath));
+
+ if (create || !fsDest.exists(HarPath)) { // case 1: no HAR file
+ return outPath;
+ }
+
+ URI HarPathUri = HarPath.toUri();
+ Path inHarPath = new Path("har://",HarPathUri.getPath()+"/"+outPath.toUri().getPath());
+ FileSystem fsHar = inHarPath.getFileSystem(conf);
+
+ if (!fsHar.exists(inHarPath)) { // case 2: no file inside HAR
+ return outPath;
+ }
+
+ if (! fsDest.exists(outPath)) { // case 3: only inside HAR
+ return inHarPath;
+ }
+
+ // both inside and outside HAR. Should return most recent
+ FileStatus inHar = fsHar.getFileStatus(inHarPath);
+ FileStatus outHar = fsDest.getFileStatus(outPath);
+
+ if (inHar.getModificationTime() >= outHar.getModificationTime()) {
+ return inHarPath;
+ }
+
+ return outPath;
+ }
+
+ private Path getParityFile(Path destPathPrefix, Path srcPath, boolean create) throws IOException {
+
+ return getParityFile(destPathPrefix, srcPath, create, conf);
+
+ }
+
/**
* Returns a list of pathnames that need raiding.
*/
@@ -520,7 +592,7 @@
// does not match the modTime of the source file, then recalculate RAID
boolean add = false;
try {
- Path outpath = new Path(destPathPrefix, makeRelative(path));
+ Path outpath = getParityFile(destPathPrefix, path, false);
FileSystem outFs = outpath.getFileSystem(conf);
FileStatus ostat = outFs.getFileStatus(outpath);
if (ostat.getModificationTime() != src.getModificationTime() &&
@@ -538,7 +610,9 @@
} else if (files != null) {
for (FileStatus one:files) {
- recurse(srcFs, conf, destPathPrefix, one, accept, modTimePeriod, srcReplication, now);
+ if (!one.getPath().getName().endsWith(HAR_SUFFIX)){
+ recurse(srcFs, conf, destPathPrefix, one, accept, modTimePeriod, srcReplication, now);
+ }
}
}
}
@@ -678,7 +752,7 @@
long fileSize = stat.getLen();
// create output tmp path
- Path outpath = new Path(destPathPrefix, makeRelative(inpath));
+ Path outpath = getParityFile(destPathPrefix, inpath,true,conf);
Path tmppath = new Path(outpath + ".tmp");
FileSystem outFs = outpath.getFileSystem(conf);
@@ -688,6 +762,7 @@
if (stmp.getModificationTime() == stat.getModificationTime()) {
LOG.info("Parity file for " + inpath + "(" + locations.length + ") is " + outpath +
" already upto-date. Nothing more to do.");
+ return;
}
} catch (IOException e) {
// ignore errors because the raid file might not exist yet.
@@ -891,7 +966,7 @@
LOG.info("Decompose a total of " + numLength + " blocks.");
// open and seek to the appropriate offset in parity file.
- Path parityFile = new Path(destPathPrefix, makeRelative(srcPath));
+ Path parityFile = getParityFile(destPathPrefix, srcPath, false, conf);
FileSystem parityFs = parityFile.getFileSystem(conf);
LOG.info("Parity file for " + srcPath + " is " + parityFile);
ins[numLength-1] = parityFs.open(parityFile);
@@ -905,10 +980,11 @@
// We need to generate a unique name for this tmp file later on.
Path tmpFile = null;
FSDataOutputStream fout = null;
+ FileSystem destFs = destPathPrefix.getFileSystem(conf);
int retry = 5;
try {
tmpFile = new Path("/tmp/dhruba/" + rand.nextInt());
- fout = parityFs.create(tmpFile, false);
+ fout = destFs.create(tmpFile, false);
} catch (IOException e) {
if (retry-- <= 0) {
LOG.info("Unable to create temporary file " + tmpFile +
@@ -935,17 +1011,17 @@
// Now, reopen the source file and the recovered block file
// and copy all relevant data to new file
- Path recoveredPath = new Path(destPathPrefix, makeRelative(srcPath));
+ Path recoveredPath = getParityFile(destPathPrefix, srcPath, true, conf);
recoveredPath = new Path(recoveredPath + ".recovered");
LOG.info("Creating recovered file " + recoveredPath);
FSDataInputStream sin = srcFs.open(srcPath);
- FSDataOutputStream out = parityFs.create(recoveredPath, false,
+ FSDataOutputStream out = destFs.create(recoveredPath, false,
conf.getInt("io.file.buffer.size", 64 * 1024),
srcStat.getReplication(),
srcStat.getBlockSize());
- FSDataInputStream bin = parityFs.open(tmpFile);
+ FSDataInputStream bin = destFs.open(tmpFile);
long recoveredSize = 0;
// copy all the good blocks (upto the corruption)
@@ -992,12 +1068,12 @@
bin.close();
// delete the temporary block file that was created.
- parityFs.delete(tmpFile, false);
+ destFs.delete(tmpFile, false);
LOG.info("Deleted temporary file " + tmpFile);
// copy the meta information from source path to the newly created
// recovered path
- copyMetaInformation(parityFs, srcStat, recoveredPath);
+ copyMetaInformation(destFs, srcStat, recoveredPath);
return recoveredPath;
}
@@ -1060,8 +1136,8 @@
FileSystem destFs = FileSystem.get(destp.toUri(), conf);
destp = destp.makeQualified(destFs);
destinationPrefix = destp.toUri().getPath(); // strip host:port
- destp = new Path(destp, makeRelative(info.getSrcPath()));
-
+ destp = getParityFile(destp, info.getSrcPath(), true);
+
// if this destination path has already been processed as part
// of another policy, then nothing more to do
if (processed.contains(destp)) {
@@ -1108,6 +1184,11 @@
String destStr = destPath.toUri().getPath();
LOG.debug("Checking " + destPath + " prefix " + destPrefix);
+ // Check whether it is a har file
+ if (destStr.endsWith(HAR_SUFFIX)) {
+ return;
+ }
+
// Verify the destPrefix is a prefix of the destPath
if (!destStr.startsWith(destPrefix)) {
LOG.error("Destination path " + destStr + " should have " +
@@ -1116,10 +1197,11 @@
}
String src = destStr.replaceFirst(destPrefix, "");
- // if the source path does not exist, then delete the
- // destination path as well
+ // if the source path does not exist or the parity file has been HARed,
+ // then delete the parity file
Path srcPath = new Path(src);
- if (!srcFs.exists(srcPath)) {
+ Path dstPath = (new Path(destPrefix.trim())).makeQualified(destFs);
+ if (!srcFs.exists(srcPath) || !destPath.equals(getParityFile(dstPath,srcPath,false))) {
boolean done = destFs.delete(destPath, false);
if (done) {
LOG.info("Purged path " + destPath );
@@ -1141,6 +1223,160 @@
}
}
+
+ private void doHar() throws IOException, InterruptedException {
+
+ PolicyList.CompareByPath lexi = new PolicyList.CompareByPath();
+
+ long prevExec = 0;
+ while (running) {
+
+ LOG.info("Started HAR thread");
+ // The config may be reloaded by the TriggerMonitor.
+ // This thread uses whatever config is currently active.
+ while(now() < prevExec + configMgr.getPeriodicity()){
+ Thread.sleep(SLEEP_TIME);
+ }
+
+ prevExec = now();
+
+ // fetch all categories
+ Collection<PolicyList> all = configMgr.getAllPolicies();
+
+ // sort all policies by reverse lexicographical order. This is
+ // needed to make the nearest policy take precedence.
+ PolicyList[] sorted = all.toArray(new PolicyList[all.size()]);
+ Arrays.sort(sorted, lexi);
+
+ for (PolicyList category : sorted) {
+ for (PolicyInfo info: category.getAll()) {
+ String str = info.getProperty("time_before_har");
+ String tmpHarPath = info.getProperty("har_tmp_dir");
+ if (tmpHarPath == null) {
+ tmpHarPath = "/raid_har";
+ }
+ if (str != null) {
+ try {
+ long cutoff = now() - ( Long.parseLong(str) * 24L * 3600000L );
+ // expand destination prefix path
+ String destinationPrefix = getDestinationPath(conf, info);
+ Path destp = new Path(destinationPrefix.trim());
+ FileSystem destFs = FileSystem.get(destp.toUri(), conf);
+ destp = destp.makeQualified(destFs);
+ destinationPrefix = destp.toUri().getPath(); // strip host:port
+ destp = getParityFile(destp, info.getSrcPath(), true);
+
+ FileStatus stat = null;
+ try {
+ stat = destFs.getFileStatus(destp);
+ } catch (FileNotFoundException e) {
+ // do nothing, leave stat = null;
+ }
+ if (stat != null) {
+ LOG.info("Purging obsolete parity files for policy " +
+ info.getName() + " " + destp);
+
+ recurseHar(destFs, stat, cutoff, tmpHarPath);
+ }
+
+
+ } catch (Exception e) {
+ LOG.warn("Ignoring Exception while processing policy " +
+ info.getName() + " " +
+ StringUtils.stringifyException(e));
+ }
+ }
+ }
+ }
+ }
+ return;
+ }
+
+ private void recurseHar(FileSystem destFs, FileStatus dest, long cutoff, String tmpHarPath)
+ throws IOException {
+
+ if (!dest.isDir()) {
+ return;
+ }
+
+ Path destPath = dest.getPath(); // pathname, no host:port
+
+ // Check whether it already contains a HAR directory
+ if ( destFs.exists(new Path(destPath, destPath.getName()+HAR_SUFFIX)) ) {
+ return;
+ }
+
+ FileStatus[] files = null;
+ files = destFs.listStatus(destPath);
+ boolean shouldHar = false;
+ if (files != null) {
+ shouldHar = files.length > 0;
+ for (FileStatus one: files) {
+ if (one.isDir()){
+ recurseHar(destFs, one, cutoff, tmpHarPath);
+ shouldHar = false;
+ } else if (one.getModificationTime() > cutoff ) {
+ shouldHar = false;
+ }
+ }
+ }
+ if ( shouldHar ) {
+ singleHar(destFs, dest, tmpHarPath);
+ }
+ }
+
+
+ private void singleHar(FileSystem destFs, FileStatus dest, String tmpHarPath) throws IOException {
+
+ Path root = new Path("/");
+ Path qualifiedPath = dest.getPath().makeQualified(destFs.getUri(),root);
+ String harFile = qualifiedPath.getName() + HAR_SUFFIX;
+ HadoopArchives har = new HadoopArchives(conf);
+ String[] args = new String[6];
+ args[0] = "-archiveName";
+ args[1] = harFile;
+ args[2] = "-p";
+ args[3] = root.makeQualified(destFs.getUri(),root).toString();
+ args[4] = qualifiedPath.toUri().getPath().substring(1);
+ args[5] = tmpHarPath.toString();
+ int ret = 0;
+ try {
+ ret = ToolRunner.run(har, args);
+ if (ret == 0 && !destFs.rename(new Path(tmpHarPath+"/"+harFile),
+ new Path(qualifiedPath, harFile))){
+ LOG.info("HAR rename didn't succeed");
+ ret = -2;
+ }
+ } catch (Exception exc) {
+ throw new IOException("Error while creating archive " + ret, exc);
+ }
+
+ if (ret != 0){
+ throw new IOException("Error while creating archive " + ret);
+ }
+ return;
+ }
+
+ /**
+ * Periodically generates HAR files
+ */
+ class HarMonitor implements Runnable {
+
+ public void run() {
+ while (running) {
+ try {
+ doHar();
+ } catch (Exception e) {
+ LOG.error(StringUtils.stringifyException(e));
+ } finally {
+ LOG.info("Har parity files thread continuing to run...");
+ }
+ }
+ }
+
+
+ }
+
/**
* If the config file has an entry for hdfs.raid.locations, then that overrides
* destination path specified in the raid policy file
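To make the lookup order in the new getParityFile() above concrete, here is a minimal sketch of the path arithmetic it performs; the concrete paths are hypothetical, and the FileSystem existence/timestamp checks are elided:

    import java.net.URI;
    import org.apache.hadoop.fs.Path;

    public class ParityPathSketch {
      public static void main(String[] args) {
        // Direct (un-archived) location of the parity file:
        Path outPath = new Path("/destraid/user/test/raidtest/file0");
        // HAR archive that may hold it, named <dir>_raid.har:
        Path harPath = new Path("/destraid/user/test/raidtest_raid.har");

        // getParityFile() resolves, in order:
        //  1. the direct path, when create == true or no archive exists;
        //  2. the direct path, when the archive exists but lacks the file;
        //  3. the path inside the archive, when only the archive has it;
        //  4. otherwise, whichever copy has the newer modification time.
        URI harUri = harPath.toUri();
        Path inHarPath = new Path("har://",
            harUri.getPath() + "/" + outPath.toUri().getPath());
        System.out.println(inHarPath);
      }
    }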
Added: hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java?rev=910774&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/raid/src/test/org/apache/hadoop/raid/TestRaidHar.java Tue Feb 16 23:40:42 2010
@@ -0,0 +1,262 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.raid;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.FileNotFoundException;
+import java.util.Random;
+
+import junit.framework.TestCase;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.log4j.Level;
+
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.mapred.MiniMRCluster;
+
+/**
+ * Verify that parity files get combined into Hadoop Archive (HAR) files.
+ */
+public class TestRaidHar extends TestCase {
+ final static String TEST_DIR = new File(System.getProperty("test.build.data",
+ "build/contrib/raid/test/data")).getAbsolutePath();
+ final static String CONFIG_FILE = new File(TEST_DIR,
+ "test-raid.xml").getAbsolutePath();
+ final static long RELOAD_INTERVAL = 1000;
+ final static Log LOG = LogFactory.getLog("org.apache.hadoop.raid.TestRaidHar");
+ final Random rand = new Random();
+
+ {
+ ((Log4JLogger)RaidNode.LOG).getLogger().setLevel(Level.ALL);
+ }
+
+
+ Configuration conf;
+ String namenode = null;
+ String hftp = null;
+ MiniDFSCluster dfs = null;
+ MiniMRCluster mr = null;
+ FileSystem fileSys = null;
+ String jobTrackerName = null;
+
+ /**
+ * create mapreduce and dfs clusters
+ */
+ private void createClusters(boolean local) throws Exception {
+
+ new File(TEST_DIR).mkdirs(); // Make sure data directory exists
+ conf = new Configuration();
+ conf.set("raid.config.file", CONFIG_FILE);
+ conf.setBoolean("raid.config.reload", true);
+ conf.setLong("raid.config.reload.interval", RELOAD_INTERVAL);
+
+ // scan all policies once every 5 seconds
+ conf.setLong("raid.policy.rescan.interval", 5000);
+
+ // make all deletions not go through Trash
+ conf.set("fs.shell.delete.classname", "org.apache.hadoop.hdfs.DFSClient");
+
+ // the RaidNode does the raiding inline (instead of submitting to map/reduce)
+ conf.setBoolean("fs.raidnode.local", local);
+
+ // create a dfs and map-reduce cluster
+ final int taskTrackers = 4;
+
+ dfs = new MiniDFSCluster(conf, 3, true, null);
+ dfs.waitActive();
+ fileSys = dfs.getFileSystem();
+ namenode = fileSys.getUri().toString();
+ mr = new MiniMRCluster(taskTrackers, namenode, 3);
+ jobTrackerName = "localhost:" + mr.getJobTrackerPort();
+ hftp = "hftp://localhost.localdomain:" + dfs.getNameNodePort();
+
+ FileSystem.setDefaultUri(conf, namenode);
+ conf.set("mapred.job.tracker", jobTrackerName);
+ }
+
+ /**
+ * create raid.xml file for RaidNode
+ */
+ private void mySetup(long targetReplication,
+ long metaReplication, long stripeLength) throws Exception {
+ FileWriter fileWriter = new FileWriter(CONFIG_FILE);
+ fileWriter.write("<?xml version=\"1.0\"?>\n");
+ String str = "<configuration> " +
+ "<srcPath prefix=\"/user/test/raidtest\"> " +
+ "<policy name = \"RaidTest1\"> " +
+ "<destPath> /destraid</destPath> " +
+ "<property> " +
+ "<name>targetReplication</name> " +
+ "<value>" + targetReplication + "</value> " +
+ "<description>after RAIDing, decrease the replication factor of a file to this value." +
+ "</description> " +
+ "</property> " +
+ "<property> " +
+ "<name>metaReplication</name> " +
+ "<value>" + metaReplication + "</value> " +
+ "<description> replication factor of parity file" +
+ "</description> " +
+ "</property> " +
+ "<property> " +
+ "<name>stripeLength</name> " +
+ "<value>" + stripeLength + "</value> " +
+ "<description> the max number of blocks in a file to RAID together " +
+ "</description> " +
+ "</property> " +
+ "<property> " +
+ "<name>time_before_har</name> " +
+ "<value>0</value> " +
+ "<description> amount of time waited before har'ing parity files" +
+ "</description> " +
+ "</property> " +
+ "<property> " +
+ "<name>modTimePeriod</name> " +
+ "<value>2000</value> " +
+ "<description> time (milliseconds) after a file is modified to make it " +
+ "a candidate for RAIDing " +
+ "</description> " +
+ "</property> " +
+ "</policy>" +
+ "</srcPath>" +
+ "</configuration>";
+ fileWriter.write(str);
+ fileWriter.close();
+ }
+
+ /**
+ * stop clusters created earlier
+ */
+ private void stopClusters() throws Exception {
+ if (mr != null) { mr.shutdown(); }
+ if (dfs != null) { dfs.shutdown(); }
+ }
+
+ /**
+ * Test that parity files get combined into a HAR archive
+ * once they are old enough.
+ */
+ public void testRaidHar() throws Exception {
+ LOG.info("Test testRaidHar started.");
+
+ long blockSizes [] = {1024L};
+ long stripeLengths [] = {5};
+ long targetReplication = 1;
+ long metaReplication = 1;
+ int numBlock = 9;
+ int iter = 0;
+
+ createClusters(true);
+ try {
+ for (long blockSize : blockSizes) {
+ for (long stripeLength : stripeLengths) {
+ doTestHar(iter, targetReplication, metaReplication,
+ stripeLength, blockSize, numBlock);
+ iter++;
+ }
+ }
+ } finally {
+ stopClusters();
+ }
+ LOG.info("Test testRaidHar completed.");
+ }
+
+ /**
+ * Create parity files, then validate that they are automatically
+ * combined into a HAR archive.
+ */
+ private void doTestHar(int iter, long targetReplication,
+ long metaReplication, long stripeLength,
+ long blockSize, int numBlock) throws Exception {
+ LOG.info("doTestHar started---------------------------:" + " iter " + iter +
+ " blockSize=" + blockSize + " stripeLength=" + stripeLength);
+ mySetup(targetReplication, metaReplication, stripeLength);
+ RaidShell shell = null;
+ Path dir = new Path("/user/test/raidtest/");
+ Path file1 = new Path(dir + "/file" + iter);
+ RaidNode cnode = null;
+ try {
+ Path destPath = new Path("/destraid/user/test/raidtest");
+ fileSys.delete(dir, true);
+ fileSys.delete(destPath, true);
+ TestRaidNode.createOldFile(fileSys, file1, 1, numBlock, blockSize);
+ LOG.info("doTestHar created test files for iteration " + iter);
+
+ // create an instance of the RaidNode
+ cnode = RaidNode.createRaidNode(null, conf);
+ int times = 10;
+
+ while (times-- > 0) {
+ try {
+ shell = new RaidShell(conf);
+ } catch (Exception e) {
+ LOG.info("doTestHar unable to connect to " + RaidNode.getAddress(conf) +
+ " retrying....");
+ Thread.sleep(1000);
+ continue;
+ }
+ break;
+ }
+ LOG.info("doTestHar created RaidShell.");
+ FileStatus[] listPaths = null;
+
+ // wait till file is raided
+ while (true) {
+ try {
+ listPaths = fileSys.listStatus(destPath);
+ int count = 0;
+ if (listPaths != null) {
+ for (FileStatus s : listPaths) {
+ LOG.info("doTestHar found path " + s.getPath());
+ if (s.getPath().toString().endsWith(".har")) {
+ count++;
+ }
+ }
+ }
+ if (count == 1 && listPaths.length == 1) {
+ break;
+ }
+ } catch (FileNotFoundException e) {
+ //ignore
+ }
+ LOG.info("doTestHar waiting for files to be raided and parity files to be har'ed and deleted. Found " +
+ (listPaths == null ? "none" : listPaths.length));
+ Thread.sleep(1000); // keep waiting
+ }
+
+
+ } catch (Exception e) {
+ LOG.info("doTestHar Exception " + e +
+ StringUtils.stringifyException(e));
+ throw e;
+ } finally {
+ if (shell != null) { shell.close(); }
+ if (cnode != null) { cnode.stop(); cnode.join(); }
+ LOG.info("doTestHar delete file " + file1);
+ fileSys.delete(file1, true);
+ }
+ LOG.info("doTestHar completed:" + " blockSize=" + blockSize +
+ " stripeLength=" + stripeLength);
+ }
+}
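One detail worth noting from doHar() in the RaidNode.java diff: the time_before_har policy property is interpreted in days (the value is multiplied by 24L * 3600000L milliseconds). A small sketch of the cutoff computation, using a hypothetical 30-day setting:

    public class HarCutoffSketch {
      public static void main(String[] args) {
        long timeBeforeHarDays = Long.parseLong("30"); // hypothetical time_before_har value
        // Matches doHar(): cutoff = now() - (time_before_har * 24L * 3600000L)
        long cutoff = System.currentTimeMillis() - (timeBeforeHarDays * 24L * 3600000L);
        // Per recurseHar(), a parity directory is handed to singleHar() only when
        // it is non-empty, contains no subdirectories, and every file in it was
        // last modified before 'cutoff'.
        System.out.println("Archive parity files older than " + cutoff);
      }
    }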