Posted to common-commits@hadoop.apache.org by su...@apache.org on 2011/09/10 02:06:42 UTC
svn commit: r1167409 - in /hadoop/common/branches/branch-0.20-security: ./
src/hdfs/org/apache/hadoop/hdfs/ src/hdfs/org/apache/hadoop/hdfs/protocol/
src/hdfs/org/apache/hadoop/hdfs/server/datanode/
src/hdfs/org/apache/hadoop/hdfs/server/namenode/ src/...
Author: suresh
Date: Sat Sep 10 00:06:41 2011
New Revision: 1167409
URL: http://svn.apache.org/viewvc?rev=1167409&view=rev
Log:
HDFS-1218. Blocks recovered on startup should be treated with lower priority during block synchronization. Contributed by Todd Lipcon.
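The core idea: replicas that were still being written on live datanodes ("rbw") are preferred over replicas that only reappeared after a datanode restart ("rwr"), since the latter may have been silently truncated; rwr replicas are consulted only when no rbw replica exists. A minimal standalone sketch of that selection rule, using simplified hypothetical types rather than the classes touched by this patch:

import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for per-replica recovery info.
class ReplicaInfo {
  final long numBytes;
  final boolean recoveredOnStartup;   // true for "rwr" replicas

  ReplicaInfo(long numBytes, boolean recoveredOnStartup) {
    this.numBytes = numBytes;
    this.recoveredOnStartup = recoveredOnStartup;
  }
}

class RecoverySelection {
  // Startup-recovered (possibly truncated) replicas take part in block
  // synchronization only when no live replica exists; otherwise they are
  // skipped so they cannot drag the recovered block length down.
  static List<ReplicaInfo> selectForSync(List<ReplicaInfo> candidates) {
    boolean haveLiveReplica = false;
    for (ReplicaInfo r : candidates) {
      if (!r.recoveredOnStartup) {
        haveLiveReplica = true;
        break;
      }
    }
    List<ReplicaInfo> syncList = new ArrayList<ReplicaInfo>();
    for (ReplicaInfo r : candidates) {
      if (haveLiveReplica && r.recoveredOnStartup) {
        continue;   // lower priority replica; better ones are available
      }
      syncList.add(r);
    }
    return syncList;
  }
}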
Added:
hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/protocol/BlockRecoveryInfo.java
Modified:
hadoop/common/branches/branch-0.20-security/CHANGES.txt
hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/DatanodeID.java
hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/protocol/InterDatanodeProtocol.java
hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestFileAppend4.java
hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery.java
hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/server/datanode/FSDatasetTestUtil.java
hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
Modified: hadoop/common/branches/branch-0.20-security/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/CHANGES.txt?rev=1167409&r1=1167408&r2=1167409&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security/CHANGES.txt Sat Sep 10 00:06:41 2011
@@ -116,6 +116,9 @@ Release 0.20.205.0 - unreleased
HDFS-1197. Blocks are considered "complete" prematurely after
commitBlockSynchronization or DN restart. (Todd Lipcon via jitendra)
+ HDFS-1218. Blocks recovered on startup should be treated with lower
+ priority during block synchronization. (Todd Lipcon via suresh)
+
IMPROVEMENTS
MAPREDUCE-2187. Reporter sends progress during sort/merge. (Anupam Seth via
Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=1167409&r1=1167408&r2=1167409&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Sat Sep 10 00:06:41 2011
@@ -2748,6 +2748,8 @@ public class DFSClient implements FSCons
// This is used by unit test to trigger race conditions.
if (artificialSlowdown != 0 && clientRunning) {
+ LOG.debug("Sleeping for artificial slowdown of " +
+ artificialSlowdown + "ms");
try {
Thread.sleep(artificialSlowdown);
} catch (InterruptedException e) {}
@@ -3641,6 +3643,16 @@ public class DFSClient implements FSCons
s = null;
}
}
+
+ /**
+ * Harsh abort method that should only be used from tests - it
+ * prevents pipeline recovery when, e.g., a DN shuts down.
+ */
+ void abortForTests() throws IOException {
+ streamer.close();
+ response.close();
+ closed = true;
+ }
// shutdown datastreamer and responseprocessor threads.
private void closeThreads() throws IOException {
Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/DatanodeID.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/DatanodeID.java?rev=1167409&r1=1167408&r2=1167409&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/DatanodeID.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/DatanodeID.java Sat Sep 10 00:06:41 2011
@@ -153,6 +153,7 @@ public class DatanodeID implements Writa
public void updateRegInfo(DatanodeID nodeReg) {
name = nodeReg.getName();
infoPort = nodeReg.getInfoPort();
+ ipcPort = nodeReg.getIpcPort();
// update any more fields added in future.
}
Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1167409&r1=1167408&r2=1167409&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java Sat Sep 10 00:06:41 2011
@@ -82,6 +82,7 @@ import org.apache.hadoop.hdfs.server.nam
import org.apache.hadoop.hdfs.server.namenode.StreamFile;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockMetaDataInfo;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@@ -1638,6 +1639,7 @@ public class DataNode extends Configured
if (LOG.isDebugEnabled()) {
LOG.debug("block=" + block);
}
+
Block stored = data.getStoredBlock(block.getBlockId());
if (stored == null) {
@@ -1656,6 +1658,11 @@ public class DataNode extends Configured
data.validateBlockMetadata(stored);
return info;
}
+
+ @Override
+ public BlockRecoveryInfo getBlockRecoveryInfo(Block block) throws IOException {
+ return data.getBlockRecoveryInfo(block.getBlockId());
+ }
public Daemon recoverBlocks(final Block[] blocks, final DatanodeInfo[][] targets) {
Daemon d = new Daemon(threadGroup, new Runnable() {
@@ -1707,21 +1714,26 @@ public class DataNode extends Configured
private static class BlockRecord {
final DatanodeID id;
final InterDatanodeProtocol datanode;
- final Block block;
+ final BlockRecoveryInfo info;
- BlockRecord(DatanodeID id, InterDatanodeProtocol datanode, Block block) {
+ BlockRecord(DatanodeID id, InterDatanodeProtocol datanode,
+ BlockRecoveryInfo info) {
this.id = id;
this.datanode = datanode;
- this.block = block;
+ this.info = info;
}
/** {@inheritDoc} */
public String toString() {
- return "block:" + block + " node:" + id;
+ return "BlockRecord(info=" + info + " node=" + id + ")";
}
}
- /** Recover a block */
+ /** Recover a block
+ * @param keepLength if true, will only recover replicas that have the same length
+ * as the block passed in. Otherwise, will calculate the minimum length of the
+ * replicas and truncate the rest to that length.
+ **/
private LocatedBlock recoverBlock(Block block, boolean keepLength,
DatanodeInfo[] targets, boolean closeFile) throws IOException {
@@ -1741,28 +1753,40 @@ public class DataNode extends Configured
ongoingRecovery.put(block, block);
}
try {
- List<BlockRecord> syncList = new ArrayList<BlockRecord>();
- long minlength = Long.MAX_VALUE;
int errorCount = 0;
- //check generation stamps
+ // Number of "replicasBeingWritten" in 0.21 parlance - these are replicas
+ // on DNs that are still alive from when the write was happening
+ int rbwCount = 0;
+ // Number of "replicasWaitingRecovery" in 0.21 parlance - these replicas
+ // have survived a DN restart and thus might be truncated (e.g. if the
+ // DN died because of a machine power failure and the ext3 journal,
+ // when replayed, truncated the file).
+ int rwrCount = 0;
+
+ List<BlockRecord> blockRecords = new ArrayList<BlockRecord>();
for(DatanodeID id : datanodeids) {
try {
InterDatanodeProtocol datanode = dnRegistration.equals(id)?
this: DataNode.createInterDataNodeProtocolProxy(id, getConf(), socketTimeout);
- BlockMetaDataInfo info = datanode.getBlockMetaDataInfo(block);
- if (info != null && info.getGenerationStamp() >= block.getGenerationStamp()) {
- if (keepLength) {
- if (info.getNumBytes() == block.getNumBytes()) {
- syncList.add(new BlockRecord(id, datanode, new Block(info)));
- }
- }
- else {
- syncList.add(new BlockRecord(id, datanode, new Block(info)));
- if (info.getNumBytes() < minlength) {
- minlength = info.getNumBytes();
- }
- }
+ BlockRecoveryInfo info = datanode.getBlockRecoveryInfo(block);
+ if (info == null) {
+ LOG.info("No block metadata found for block " + block + " on datanode "
+ + id);
+ continue;
+ }
+ if (info.getBlock().getGenerationStamp() < block.getGenerationStamp()) {
+ LOG.info("Only old generation stamp " + info.getBlock().getGenerationStamp()
+ + " found on datanode " + id + " (needed block=" +
+ block + ")");
+ continue;
+ }
+ blockRecords.add(new BlockRecord(id, datanode, info));
+
+ if (info.wasRecoveredOnStartup()) {
+ rwrCount++;
+ } else {
+ rbwCount++;
}
} catch (IOException e) {
++errorCount;
@@ -1772,6 +1796,34 @@ public class DataNode extends Configured
}
}
+ // If we *only* have replicas from post-DN-restart, then we should
+ // include them in determining length. Otherwise they might cause us
+ // to truncate too short.
+ boolean shouldRecoverRwrs = (rbwCount == 0);
+
+ List<BlockRecord> syncList = new ArrayList<BlockRecord>();
+ long minlength = Long.MAX_VALUE;
+
+ for (BlockRecord record : blockRecords) {
+ BlockRecoveryInfo info = record.info;
+ assert (info != null && info.getBlock().getGenerationStamp() >= block.getGenerationStamp());
+ if (!shouldRecoverRwrs && info.wasRecoveredOnStartup()) {
+ LOG.info("Not recovering replica " + record + " since it was recovered on "
+ + "startup and we have better replicas");
+ continue;
+ }
+ if (keepLength) {
+ if (info.getBlock().getNumBytes() == block.getNumBytes()) {
+ syncList.add(record);
+ }
+ } else {
+ syncList.add(record);
+ if (info.getBlock().getNumBytes() < minlength) {
+ minlength = info.getBlock().getNumBytes();
+ }
+ }
+ }
+
if (syncList.isEmpty() && errorCount > 0) {
throw new IOException("All datanodes failed: block=" + block
+ ", datanodeids=" + Arrays.asList(datanodeids));
@@ -1816,7 +1868,7 @@ public class DataNode extends Configured
for(BlockRecord r : syncList) {
try {
- r.datanode.updateBlock(r.block, newblock, closeFile);
+ r.datanode.updateBlock(r.info.getBlock(), newblock, closeFile);
successList.add(r.id);
} catch (IOException e) {
InterDatanodeProtocol.LOG.warn("Failed to updateBlock (newblock="
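To make the keepLength semantics documented on recoverBlock() above concrete, the following is a hedged, standalone sketch of how the recovery length falls out of the surviving replica lengths (plain longs here instead of Block objects; not code from this patch):

class LengthSelection {
  // With keepLength, only replicas whose length matches the client's view
  // of the block are synchronized and the length stays as-is; without it,
  // every participating replica is truncated to the shortest length seen.
  static long chooseRecoveryLength(long clientLength, long[] replicaLengths,
                                   boolean keepLength) {
    if (keepLength) {
      return clientLength;
    }
    long min = Long.MAX_VALUE;
    for (long len : replicaLengths) {
      min = Math.min(min, len);
    }
    return min;
  }

  public static void main(String[] args) {
    long[] lengths = { 2048, 1536, 2048 };
    // Without keepLength the block is synchronized to 1536 bytes.
    System.out.println(chooseRecoveryLength(2048, lengths, false));
  }
}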
Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=1167409&r1=1167408&r2=1167409&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Sat Sep 10 00:06:41 2011
@@ -46,6 +46,7 @@ import org.apache.hadoop.hdfs.protocol.B
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DiskChecker;
@@ -573,7 +574,7 @@ public class FSDataset implements FSCons
for (BlockAndFile b : blockSet) {
File f = b.pathfile; // full path name of block file
volumeMap.put(b.block, new DatanodeBlockInfo(this, f));
- ongoingCreates.put(b.block, new ActiveFile(f));
+ ongoingCreates.put(b.block, ActiveFile.createStartupRecoveryFile(f));
if (DataNode.LOG.isDebugEnabled()) {
DataNode.LOG.debug("recoverBlocksBeingWritten for block " + b.block);
}
@@ -767,19 +768,34 @@ public class FSDataset implements FSCons
final File file;
final List<Thread> threads = new ArrayList<Thread>(2);
private volatile long visibleLength;
-
+ /**
+ * Set to true if this file was recovered during datanode startup.
+ * This may indicate that the file has been truncated (e.g. during
+ * underlying filesystem journal replay).
+ */
+ final boolean wasRecoveredOnStartup;
+
ActiveFile(File f, List<Thread> list) {
- this(f);
+ this(f, false);
if (list != null) {
threads.addAll(list);
}
threads.add(Thread.currentThread());
}
- // no active threads associated with this ActiveFile
- ActiveFile(File f) {
+ /**
+ * Create an ActiveFile from a file on disk during DataNode startup.
+ * This factory method is just to make clear what the purpose
+ * of this constructor is.
+ */
+ public static ActiveFile createStartupRecoveryFile(File f) {
+ return new ActiveFile(f, true);
+ }
+
+ private ActiveFile(File f, boolean recovery) {
file = f;
visibleLength = f.length();
+ wasRecoveredOnStartup = recovery;
}
public long getVisibleLength() {
@@ -809,7 +825,7 @@ public class FSDataset implements FSCons
}
/** Find the corresponding meta data file from a given block file */
- static File findMetaFile(final File blockFile) throws IOException {
+ public static File findMetaFile(final File blockFile) throws IOException {
final String prefix = blockFile.getName() + "_";
final File parent = blockFile.getParentFile();
File[] matches = parent.listFiles(new FilenameFilter() {
@@ -1216,12 +1232,31 @@ public class FSDataset implements FSCons
+ ") to newlen (=" + newlen + ")");
}
+ if (newlen == 0) {
+ // Special case for truncating to 0 length, since there's no previous
+ // chunk.
+ RandomAccessFile blockRAF = new RandomAccessFile(blockFile, "rw");
+ try {
+ //truncate blockFile
+ blockRAF.setLength(newlen);
+ } finally {
+ blockRAF.close();
+ }
+ //update metaFile
+ RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw");
+ try {
+ metaRAF.setLength(BlockMetadataHeader.getHeaderSize());
+ } finally {
+ metaRAF.close();
+ }
+ return;
+ }
DataChecksum dcs = BlockMetadataHeader.readHeader(metaFile).getChecksum();
int checksumsize = dcs.getChecksumSize();
int bpc = dcs.getBytesPerChecksum();
- long n = (newlen - 1)/bpc + 1;
- long newmetalen = BlockMetadataHeader.getHeaderSize() + n*checksumsize;
- long lastchunkoffset = (n - 1)*bpc;
+ long newChunkCount = (newlen - 1)/bpc + 1;
+ long newmetalen = BlockMetadataHeader.getHeaderSize() + newChunkCount*checksumsize;
+ long lastchunkoffset = (newChunkCount - 1)*bpc;
int lastchunksize = (int)(newlen - lastchunkoffset);
byte[] b = new byte[Math.max(lastchunksize, checksumsize)];
@@ -1908,4 +1943,32 @@ public class FSDataset implements FSCons
return info;
}
}
+
+ @Override
+ public synchronized BlockRecoveryInfo getBlockRecoveryInfo(long blockId)
+ throws IOException {
+ Block stored = getStoredBlock(blockId);
+
+ if (stored == null) {
+ return null;
+ }
+
+ ActiveFile activeFile = ongoingCreates.get(stored);
+ boolean isRecovery = (activeFile != null) && activeFile.wasRecoveredOnStartup;
+
+
+ BlockRecoveryInfo info = new BlockRecoveryInfo(
+ stored, isRecovery);
+ if (DataNode.LOG.isDebugEnabled()) {
+ DataNode.LOG.debug("getBlockMetaDataInfo successful block=" + stored +
+ " length " + stored.getNumBytes() +
+ " genstamp " + stored.getGenerationStamp());
+ }
+
+ // paranoia! verify that the contents of the stored block
+ // matches the block file on disk.
+ validateBlockMetadata(stored);
+
+ return info;
+ }
}
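The arithmetic in the truncateBlock() hunk above (new chunk count, new metadata length, last chunk offset) is easy to sanity-check in isolation. A small sketch follows; the 7-byte metadata header and 4-byte CRC checksum size are assumed typical values, not read from this diff:

// Standalone check of the truncation arithmetic shown above.
class TruncateMath {
  static void describeTruncate(long newlen, int bytesPerChecksum,
                               int checksumSize, int headerSize) {
    if (newlen == 0) {
      // Special case from the patch: no previous chunk exists, so the block
      // file is truncated to 0 and the meta file keeps only its header.
      System.out.println("block -> 0 bytes, meta -> " + headerSize + " bytes");
      return;
    }
    long newChunkCount = (newlen - 1) / bytesPerChecksum + 1;
    long newMetaLen = headerSize + newChunkCount * checksumSize;
    long lastChunkOffset = (newChunkCount - 1) * bytesPerChecksum;
    int lastChunkSize = (int) (newlen - lastChunkOffset);
    System.out.println("chunks=" + newChunkCount + " metaLen=" + newMetaLen
        + " lastChunkOffset=" + lastChunkOffset
        + " lastChunkSize=" + lastChunkSize);
  }

  public static void main(String[] args) {
    // Truncating to 1000 bytes with 512-byte chunks: 2 chunks,
    // meta = 7 + 2*4 = 15 bytes, last chunk covers bytes 512..999.
    describeTruncate(1000, 512, 4, 7);
    describeTruncate(0, 512, 4, 7);
  }
}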
Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java?rev=1167409&r1=1167408&r2=1167409&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java Sat Sep 10 00:06:41 2011
@@ -28,6 +28,7 @@ import java.io.OutputStream;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryInfo;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
@@ -306,4 +307,6 @@ public interface FSDatasetInterface exte
* @return true if more than minimum valid volumes left in the FSDataSet
*/
public boolean hasEnoughResource();
+
+ public BlockRecoveryInfo getBlockRecoveryInfo(long blockId) throws IOException;
}
Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1167409&r1=1167408&r2=1167409&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Sat Sep 10 00:06:41 2011
@@ -2113,6 +2113,8 @@ public class FSNamesystem implements FSC
* @param lease The lease for the client creating the file
*/
void internalReleaseLeaseOne(Lease lease, String src) throws IOException {
+ assert Thread.holdsLock(this);
+
LOG.info("Recovering lease=" + lease + ", src=" + src);
INodeFile iFile = dir.getFileINode(src);
Added: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/protocol/BlockRecoveryInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/protocol/BlockRecoveryInfo.java?rev=1167409&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/protocol/BlockRecoveryInfo.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/protocol/BlockRecoveryInfo.java Sat Sep 10 00:06:41 2011
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocol;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.io.Writable;
+
+public class BlockRecoveryInfo implements Writable {
+ private Block block;
+ private boolean wasRecoveredOnStartup;
+
+ public BlockRecoveryInfo() {
+ block = new Block();
+ wasRecoveredOnStartup = false;
+ }
+
+ public BlockRecoveryInfo(Block block,
+ boolean wasRecoveredOnStartup)
+ {
+ this.block = new Block(block);
+ this.wasRecoveredOnStartup = wasRecoveredOnStartup;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ block.readFields(in);
+ wasRecoveredOnStartup = in.readBoolean();
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ block.write(out);
+ out.writeBoolean(wasRecoveredOnStartup);
+ }
+
+ public Block getBlock() {
+ return block;
+ }
+ public boolean wasRecoveredOnStartup() {
+ return wasRecoveredOnStartup;
+ }
+
+ public String toString() {
+ return "BlockRecoveryInfo(block=" + block +
+ " wasRecoveredOnStartup=" + wasRecoveredOnStartup + ")";
+ }
+}
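As a quick illustration of the Writable contract the new class satisfies, a round trip through plain java.io streams looks like the following. This is a usage sketch, not a test included in the commit, and it assumes Block has a (blockId, numBytes, generationStamp) constructor:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryInfo;

// Serialize a BlockRecoveryInfo and read it back with the no-arg
// constructor plus readFields(), as the RPC layer would.
public class BlockRecoveryInfoRoundTrip {
  public static void main(String[] args) throws Exception {
    BlockRecoveryInfo original =
        new BlockRecoveryInfo(new Block(1L, 1024L, 100L), true);

    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    original.write(new DataOutputStream(baos));

    BlockRecoveryInfo copy = new BlockRecoveryInfo();
    copy.readFields(new DataInputStream(
        new ByteArrayInputStream(baos.toByteArray())));

    System.out.println(copy); // wasRecoveredOnStartup=true
  }
}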
Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/protocol/InterDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/protocol/InterDatanodeProtocol.java?rev=1167409&r1=1167408&r2=1167409&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/protocol/InterDatanodeProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/protocol/InterDatanodeProtocol.java Sat Sep 10 00:06:41 2011
@@ -46,6 +46,12 @@ public interface InterDatanodeProtocol e
BlockMetaDataInfo getBlockMetaDataInfo(Block block) throws IOException;
/**
+ * @return the BlockRecoveryInfo for a block, or null if the block
+ * is not found
+ */
+ BlockRecoveryInfo getBlockRecoveryInfo(Block block) throws IOException;
+
+ /**
* Update the block to the new generation stamp and length.
*/
void updateBlock(Block oldblock, Block newblock, boolean finalize) throws IOException;
Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestFileAppend4.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestFileAppend4.java?rev=1167409&r1=1167408&r2=1167409&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestFileAppend4.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestFileAppend4.java Sat Sep 10 00:06:41 2011
@@ -35,16 +35,16 @@ import org.apache.commons.logging.LogFac
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSClient.DFSOutputStream;
+import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.FSConstants.SafeModeAction;
-import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset;
+import org.apache.hadoop.hdfs.server.datanode.FSDatasetTestUtil;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
-import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
@@ -55,7 +55,6 @@ import org.apache.hadoop.fs.FSDataOutput
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Level;
-import org.mockito.Matchers;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@@ -227,11 +226,9 @@ public class TestFileAppend4 extends Tes
int actual = b.getNames().length;
if ( actual < expected ) {
- if (true || iters > 0) {
- LOG.info("Not enough replicas for " + b +
- " yet. Expecting " + expected + ", got " +
- actual + ".");
- }
+ LOG.info("Not enough replicas for " + b +
+ " yet. Expecting " + expected + ", got " +
+ actual + ".");
replOk = false;
break;
}
@@ -260,8 +257,20 @@ public class TestFileAppend4 extends Tes
AppendTestUtil.check(whichfs, file1, fileSize);
}
- private void corruptDatanode(int dnNumber) throws Exception {
- // get the FS data of the 2nd datanode
+ enum CorruptionType {
+ CORRUPT_LAST_CHUNK,
+ TRUNCATE_BLOCK_TO_ZERO,
+ TRUNCATE_BLOCK_HALF;
+ }
+
+ /**
+ * Corrupt all of the blocks in the blocksBeingWritten dir
+ * for the specified datanode number. The kind of corruption applied
+ * (overwriting the last checksum chunk with random data, or truncating
+ * the block to zero or half its length) is chosen by the given type.
+ */
+ private void corruptDataNode(int dnNumber, CorruptionType type) throws Exception {
+ // get the FS data of the specified datanode
File data_dir = new File(System.getProperty("test.build.data"),
"dfs/data/data" +
Integer.toString(dnNumber*2 + 1) +
@@ -271,21 +280,38 @@ public class TestFileAppend4 extends Tes
// only touch the actual data, not the metadata (with CRC)
if (block.getName().startsWith("blk_") &&
!block.getName().endsWith("meta")) {
- RandomAccessFile file = new RandomAccessFile(block, "rw");
- FileChannel channel = file.getChannel();
-
- Random r = new Random();
- long lastBlockSize = channel.size() % 512;
- long position = channel.size() - lastBlockSize;
- int length = r.nextInt((int)(channel.size() - position + 1));
- byte[] buffer = new byte[length];
- r.nextBytes(buffer);
-
- channel.write(ByteBuffer.wrap(buffer), position);
- System.out.println("Deliberately corrupting file " + block.getName() +
- " at offset " + position +
- " length " + length);
- file.close();
+ if (type == CorruptionType.CORRUPT_LAST_CHUNK) {
+ RandomAccessFile file = new RandomAccessFile(block, "rw");
+ FileChannel channel = file.getChannel();
+ Random r = new Random();
+ long lastBlockSize = channel.size() % 512;
+ long position = channel.size() - lastBlockSize;
+ int length = r.nextInt((int)(channel.size() - position + 1));
+ byte[] buffer = new byte[length];
+ r.nextBytes(buffer);
+
+
+ channel.write(ByteBuffer.wrap(buffer), position);
+ System.out.println("Deliberately corrupting file " + block.getName() +
+ " at offset " + position +
+ " length " + length);
+ file.close();
+
+ } else if (type == CorruptionType.TRUNCATE_BLOCK_TO_ZERO) {
+ LOG.info("Truncating block file at " + block);
+ RandomAccessFile blockFile = new RandomAccessFile(block, "rw");
+ blockFile.setLength(0);
+ blockFile.close();
+
+ RandomAccessFile metaFile = new RandomAccessFile(
+ FSDataset.findMetaFile(block), "rw");
+ metaFile.setLength(0);
+ metaFile.close();
+ } else if (type == CorruptionType.TRUNCATE_BLOCK_HALF) {
+ FSDatasetTestUtil.truncateBlockFile(block, block.length() / 2);
+ } else {
+ assert false;
+ }
++corrupted;
}
}
@@ -554,7 +580,7 @@ public class TestFileAppend4 extends Tes
LOG.info("STOPPED first instance of the cluster");
// give the second datanode a bad CRC
- corruptDatanode(corruptDN);
+ corruptDataNode(corruptDN, CorruptionType.CORRUPT_LAST_CHUNK);
// restart the cluster
cluster = new MiniDFSCluster(conf, 3, false, null);
@@ -1052,6 +1078,115 @@ public class TestFileAppend4 extends Tes
}
/**
+ * Test for what happens when the machine doing the write totally
+ * loses power, and thus when it restarts, the local replica has been
+ * truncated to 0 bytes (very common with journaling filesystems)
+ */
+ public void testTruncatedPrimaryDN() throws Exception {
+ LOG.info("START");
+ runDNRestartCorruptType(CorruptionType.TRUNCATE_BLOCK_TO_ZERO);
+ }
+
+ /**
+ * Test for what happens when the machine doing the write loses power
+ * but a previous length of the block being written had made it to the
+ * journal
+ */
+ public void testHalfLengthPrimaryDN() throws Exception {
+ LOG.info("START");
+ runDNRestartCorruptType(CorruptionType.TRUNCATE_BLOCK_HALF);
+ }
+
+ private void runDNRestartCorruptType(CorruptionType corrupt) throws Exception {
+ cluster = new MiniDFSCluster(conf, 3, true, null);
+ FileSystem fs1 = cluster.getFileSystem();
+ try {
+ short rep = 3; // replication
+ assertTrue(BLOCK_SIZE%4 == 0);
+
+ file1 = new Path("/dnDeath.dat");
+
+ // write 1/2 block & close
+ stm = fs1.create(file1, true, 1024, rep, 4096);
+ AppendTestUtil.write(stm, 0, 1024);
+ stm.sync();
+ loseLeases(fs1);
+
+ DFSOutputStream dfso = (DFSOutputStream)stm.getWrappedStream();
+ dfso.abortForTests();
+
+ // close the primary DN
+ DataNodeProperties badDN = cluster.stopDataNode(0);
+
+ // Truncate the block on the primary DN
+ corruptDataNode(0, corrupt);
+
+ // Start the DN back up
+ cluster.restartDataNode(badDN);
+
+ // Recover the lease
+ FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(fs1.getConf());
+ recoverFile(fs2);
+
+ assertFileSize(fs2, 1024);
+ checkFile(fs2, 1024);
+ } finally {
+ // explicitly do not shut down fs1, since it's been frozen up by
+ // killing the DataStreamer and not allowing recovery
+ cluster.shutdown();
+ }
+ }
+
+ public void testFullClusterPowerLoss() throws Exception {
+ cluster = new MiniDFSCluster(conf, 2, true, null);
+ FileSystem fs1 = cluster.getFileSystem();
+ try {
+ short rep = 2; // replication
+ assertTrue(BLOCK_SIZE%4 == 0);
+
+ file1 = new Path("/dnDeath.dat");
+
+ // write 1/2 block & close
+ stm = fs1.create(file1, true, 1024, rep, 4096);
+ AppendTestUtil.write(stm, 0, 1024);
+ stm.sync();
+ loseLeases(fs1);
+
+ DFSOutputStream dfso = (DFSOutputStream)stm.getWrappedStream();
+ dfso.abortForTests();
+
+ // close the DNs
+ DataNodeProperties badDN = cluster.stopDataNode(0);
+ DataNodeProperties badDN2 = cluster.stopDataNode(0); // what was 1 is now 0
+ assertNotNull(badDN);
+ assertNotNull(badDN2);
+
+ // Truncate one of them as if its journal got corrupted
+ corruptDataNode(0, CorruptionType.TRUNCATE_BLOCK_HALF);
+
+ // Start the DN back up
+ cluster.restartDataNode(badDN);
+ cluster.restartDataNode(badDN2);
+
+ // Wait for a heartbeat to make sure we get the initial block
+ // report of the replicasBeingWritten
+ cluster.waitForDNHeartbeat(0, 10000);
+ cluster.waitForDNHeartbeat(1, 10000);
+
+ // Recover the lease
+ FileSystem fs2 = AppendTestUtil.createHdfsWithDifferentUsername(fs1.getConf());
+ recoverFile(fs2);
+
+ assertFileSize(fs2, 512);
+ checkFile(fs2, 512);
+ } finally {
+ // explicitly do not shut down fs1, since it's been frozen up by
+ // killing the DataStreamer and not allowing recovery
+ cluster.shutdown();
+ }
+ }
+
+ /**
* Mockito answer helper that triggers one latch as soon as the
* method is called, then waits on another before continuing.
*/
Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery.java?rev=1167409&r1=1167408&r2=1167409&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestLeaseRecovery.java Sat Sep 10 00:06:41 2011
@@ -51,12 +51,22 @@ public class TestLeaseRecovery extends j
return m;
}
+ public void testBlockSynchronization() throws Exception {
+ runTestBlockSynchronization(false);
+ }
+ public void testBlockSynchronizationWithZeroBlock() throws Exception {
+ runTestBlockSynchronization(true);
+ }
+
+
/**
* The following test first creates a file with a few blocks.
* It randomly truncates the replica of the last block stored in each datanode.
* Finally, it triggers block synchronization to synchronize all stored blocks.
+ * @param forceOneBlockToZero if true, will truncate one block to 0 length
*/
- public void testBlockSynchronization() throws Exception {
+ public void runTestBlockSynchronization(boolean forceOneBlockToZero)
+ throws Exception {
final int ORG_FILE_SIZE = 3000;
Configuration conf = new Configuration();
conf.setLong("dfs.block.size", BLOCK_SIZE);
@@ -101,6 +111,9 @@ public class TestLeaseRecovery extends j
for(int i = 0; i < REPLICATION_NUM; i++) {
newblocksizes[i] = AppendTestUtil.nextInt(lastblocksize);
}
+ if (forceOneBlockToZero) {
+ newblocksizes[0] = 0;
+ }
DataNode.LOG.info("newblocksizes = " + Arrays.asList(newblocksizes));
//update blocks with random block sizes
Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/server/datanode/FSDatasetTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/server/datanode/FSDatasetTestUtil.java?rev=1167409&r1=1167408&r2=1167409&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/server/datanode/FSDatasetTestUtil.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/server/datanode/FSDatasetTestUtil.java Sat Sep 10 00:06:41 2011
@@ -39,9 +39,16 @@ public abstract class FSDatasetTestUtil
throw new IOException("Can't find block file for block " +
block + " on DN " + dn);
}
- File metaFile = ds.findMetaFile(blockFile);
+ File metaFile = FSDataset.findMetaFile(blockFile);
FSDataset.truncateBlock(blockFile, metaFile,
block.getNumBytes(), newLength);
}
+
+ public static void truncateBlockFile(File blockFile, long newLength)
+ throws IOException {
+ File metaFile = FSDataset.findMetaFile(blockFile);
+ FSDataset.truncateBlock(blockFile, metaFile,
+ blockFile.length(), newLength);
+ }
}
\ No newline at end of file
Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=1167409&r1=1167408&r2=1167409&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Sat Sep 10 00:06:41 2011
@@ -33,6 +33,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
+import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryInfo;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
@@ -687,4 +688,11 @@ public class SimulatedFSDataset impleme
public Block[] getBlocksBeingWrittenReport() {
return null;
}
+
+ @Override
+ public BlockRecoveryInfo getBlockRecoveryInfo(long blockId)
+ throws IOException {
+ Block stored = getStoredBlock(blockId);
+ return new BlockRecoveryInfo(stored, false);
+ }
}