You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by sh...@apache.org on 2009/09/27 00:02:11 UTC
svn commit: r819226 - in /hadoop/hdfs/branches/HDFS-265: CHANGES.txt
src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java
Author: shv
Date: Sat Sep 26 22:02:10 2009
New Revision: 819226
URL: http://svn.apache.org/viewvc?rev=819226&view=rev
Log:
HDFS-627. Support replica update in datanode. Contributed by Tsz Wo (Nicholas), SZE and Hairong Kuang.
Modified:
hadoop/hdfs/branches/HDFS-265/CHANGES.txt
hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java
Modified: hadoop/hdfs/branches/HDFS-265/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/CHANGES.txt?rev=819226&r1=819225&r2=819226&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/CHANGES.txt (original)
+++ hadoop/hdfs/branches/HDFS-265/CHANGES.txt Sat Sep 26 22:02:10 2009
@@ -40,6 +40,9 @@
HDFS-624. Support a new algorithm for pipeline recovery and pipeline setup
for append. (hairong)
+ HDFS-627. Support replica update in data-node.
+ (Tsz Wo (Nicholas), SZE and Hairong Kuang via shv)
+
IMPROVEMENTS
HDFS-509. Redesign DataNode volumeMap to include all types of Replicas.
Modified: hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=819226&r1=819225&r2=819226&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Sat Sep 26 22:02:10 2009
@@ -21,6 +21,7 @@
import java.io.DataInputStream;
import java.io.File;
import java.io.FileInputStream;
+import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.FilenameFilter;
import java.io.IOException;
@@ -383,14 +384,6 @@
}
/**
- * Returns the name of the temporary file for this block.
- */
- File getTmpFile(Block b) throws IOException {
- File f = new File(tmpDir, b.getBlockName());
- return f;
- }
-
- /**
* Files used for copy-on-write. They need recovery when datanode
* restarts.
*/
@@ -1036,6 +1029,11 @@
static private void truncateBlock(File blockFile, File metaFile,
long oldlen, long newlen) throws IOException {
+ DataNode.LOG.info("truncateBlock: blockFile=" + blockFile
+ + ", metaFile=" + metaFile
+ + ", oldlen=" + oldlen
+ + ", newlen=" + newlen);
+
if (newlen == oldlen) {
return;
}
@@ -1419,11 +1417,17 @@
// been opened for append but never modified
return;
}
- ReplicaInfo newReplicaInfo = null;
+ finalizeReplica(replicaInfo);
+ }
+
+ private synchronized FinalizedReplica finalizeReplica(ReplicaInfo replicaInfo)
+ throws IOException {
+ FinalizedReplica newReplicaInfo = null;
if (replicaInfo.getState() == ReplicaState.RUR &&
((ReplicaUnderRecovery)replicaInfo).getOrignalReplicaState() ==
ReplicaState.FINALIZED) {
- newReplicaInfo = ((ReplicaUnderRecovery)replicaInfo).getOriginalReplica();
+ newReplicaInfo = (FinalizedReplica)
+ ((ReplicaUnderRecovery)replicaInfo).getOriginalReplica();
} else {
FSVolume v = replicaInfo.getVolume();
File f = replicaInfo.getBlockFile();
@@ -1436,6 +1440,7 @@
newReplicaInfo = new FinalizedReplica(replicaInfo, v, dest.getParentFile());
}
volumeMap.add(newReplicaInfo);
+ return newReplicaInfo;
}
/**
@@ -1578,46 +1583,25 @@
/** {@inheritDoc} */
public void validateBlockMetadata(Block b) throws IOException {
- ReplicaInfo info = getReplicaInfo(b);
- FSVolume v = info.getVolume();
- File tmp = v.getTmpFile(b);
- File f = getFile(b);
- if (f == null) {
- f = tmp;
+ checkReplicaFiles(getReplicaInfo(b));
+ }
+
+ /** Check the files of a replica. */
+ static void checkReplicaFiles(final ReplicaInfo r) throws IOException {
+ final File f = r.getBlockFile();
+ if (!f.exists()) {
+ throw new FileNotFoundException("File " + f + " not found, r=" + r);
}
- if (f == null) {
- throw new IOException("Block " + b + " does not exist on disk.");
+ if (r.getNumBytes() != f.length()) {
+ throw new IOException("File length mismatched."
+ + f + " length is " + f.length() + " but r=" + r);
+ }
+ final File metafile = getMetaFile(f, r);
+ if (!metafile.exists()) {
+ throw new IOException("Metafile " + metafile + " does not exist, r=" + r);
}
- if (!f.exists()) {
- throw new IOException("Block " + b +
- " block file " + f +
- " does not exist on disk.");
- }
- if (b.getNumBytes() != f.length()) {
- throw new IOException("Block " + b +
- " length is " + b.getNumBytes() +
- " does not match block file length " +
- f.length());
- }
- File meta = getMetaFile(f, b);
- if (meta == null) {
- throw new IOException("Block " + b +
- " metafile does not exist.");
- }
- if (!meta.exists()) {
- throw new IOException("Block " + b +
- " metafile " + meta +
- " does not exist on disk.");
- }
- if (meta.length() == 0) {
- throw new IOException("Block " + b + " metafile " + meta + " is empty.");
- }
- long stamp = parseGenerationStamp(f, meta);
- if (stamp != b.getGenerationStamp()) {
- throw new IOException("Block " + b +
- " genstamp is " + b.getGenerationStamp() +
- " does not match meta file stamp " +
- stamp);
+ if (metafile.length() == 0) {
+ throw new IOException("Metafile " + metafile + " is empty, r=" + r);
}
}
@@ -2045,4 +2029,70 @@
}
return rur.createInfo();
}
+
+ /** Update a replica of a block. */
+ synchronized void updateReplica(final Block block, final long recoveryId,
+ final long newlength) throws IOException {
+ //get replica
+ final ReplicaInfo replica = volumeMap.get(block.getBlockId());
+ DataNode.LOG.info("updateReplica: block=" + block
+ + ", recoveryId=" + recoveryId
+ + ", length=" + newlength
+ + ", replica=" + replica);
+
+ //check replica
+ if (replica == null) {
+ throw new ReplicaNotFoundException(block);
+ }
+
+ //check replica state
+ if (replica.getState() != ReplicaState.RUR) {
+ throw new IOException("replica.getState() != " + ReplicaState.RUR
+ + ", replica=" + replica);
+ }
+
+ //check replica files before update
+ checkReplicaFiles(replica);
+
+ //update replica
+ final FinalizedReplica finalized = updateReplicaUnderRecovery(
+ (ReplicaUnderRecovery)replica, recoveryId, newlength);
+
+ //check replica files after update
+ checkReplicaFiles(finalized);
+ }
+
+ /** Update a ReplicaUnderRecovery to a FinalizedReplica. */
+ FinalizedReplica updateReplicaUnderRecovery(
+ final ReplicaUnderRecovery rur, final long recoveryId,
+ final long newlength) throws IOException {
+ DataNode.LOG.info("updateReplicaUnderRecovery: recoveryId=" + recoveryId
+ + ", newlength=" + newlength
+ + ", rur=" + rur);
+
+ //check recovery id
+ if (rur.getRecoveryID() != recoveryId) {
+ throw new IOException("rur.getRecoveryID() != recoveryId = " + recoveryId
+ + ", rur=" + rur);
+ }
+
+ // bump rur's GS to be recovery id
+ bumpReplicaGS(rur, recoveryId);
+
+ //update length
+ final File replicafile = rur.getBlockFile();
+ if (rur.getNumBytes() < newlength) {
+ throw new IOException("rur.getNumBytes() < newlength = " + newlength
+ + ", rur=" + rur);
+ }
+ if (rur.getNumBytes() > newlength) {
+ rur.detachBlock(1);
+ truncateBlock(replicafile, rur.getMetaFile(), rur.getNumBytes(), newlength);
+ // update RUR with the new length
+ rur.setNumBytes(newlength);
+ }
+
+ // finalize the block
+ return finalizeReplica(rur);
+ }
}
Modified: hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java?rev=819226&r1=819225&r2=819226&view=diff
==============================================================================
--- hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java (original)
+++ hadoop/hdfs/branches/HDFS-265/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestInterDatanodeProtocol.java Sat Sep 26 22:02:10 2009
@@ -196,4 +196,61 @@
}
}
+
+ /** Test {@link FSDataset#updateReplicaUnderRecovery(ReplicaUnderRecovery, long, long)} */
+ @Test
+ public void testUpdateReplicaUnderRecovery() throws IOException {
+ final Configuration conf = new Configuration();
+ MiniDFSCluster cluster = null;
+
+ try {
+ cluster = new MiniDFSCluster(conf, 3, true, null);
+ cluster.waitActive();
+
+ //create a file
+ DistributedFileSystem dfs = (DistributedFileSystem)cluster.getFileSystem();
+ String filestr = "/foo";
+ Path filepath = new Path(filestr);
+ DFSTestUtil.createFile(dfs, filepath, 1024L, (short)3, 0L);
+
+ //get block info
+ final LocatedBlock locatedblock = getLastLocatedBlock(
+ dfs.getClient().getNamenode(), filestr);
+ final DatanodeInfo[] datanodeinfo = locatedblock.getLocations();
+ Assert.assertTrue(datanodeinfo.length > 0);
+
+ //get DataNode and FSDataset objects
+ final DataNode datanode = cluster.getDataNode(datanodeinfo[0].getIpcPort());
+ Assert.assertTrue(datanode != null);
+ Assert.assertTrue(datanode.data instanceof FSDataset);
+ final FSDataset fsdataset = (FSDataset)datanode.data;
+
+ //initReplicaRecovery
+ final Block b = locatedblock.getBlock();
+ final long recoveryid = b.getGenerationStamp() + 1;
+ final long newlength = b.getNumBytes() - 1;
+ FSDataset.initReplicaRecovery(fsdataset.volumeMap, b, recoveryid);
+
+ //check replica
+ final ReplicaInfo replica = fsdataset.getReplica(b.getBlockId());
+ Assert.assertTrue(replica instanceof ReplicaUnderRecovery);
+ final ReplicaUnderRecovery rur = (ReplicaUnderRecovery)replica;
+
+ //check meta data before update
+ FSDataset.checkReplicaFiles(rur);
+
+ //update
+ final FinalizedReplica finalized = fsdataset.updateReplicaUnderRecovery(
+ rur, recoveryid, newlength);
+
+ //check meta data after update
+ FSDataset.checkReplicaFiles(finalized);
+ Assert.assertEquals(b.getBlockId(), finalized.getBlockId());
+ Assert.assertEquals(recoveryid, finalized.getGenerationStamp());
+ Assert.assertEquals(newlength, finalized.getNumBytes());
+
+ } finally {
+ if (cluster != null) cluster.shutdown();
+ }
+ }
}