Posted to common-commits@hadoop.apache.org by sz...@apache.org on 2008/10/17 22:30:44 UTC
svn commit: r705744 - in /hadoop/core/trunk: ./
src/hdfs/org/apache/hadoop/hdfs/ src/hdfs/org/apache/hadoop/hdfs/protocol/
src/hdfs/org/apache/hadoop/hdfs/server/datanode/
src/test/org/apache/hadoop/hdfs/
Author: szetszwo
Date: Fri Oct 17 13:30:43 2008
New Revision: 705744
URL: http://svn.apache.org/viewvc?rev=705744&view=rev
Log:
HADOOP-4423. Keep block length when the block recovery is triggered by append. (szetszwo)
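In short: recovery triggered by append now passes keepLength=true down
ClientDatanodeProtocol, so the datanodes keep the block length the client
holds instead of shrinking the block to the minimum replica length. A
minimal sketch of the new length decision (a hypothetical helper for
illustration, not part of the patch):

    // keepLength=true (append-triggered recovery): trust the client's length.
    // keepLength=false (namenode-driven recovery): fall back to the minimum
    // length reported by the participating replicas.
    static long recoveredLength(long clientLength, long minReplicaLength,
        boolean keepLength) {
      return keepLength ? clientLength : minReplicaLength;
    }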
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestFileAppend3.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=705744&r1=705743&r2=705744&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Oct 17 13:30:43 2008
@@ -937,9 +937,6 @@
HADOOP-4426. TestCapacityScheduler broke due to the two commits HADOOP-4053
and HADOOP-4373. This patch fixes that. (Hemanth Yamijala via ddas)
- HADOOP-4278. Increase debug logging for unit test TestDatanodeDeath.
- (dhruba)
-
HADOOP-4418. Updates documentation in forrest for Mapred, streaming and pipes.
(Amareshwari Sriramadasu via ddas)
@@ -953,8 +950,11 @@
HADOOP-4427. Adds the new queue/job commands to the manual.
(Sreekanth Ramakrishnan via ddas)
- HADOOP-4278. If the primary datanode fails in DFSClient, remove it from
- the pipeline. (dhruba via szetszwo)
+ HADOOP-4278. Increase debug logging for unit test TestDatanodeDeath.
+ Fix the case when primary is dead. (dhruba via szetszwo)
+
+ HADOOP-4423. Keep block length when the block recovery is triggered by
+ append. (szetszwo)
Release 0.18.2 - Unreleased
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=705744&r1=705743&r2=705744&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Fri Oct 17 13:30:43 2008
@@ -2146,7 +2146,7 @@
synchronized (dataQueue) {
// process IO errors if any
- boolean doSleep = processDatanodeError(hasError);
+ boolean doSleep = processDatanodeError(hasError, false);
// wait for a packet to be sent.
while ((!closed && !hasError && clientRunning
@@ -2371,7 +2371,7 @@
// threads and mark stream as closed. Returns true if we should
// sleep for a while after returning from this call.
//
- private boolean processDatanodeError(boolean hasError) {
+ private boolean processDatanodeError(boolean hasError, boolean isAppend) {
if (!hasError) {
return false;
}
@@ -2453,7 +2453,7 @@
// Pick the "least" datanode as the primary datanode to avoid deadlock.
primaryNode = Collections.min(Arrays.asList(newnodes));
primary = createClientDatanodeProtocolProxy(primaryNode, conf);
- newBlock = primary.recoverBlock(block, newnodes);
+ newBlock = primary.recoverBlock(block, isAppend, newnodes);
} catch (IOException e) {
recoveryErrorCount++;
if (recoveryErrorCount > maxRecoveryErrorCount) {
@@ -2640,7 +2640,7 @@
"of file " + src);
}
- processDatanodeError(true);
+ processDatanodeError(true, true);
streamer.start();
}
else {
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java?rev=705744&r1=705743&r2=705744&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java Fri Oct 17 13:30:43 2008
@@ -29,17 +29,19 @@
public static final Log LOG = LogFactory.getLog(ClientDatanodeProtocol.class);
/**
- * 2: recoverBlock returns the datanodes on which recovery was successful.
+ * 3: add keepLength parameter.
*/
- public static final long versionID = 2L;
+ public static final long versionID = 3L;
/** Start generation-stamp recovery for specified block
* @param block the specified block
+ * @param keepLength keep the block length
* @param targets the list of possible locations of specified block
* @return the new block if recovery was successful and the generation stamp
* was updated as part of the recovery; returns null if the block does
* not have any data and was deleted.
* @throws IOException
*/
- LocatedBlock recoverBlock(Block block, DatanodeInfo[] targets) throws IOException;
+ LocatedBlock recoverBlock(Block block, boolean keepLength,
+ DatanodeInfo[] targets) throws IOException;
}
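For illustration, a sketch of how a client invokes the version-3 method
(mirroring the DFSClient change above; the proxy construction is simplified
and assumes DFSClient's createClientDatanodeProtocolProxy helper):

    ClientDatanodeProtocol primary =
        DFSClient.createClientDatanodeProtocolProxy(primaryNode, conf);
    // Pass keepLength=true only when recovery is triggered by append,
    // so the datanodes preserve the block length the client holds.
    LocatedBlock recovered = primary.recoverBlock(block, true, targets);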
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=705744&r1=705743&r2=705744&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java Fri Oct 17 13:30:43 2008
@@ -1323,7 +1323,7 @@
for(int i = 0; i < blocks.length; i++) {
try {
logRecoverBlock("NameNode", blocks[i], targets[i]);
- recoverBlock(blocks[i], targets[i], true);
+ recoverBlock(blocks[i], false, targets[i], true);
} catch (IOException e) {
LOG.warn("recoverBlocks FAILED, blocks[" + i + "]=" + blocks[i], e);
}
@@ -1336,8 +1336,9 @@
/** {@inheritDoc} */
public void updateBlock(Block oldblock, Block newblock, boolean finalize) throws IOException {
- LOG.info("oldblock=" + oldblock + ", newblock=" + newblock
- + ", datanode=" + dnRegistration.getName());
+ LOG.info("oldblock=" + oldblock + "(length=" + oldblock.getNumBytes()
+ + "), newblock=" + newblock + "(length=" + newblock.getNumBytes()
+ + "), datanode=" + dnRegistration.getName());
data.updateBlock(oldblock, newblock);
if (finalize) {
data.finalizeBlock(newblock);
@@ -1380,8 +1381,8 @@
}
/** Recover a block */
- private LocatedBlock recoverBlock(Block block, DatanodeID[] datanodeids,
- boolean closeFile) throws IOException {
+ private LocatedBlock recoverBlock(Block block, boolean keepLength,
+ DatanodeID[] datanodeids, boolean closeFile) throws IOException {
// If the block is already being recovered, then skip recovering it.
// This can happen if the namenode and client start recovering the same
@@ -1409,9 +1410,16 @@
this: DataNode.createInterDataNodeProtocolProxy(id, getConf());
BlockMetaDataInfo info = datanode.getBlockMetaDataInfo(block);
if (info != null && info.getGenerationStamp() >= block.getGenerationStamp()) {
- syncList.add(new BlockRecord(id, datanode, new Block(info)));
- if (info.getNumBytes() < minlength) {
- minlength = info.getNumBytes();
+ if (keepLength) {
+ if (info.getNumBytes() == block.getNumBytes()) {
+ syncList.add(new BlockRecord(id, datanode, new Block(info)));
+ }
+ }
+ else {
+ syncList.add(new BlockRecord(id, datanode, new Block(info)));
+ if (info.getNumBytes() < minlength) {
+ minlength = info.getNumBytes();
+ }
}
}
} catch (IOException e) {
@@ -1426,7 +1434,10 @@
throw new IOException("All datanodes failed: block=" + block
+ ", datanodeids=" + Arrays.asList(datanodeids));
}
- return syncBlock(block, minlength, syncList, closeFile);
+ if (!keepLength) {
+ block.setNumBytes(minlength);
+ }
+ return syncBlock(block, syncList, closeFile);
} finally {
synchronized (ongoingRecovery) {
ongoingRecovery.remove(block);
@@ -1435,11 +1446,11 @@
}
/** Block synchronization */
- private LocatedBlock syncBlock(Block block, long minlength,
- List<BlockRecord> syncList, boolean closeFile) throws IOException {
+ private LocatedBlock syncBlock(Block block, List<BlockRecord> syncList,
+ boolean closeFile) throws IOException {
if (LOG.isDebugEnabled()) {
- LOG.debug("block=" + block + ", minlength=" + minlength
- + ", syncList=" + syncList + ", closeFile=" + closeFile);
+ LOG.debug("block=" + block + ", (length=" + block.getNumBytes()
+ + "), syncList=" + syncList + ", closeFile=" + closeFile);
}
//syncList.isEmpty() means that all datanodes do not have the block
@@ -1453,7 +1464,7 @@
List<DatanodeID> successList = new ArrayList<DatanodeID>();
long generationstamp = namenode.nextGenerationStamp(block);
- Block newblock = new Block(block.getBlockId(), minlength, generationstamp);
+ Block newblock = new Block(block.getBlockId(), block.getNumBytes(), generationstamp);
for(BlockRecord r : syncList) {
try {
@@ -1489,10 +1500,10 @@
// ClientDataNodeProtocol implementation
/** {@inheritDoc} */
- public LocatedBlock recoverBlock(Block block, DatanodeInfo[] targets
+ public LocatedBlock recoverBlock(Block block, boolean keepLength, DatanodeInfo[] targets
) throws IOException {
logRecoverBlock("Client", block, targets);
- return recoverBlock(block, targets, false);
+ return recoverBlock(block, keepLength, targets, false);
}
private static void logRecoverBlock(String who,
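Restated outside the diff, the replica-selection rule above reduces to the
following sketch (simplified from the recoverBlock() hunk; BlockRecord
construction and error handling are omitted, and the helper is hypothetical):

    // Returns the replica lengths that take part in recovery; applies the
    // legacy shrink-to-minimum only when keepLength is false.
    static List<Long> selectRecoveryLengths(Block block,
        List<Long> replicaLengths, boolean keepLength) {
      List<Long> syncList = new ArrayList<Long>();
      long minlength = Long.MAX_VALUE;
      for (long n : replicaLengths) {
        if (keepLength) {
          if (n == block.getNumBytes()) {
            syncList.add(n);              // keep only exact-length replicas
          }
        } else {
          syncList.add(n);
          minlength = Math.min(minlength, n);
        }
      }
      if (!keepLength && !syncList.isEmpty()) {
        block.setNumBytes(minlength);     // legacy path: shrink to minimum
      }
      return syncList;
    }

For example, with replicas of lengths {512, 0} and an expected length of 512,
keepLength=true recovers from the single intact 512-byte replica, while the
old behavior would have truncated the block to 0 bytes; this is exactly the
situation exercised by the new TC7 test below.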
Modified: hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=705744&r1=705743&r2=705744&view=diff
==============================================================================
--- hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/core/trunk/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Fri Oct 17 13:30:43 2008
@@ -1174,9 +1174,7 @@
f = tmp;
}
if (f == null) {
- throw new IOException("Block " + b +
- " block file " + f +
- " does not exist on disk.");
+ throw new IOException("Block " + b + " does not exist on disk.");
}
if (!f.exists()) {
throw new IOException("Block " + b +
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestFileAppend3.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestFileAppend3.java?rev=705744&r1=705743&r2=705744&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestFileAppend3.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/hdfs/TestFileAppend3.java Fri Oct 17 13:30:43 2008
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs;
import java.io.IOException;
+import java.io.RandomAccessFile;
import junit.extensions.TestSetup;
import junit.framework.Test;
@@ -31,6 +32,7 @@
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.FSDataset;
import org.apache.hadoop.hdfs.server.protocol.BlockMetaDataInfo;
/** This class implements some of the tests posted in HADOOP-2658. */
@@ -139,6 +141,51 @@
out.close();
}
+ /** TC7: Corrupted replicas are present. */
+ public void testTC7() throws Exception {
+ final short repl = 2;
+ final Path p = new Path("/TC7/foo");
+ System.out.println("p=" + p);
+
+ //a. Create file with replication factor of 2. Write half block of data. Close file.
+ final int len1 = (int)(BLOCK_SIZE/2);
+ {
+ FSDataOutputStream out = fs.create(p, false, buffersize, repl, BLOCK_SIZE);
+ AppendTestUtil.write(out, 0, len1);
+ out.close();
+ }
+ DFSTestUtil.waitReplication(fs, p, repl);
+
+ //b. Log into one datanode that has one replica of this block.
+ // Find the block file on this datanode and truncate it to zero size.
+ final LocatedBlocks locatedblocks = fs.dfs.namenode.getBlockLocations(p.toString(), 0L, len1);
+ assertEquals(1, locatedblocks.locatedBlockCount());
+ final LocatedBlock lb = locatedblocks.get(0);
+ final Block blk = lb.getBlock();
+ assertEquals(len1, lb.getBlockSize());
+
+ DatanodeInfo[] datanodeinfos = lb.getLocations();
+ assertEquals(repl, datanodeinfos.length);
+ final DataNode dn = cluster.getDataNode(datanodeinfos[0].getIpcPort());
+ final FSDataset data = (FSDataset)dn.getFSDataset();
+ final RandomAccessFile raf = new RandomAccessFile(data.getBlockFile(blk), "rw");
+ AppendTestUtil.LOG.info("dn=" + dn + ", blk=" + blk + " (length=" + blk.getNumBytes() + ")");
+ assertEquals(len1, raf.length());
+ raf.setLength(0);
+ raf.close();
+
+ //c. Open file in "append mode". Append a new block worth of data. Close file.
+ final int len2 = (int)BLOCK_SIZE;
+ {
+ FSDataOutputStream out = fs.append(p);
+ AppendTestUtil.write(out, len1, len2);
+ out.close();
+ }
+
+ //d. Reopen file and read two blocks worth of data.
+ AppendTestUtil.check(fs, p, len1 + len2);
+ }
+
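Note on TC7: step (b) truncates one replica to zero length, so under the new
keepLength behavior the append-triggered recovery in step (c) ignores that
replica (its length no longer matches len1) and proceeds with the intact one;
previously the recovery could have shrunk the block to the minimum replica
length, losing the first half block of data.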
/** TC11: Racing rename */
public void testTC11() throws Exception {
final Path p = new Path("/TC11/foo");