You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sz...@apache.org on 2008/10/28 01:04:08 UTC
svn commit: r708396 - in /hadoop/core/branches/branch-0.19: CHANGES.txt
src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
Author: szetszwo
Date: Mon Oct 27 17:04:08 2008
New Revision: 708396
URL: http://svn.apache.org/viewvc?rev=708396&view=rev
Log:
HADOOP-4517. Release FSDataset lock before joining ongoing create threads. (szetszwo)
Modified:
hadoop/core/branches/branch-0.19/CHANGES.txt
hadoop/core/branches/branch-0.19/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
Modified: hadoop/core/branches/branch-0.19/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/CHANGES.txt?rev=708396&r1=708395&r2=708396&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.19/CHANGES.txt Mon Oct 27 17:04:08 2008
@@ -997,6 +997,9 @@
HADOOP-4467. SerializationFactory now uses the current context ClassLoader
allowing for user supplied Serialization instances. (Chris Wensel via
acmurthy)
+
+ HADOOP-4517. Release FSDataset lock before joining ongoing create threads.
+ (szetszwo)
Release 0.18.1 - 2008-09-17
Modified: hadoop/core/branches/branch-0.19/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=708396&r1=708395&r2=708396&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/core/branches/branch-0.19/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Mon Oct 27 17:04:08 2008
@@ -551,8 +551,8 @@
static class ActiveFile {
- File file;
- List<Thread> threads = new ArrayList<Thread>(2);
+ final File file;
+ final List<Thread> threads = new ArrayList<Thread>(2);
ActiveFile(File f, List<Thread> list) {
file = f;
@@ -796,38 +796,65 @@
}
}
- /** interrupt and wait for all ongoing create threads */
- private synchronized void interruptOngoingCreates(Block b) {
- //remove ongoingCreates threads
- ActiveFile activefile = ongoingCreates.get(b);
- if (activefile != null) {
- for(Thread t : activefile.threads) {
+ /** {@inheritDoc} */
+ public void updateBlock(Block oldblock, Block newblock) throws IOException {
+ if (oldblock.getBlockId() != newblock.getBlockId()) {
+ throw new IOException("Cannot update oldblock (=" + oldblock
+ + ") to newblock (=" + newblock + ").");
+ }
+
+ for(;;) {
+ final List<Thread> threads = tryUpdateBlock(oldblock, newblock);
+ if (threads == null) {
+ return;
+ }
+
+ // interrupt and wait for all ongoing create threads
+ for(Thread t : threads) {
t.interrupt();
}
- for(Thread t : activefile.threads) {
+ for(Thread t : threads) {
try {
t.join();
} catch (InterruptedException e) {
- DataNode.LOG.warn("interruptOngoingCreates: b=" + b
- + ", activeFile=" + activefile + ", t=" + t, e);
+ DataNode.LOG.warn("interruptOngoingCreates: t=" + t, e);
}
}
- activefile.threads.clear();
}
}
- /** {@inheritDoc} */
- public synchronized void updateBlock(Block oldblock, Block newblock
- ) throws IOException {
- if (oldblock.getBlockId() != newblock.getBlockId()) {
- throw new IOException("Cannot update oldblock (=" + oldblock
- + ") to newblock (=" + newblock + ").");
+
+ /**
+ * Try to update an old block to a new block.
+ * If there are ongoing create threads running for the old block,
+ * the threads will be returned without updating the block.
+ *
+ * @return the ongoing create threads if there are any. Otherwise, return null.
+ */
+ private synchronized List<Thread> tryUpdateBlock(
+ Block oldblock, Block newblock) throws IOException {
+ //check ongoing create threads
+ final ActiveFile activefile = ongoingCreates.get(oldblock);
+ if (activefile != null && !activefile.threads.isEmpty()) {
+ //remove dead threads
+ for(Iterator<Thread> i = activefile.threads.iterator(); i.hasNext(); ) {
+ final Thread t = i.next();
+ if (!t.isAlive()) {
+ i.remove();
+ }
+ }
+
+ //return living threads
+ if (!activefile.threads.isEmpty()) {
+ return new ArrayList<Thread>(activefile.threads);
+ }
}
+
+ //No ongoing create thread is alive. Update block.
File blockFile = findBlockFile(oldblock.getBlockId());
if (blockFile == null) {
throw new IOException("Block " + oldblock + " does not exist.");
}
- interruptOngoingCreates(oldblock);
-
+
File oldMetaFile = findMetaFile(blockFile);
long oldgs = parseGenerationStamp(blockFile, oldMetaFile);
@@ -866,6 +893,7 @@
// paranoia! verify that the contents of the stored block
// matches the block file on disk.
validateBlockMetadata(newblock);
+ return null;
}
static private void truncateBlock(File blockFile, File metaFile,