You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2011/09/10 02:56:41 UTC
svn commit: r1167425 - in /hadoop/common/branches/branch-0.20-security: ./
src/hdfs/org/apache/hadoop/hdfs/server/datanode/
src/hdfs/org/apache/hadoop/hdfs/server/protocol/
src/test/org/apache/hadoop/hdfs/
src/test/org/apache/hadoop/hdfs/server/datanode/
Author: suresh
Date: Sat Sep 10 00:56:41 2011
New Revision: 1167425
URL: http://svn.apache.org/viewvc?rev=1167425&view=rev
Log:
HDFS-1186. DNs should interrupt writers at start of recovery. Contributed by Todd Lipcon.
Added:
hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestSyncingWriterInterrupted.java
Modified:
hadoop/common/branches/branch-0.20-security/CHANGES.txt
hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/protocol/InterDatanodeProtocol.java
hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/AppendTestUtil.java
hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestFileAppend4.java
hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestMultiThreadedSync.java
hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
Modified: hadoop/common/branches/branch-0.20-security/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/CHANGES.txt?rev=1167425&r1=1167424&r2=1167425&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security/CHANGES.txt Sat Sep 10 00:56:41 2011
@@ -119,6 +119,9 @@ Release 0.20.205.0 - unreleased
HDFS-1218. Blocks recovered on startup should be treated with lower
priority during block synchronization. (Todd Lipcon via suresh)
+ HDFS-1186. DNs should interrupt writers at start of recovery.
+ (Todd Lipcon via suresh)
+
IMPROVEMENTS
MAPREDUCE-2187. Reporter sends progress during sort/merge. (Anupam Seth via
Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1167425&r1=1167424&r2=1167425&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java Sat Sep 10 00:56:41 2011
@@ -1660,8 +1660,8 @@ public class DataNode extends Configured
}
@Override
- public BlockRecoveryInfo getBlockRecoveryInfo(Block block) throws IOException {
- return data.getBlockRecoveryInfo(block.getBlockId());
+ public BlockRecoveryInfo startBlockRecovery(Block block) throws IOException {
+ return data.startBlockRecovery(block.getBlockId());
}
public Daemon recoverBlocks(final Block[] blocks, final DatanodeInfo[][] targets) {
@@ -1769,7 +1769,7 @@ public class DataNode extends Configured
try {
InterDatanodeProtocol datanode = dnRegistration.equals(id)?
this: DataNode.createInterDataNodeProtocolProxy(id, getConf(), socketTimeout);
- BlockRecoveryInfo info = datanode.getBlockRecoveryInfo(block);
+ BlockRecoveryInfo info = datanode.startBlockRecovery(block);
if (info == null) {
LOG.info("No block metadata found for block " + block + " on datanode "
+ id);
Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=1167425&r1=1167424&r2=1167425&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Sat Sep 10 00:56:41 2011
@@ -1134,32 +1134,39 @@ public class FSDataset implements FSCons
return;
}
- // interrupt and wait for all ongoing create threads
- for(Thread t : threads) {
- t.interrupt();
- }
- for(Thread t : threads) {
- try {
- t.join();
- } catch (InterruptedException e) {
- DataNode.LOG.warn("interruptOngoingCreates: t=" + t, e);
- break; // retry with new threadlist from the beginning
- }
+ interruptAndJoinThreads(threads);
+ }
+ }
+
+  /**
+   * Interrupts every given thread, then joins each one in turn.
+   * @param threads the threads to interrupt and wait for
+   * @return false if interrupted while joining (threads may still be running)
+   */
+  private boolean interruptAndJoinThreads(List<Thread> threads) {
+    for(Thread t : threads) {
+      t.interrupt();
+    }
+    for(Thread t : threads) {
+      try {
+        t.join();
+      } catch (InterruptedException e) {
+        // Name this helper in the log: startBlockRecovery calls it too.
+        DataNode.LOG.warn("interruptAndJoinThreads: t=" + t, e);
+        return false;
+      }
+    }
+    return true;
+  }
+
/**
- * Try to update an old block to a new block.
- * If there are ongoing create threads running for the old block,
- * the threads will be returned without updating the block.
- *
- * @return ongoing create threads if there is any. Otherwise, return null.
+ * Return a list of active writer threads for the given block.
+ * @return null if there are no such threads or the file is
+ * not being created
*/
- private synchronized List<Thread> tryUpdateBlock(
- Block oldblock, Block newblock) throws IOException {
- //check ongoing create threads
- final ActiveFile activefile = ongoingCreates.get(oldblock);
+ private synchronized ArrayList<Thread> getActiveThreads(Block block) {
+ final ActiveFile activefile = ongoingCreates.get(block);
if (activefile != null && !activefile.threads.isEmpty()) {
//remove dead threads
for(Iterator<Thread> i = activefile.threads.iterator(); i.hasNext(); ) {
@@ -1168,13 +1175,30 @@ public class FSDataset implements FSCons
i.remove();
}
}
-
+
//return living threads
if (!activefile.threads.isEmpty()) {
return new ArrayList<Thread>(activefile.threads);
}
}
-
+ return null;
+ }
+
+ /**
+ * Try to update an old block to a new block.
+ * If there are ongoing create threads running for the old block,
+ * the threads will be returned without updating the block.
+ *
+ * @return ongoing create threads if there is any. Otherwise, return null.
+ */
+ private synchronized List<Thread> tryUpdateBlock(
+ Block oldblock, Block newblock) throws IOException {
+ //check ongoing create threads
+ ArrayList<Thread> activeThreads = getActiveThreads(oldblock);
+ if (activeThreads != null) {
+ return activeThreads;
+ }
+
//No ongoing create threads is alive. Update block.
File blockFile = findBlockFile(oldblock.getBlockId());
if (blockFile == null) {
@@ -1945,30 +1969,42 @@ public class FSDataset implements FSCons
}
@Override
- public synchronized BlockRecoveryInfo getBlockRecoveryInfo(long blockId)
- throws IOException {
+ public BlockRecoveryInfo startBlockRecovery(long blockId)
+ throws IOException {
Block stored = getStoredBlock(blockId);
if (stored == null) {
return null;
}
- ActiveFile activeFile = ongoingCreates.get(stored);
- boolean isRecovery = (activeFile != null) && activeFile.wasRecoveredOnStartup;
-
+ // It's important that this loop not be synchronized - otherwise
+ // this will deadlock against the thread it's joining against!
+ while (true) {
+ DataNode.LOG.debug(
+ "Interrupting active writer threads for block " + stored);
+ List<Thread> activeThreads = getActiveThreads(stored);
+ if (activeThreads == null) break;
+ if (interruptAndJoinThreads(activeThreads))
+ break;
+ }
- BlockRecoveryInfo info = new BlockRecoveryInfo(
- stored, isRecovery);
- if (DataNode.LOG.isDebugEnabled()) {
- DataNode.LOG.debug("getBlockMetaDataInfo successful block=" + stored +
- " length " + stored.getNumBytes() +
- " genstamp " + stored.getGenerationStamp());
+ synchronized (this) {
+ ActiveFile activeFile = ongoingCreates.get(stored);
+ boolean isRecovery = (activeFile != null) && activeFile.wasRecoveredOnStartup;
+
+
+ BlockRecoveryInfo info = new BlockRecoveryInfo(
+ stored, isRecovery);
+ if (DataNode.LOG.isDebugEnabled()) {
+ DataNode.LOG.debug("getBlockMetaDataInfo successful block=" + stored +
+ " length " + stored.getNumBytes() +
+ " genstamp " + stored.getGenerationStamp());
+ }
+
+ // paranoia! verify that the contents of the stored block
+ // matches the block file on disk.
+ validateBlockMetadata(stored);
+ return info;
}
-
- // paranoia! verify that the contents of the stored block
- // matches the block file on disk.
- validateBlockMetadata(stored);
-
- return info;
}
}
Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java?rev=1167425&r1=1167424&r2=1167425&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java Sat Sep 10 00:56:41 2011
@@ -308,5 +308,5 @@ public interface FSDatasetInterface exte
*/
public boolean hasEnoughResource();
- public BlockRecoveryInfo getBlockRecoveryInfo(long blockId) throws IOException;
+ public BlockRecoveryInfo startBlockRecovery(long blockId) throws IOException;
}
Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/protocol/InterDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/protocol/InterDatanodeProtocol.java?rev=1167425&r1=1167424&r2=1167425&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/protocol/InterDatanodeProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/protocol/InterDatanodeProtocol.java Sat Sep 10 00:56:41 2011
@@ -46,10 +46,12 @@ public interface InterDatanodeProtocol e
BlockMetaDataInfo getBlockMetaDataInfo(Block block) throws IOException;
/**
+ * Begin recovery on a block - this interrupts writers and returns the
+ * necessary metadata for recovery to begin.
* @return the BlockRecoveryInfo for a block
* @return null if the block is not found
*/
- BlockRecoveryInfo getBlockRecoveryInfo(Block block) throws IOException;
+ BlockRecoveryInfo startBlockRecovery(Block block) throws IOException;
/**
* Update the block to the new generation stamp and length.
Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/AppendTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/AppendTestUtil.java?rev=1167425&r1=1167424&r2=1167425&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/AppendTestUtil.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/AppendTestUtil.java Sat Sep 10 00:56:41 2011
@@ -21,15 +21,20 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.security.UserGroupInformation;
/** Utilities for append-related tests */
@@ -114,4 +119,110 @@ class AppendTestUtil {
throw new IOException("p=" + p + ", length=" + length + ", i=" + i, ioe);
}
}
+
+  /**
+   * Test helper thread: once {@code countdown} reaches zero, writes
+   * {@code toWrite} to the stream and syncs it, up to {@code numWrites}
+   * times. It stops early when {@code thrown} holds a failure, and records
+   * its own failure there only if none has been recorded yet.
+   */
+  static class WriterThread extends Thread {
+    private final FSDataOutputStream stm;
+    private final AtomicReference<Throwable> thrown;
+    private final int numWrites;
+    private final CountDownLatch countdown;
+    private final byte[] toWrite;
+    /** Number of write-and-sync rounds that returned without throwing. */
+    private final AtomicInteger numWritten = new AtomicInteger();
+
+    public WriterThread(FSDataOutputStream stm,
+        byte[] toWrite,
+        AtomicReference<Throwable> thrown,
+        CountDownLatch countdown, int numWrites) {
+      this.toWrite = toWrite;
+      this.stm = stm;
+      this.thrown = thrown;
+      this.numWrites = numWrites;
+      this.countdown = countdown;
+    }
+
+    @Override
+    public void run() {
+      try {
+        countdown.await();
+        for (int i = 0; i < numWrites && thrown.get() == null; i++) {
+          doAWrite();
+          numWritten.getAndIncrement();
+        }
+      } catch (Throwable t) {
+        // Keep only the first failure seen by any thread sharing 'thrown'.
+        thrown.compareAndSet(null, t);
+      }
+    }
+
+    /** Writes the payload once and syncs the stream. */
+    private void doAWrite() throws IOException {
+      stm.write(toWrite);
+      stm.sync();
+    }
+
+    /** @return how many write-and-sync rounds have completed so far */
+    public int getNumWritten() {
+      return numWritten.get();
+    }
+  }
+
+  /**
+   * Makes the client behind {@code whichfs} lose its lease by calling
+   * {@code interruptAndJoin()} on its lease checker.
+   *
+   * @param whichfs must be a DistributedFileSystem; it is cast unchecked
+   */
+  public static void loseLeases(FileSystem whichfs) throws Exception {
+    LOG.info("leasechecker.interruptAndJoin()");
+    // lose the lease on the client
+    DistributedFileSystem dfs = (DistributedFileSystem)whichfs;
+    dfs.dfs.leasechecker.interruptAndJoin();
+  }
+
+  /**
+   * Sets the lease soft limit to one second, then tries
+   * {@code fs.append(file1)} up to 60 times, sleeping one second after each
+   * failed attempt. The stream from a successful append is closed.
+   *
+   * @throws IOException if closing the appended stream fails
+   * @throws RuntimeException if no append attempt succeeded
+   */
+  public static void recoverFile(MiniDFSCluster cluster, FileSystem fs,
+      Path file1) throws IOException {
+
+    // set the soft limit to be 1 second so that the
+    // namenode triggers lease recovery upon append request
+    cluster.setLeasePeriod(1000, FSConstants.LEASE_HARDLIMIT_PERIOD);
+
+    // Trying recovery
+    int tries = 60;
+    boolean recovered = false;
+    FSDataOutputStream out = null;
+    while (!recovered && tries-- > 0) {
+      try {
+        out = fs.append(file1);
+        LOG.info("Successfully opened for appends");
+        recovered = true;
+      } catch (IOException e) {
+        LOG.info("Failed open for append, waiting on lease recovery");
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException ex) {
+          // ignore it and try again
+        }
+      }
+    }
+    if (out != null) {
+      out.close();
+    }
+    if (!recovered) {
+      throw new RuntimeException("Recovery failed");
+    }
+  }
}
Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestFileAppend4.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestFileAppend4.java?rev=1167425&r1=1167424&r2=1167425&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestFileAppend4.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestFileAppend4.java Sat Sep 10 00:56:41 2011
@@ -50,6 +50,7 @@ import org.apache.hadoop.hdfs.server.nam
import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
import org.apache.hadoop.hdfs.server.namenode.FSImageAdapter;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
+import static org.apache.hadoop.hdfs.AppendTestUtil.loseLeases;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -146,13 +147,6 @@ public class TestFileAppend4 extends Tes
assertTrue(file1 + " should be replicated to " + rep + " datanodes, not " +
actualRepl + ".", actualRepl == rep);
}
-
- private void loseLeases(FileSystem whichfs) throws Exception {
- LOG.info("leasechecker.interruptAndJoin()");
- // lose the lease on the client
- DistributedFileSystem dfs = (DistributedFileSystem)whichfs;
- dfs.dfs.leasechecker.interruptAndJoin();
- }
/*
* Recover file.
@@ -166,42 +160,7 @@ public class TestFileAppend4 extends Tes
*/
private void recoverFile(final FileSystem fs) throws Exception {
LOG.info("Recovering File Lease");
-
- // set the soft limit to be 1 second so that the
- // namenode triggers lease recovery upon append request
- cluster.setLeasePeriod(1000, FSConstants.LEASE_HARDLIMIT_PERIOD);
-
- // Trying recovery
- int tries = 60;
- boolean recovered = false;
- FSDataOutputStream out = null;
- while (!recovered && tries-- > 0) {
- try {
- out = fs.append(file1);
- LOG.info("Successfully opened for appends");
- recovered = true;
- } catch (IOException e) {
- LOG.info("Failed open for append, waiting on lease recovery");
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ex) {
- // ignore it and try again
- }
- }
- }
- if (out != null) {
- try {
- out.close();
- LOG.info("Successfully obtained lease");
- } catch (IOException e) {
- LOG.info("Unable to close file after opening for appends. " + e);
- recovered = false;
- }
-// out.close();
- }
- if (!recovered) {
- fail((tries > 0) ? "Recovery failed" : "Recovery should take < 1 min");
- }
+ AppendTestUtil.recoverFile(cluster, fs, file1);
LOG.info("Past out lease recovery");
}
Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestMultiThreadedSync.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestMultiThreadedSync.java?rev=1167425&r1=1167424&r2=1167425&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestMultiThreadedSync.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestMultiThreadedSync.java Sat Sep 10 00:56:41 2011
@@ -67,39 +67,6 @@ public class TestMultiThreadedSync {
toWrite = AppendTestUtil.randomBytes(seed, size);
}
- private class WriterThread extends Thread {
- private final FSDataOutputStream stm;
- private final AtomicReference<Throwable> thrown;
- private final int numWrites;
- private final CountDownLatch countdown;
-
- public WriterThread(FSDataOutputStream stm,
- AtomicReference<Throwable> thrown,
- CountDownLatch countdown, int numWrites) {
- this.stm = stm;
- this.thrown = thrown;
- this.numWrites = numWrites;
- this.countdown = countdown;
- }
-
- public void run() {
- try {
- countdown.await();
- for (int i = 0; i < numWrites && thrown.get() == null; i++) {
- doAWrite();
- }
- } catch (Throwable t) {
- thrown.compareAndSet(null, t);
- }
- }
-
- private void doAWrite() throws IOException {
- stm.write(toWrite);
- stm.sync();
- }
- }
-
-
@Test
public void testMultipleSyncers() throws Exception {
Configuration conf = new Configuration();
@@ -206,7 +173,7 @@ public class TestMultiThreadedSync {
ArrayList<Thread> threads = new ArrayList<Thread>();
AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();
for (int i = 0; i < numThreads; i++) {
- Thread t = new WriterThread(stm, thrown, countdown, numWrites);
+ Thread t = new AppendTestUtil.WriterThread(stm, toWrite, thrown, countdown, numWrites);
threads.add(t);
t.start();
}
Added: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestSyncingWriterInterrupted.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestSyncingWriterInterrupted.java?rev=1167425&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestSyncingWriterInterrupted.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestSyncingWriterInterrupted.java Sat Sep 10 00:56:41 2011
@@ -0,0 +1,96 @@
+package org.apache.hadoop.hdfs;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.AppendTestUtil.WriterThread;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+/**
+ * Starts a writer thread that repeatedly writes and syncs a file, forces
+ * lease recovery of that file through a second client, and checks that the
+ * writer then fails while every write it counted is still in the file.
+ */
+public class TestSyncingWriterInterrupted {
+  static final Log LOG = LogFactory.getLog(TestSyncingWriterInterrupted.class);
+  private Configuration conf;
+
+  /** Builds a configuration with append enabled and one recovery retry. */
+  @Before
+  public void setUp() throws Exception {
+    conf = new Configuration();
+    conf.setBoolean("dfs.support.append", true);
+    conf.setInt("dfs.client.block.recovery.retries", 1);
+  }
+
+  /**
+   * Runs the scenario described on the class against a MiniDFSCluster,
+   * which is shut down when the test ends, whether it passes or fails.
+   */
+  @Test(timeout=90000)
+  public void testWriterInterrupted() throws Exception {
+    short repl = 3;
+    int numWrites = 20000;
+
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, repl, true, null);
+    try {
+      FileSystem fs1 = cluster.getFileSystem();
+      FileSystem fs2 =
+          AppendTestUtil.createHdfsWithDifferentUsername(fs1.getConf());
+
+      Path path = new Path("/testWriterInterrupted");
+      FSDataOutputStream stm = fs1.create(path);
+      byte[] toWrite = AppendTestUtil.randomBytes(0, 5);
+
+      CountDownLatch countdown = new CountDownLatch(1);
+      AtomicReference<Throwable> thrown = new AtomicReference<Throwable>();
+      WriterThread writerThread = new AppendTestUtil.WriterThread(
+          stm, toWrite, thrown, countdown, numWrites);
+      writerThread.start();
+      countdown.countDown();
+      while (writerThread.getNumWritten() == 0 &&
+             thrown.get() == null &&
+             writerThread.isAlive()) {
+        LOG.info("Waiting for writer to start");
+        Thread.sleep(10);
+      }
+      // Surface the writer's own failure first; a bare liveness assertion
+      // would hide the cause.
+      if (thrown.get() != null) {
+        throw new RuntimeException(thrown.get());
+      }
+      assertTrue("Writer thread exited before recovery started",
+          writerThread.isAlive());
+
+      AppendTestUtil.loseLeases(fs1);
+      AppendTestUtil.recoverFile(cluster, fs2, path);
+
+      // Bounded by the @Test timeout if the writer never fails.
+      while (thrown.get() == null) {
+        LOG.info("Waiting for writer thread to get expected exception");
+        Thread.sleep(1000);
+      }
+      assertNotNull(thrown.get());
+
+      // Check that we can see all of the synced edits
+      int expectedEdits = writerThread.getNumWritten();
+      int gotEdits = (int)(fs2.getFileStatus(path).getLen() / toWrite.length);
+      assertTrue("Expected at least " + expectedEdits +
+          " edits, got " + gotEdits, gotEdits >= expectedEdits);
+    } finally {
+      // Stop the mini cluster even when an assertion fails.
+      cluster.shutdown();
+    }
+  }
+}
Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=1167425&r1=1167424&r2=1167425&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Sat Sep 10 00:56:41 2011
@@ -690,7 +690,7 @@ public class SimulatedFSDataset impleme
}
@Override
- public BlockRecoveryInfo getBlockRecoveryInfo(long blockId)
+ public BlockRecoveryInfo startBlockRecovery(long blockId)
throws IOException {
Block stored = getStoredBlock(blockId);
return new BlockRecoveryInfo(stored, false);