You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by su...@apache.org on 2011/09/02 21:59:52 UTC
svn commit: r1164680 - in /hadoop/common/branches/branch-0.20-security: ./
src/core/org/apache/hadoop/io/ src/hdfs/org/apache/hadoop/hdfs/
src/hdfs/org/apache/hadoop/hdfs/protocol/
src/hdfs/org/apache/hadoop/hdfs/server/datanode/ src/hdfs/org/apache/ha...
Author: suresh
Date: Fri Sep 2 19:59:52 2011
New Revision: 1164680
URL: http://svn.apache.org/viewvc?rev=1164680&view=rev
Log:
Porting from 0.20-append branch - HDFS-200. Support append and sync for hadoop 0.20 branch. Contributed by Dhruba Borthakur.
Modified:
hadoop/common/branches/branch-0.20-security/CHANGES.txt
hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/io/SequenceFile.java
hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java
hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java
hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
Modified: hadoop/common/branches/branch-0.20-security/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/CHANGES.txt?rev=1164680&r1=1164679&r2=1164680&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security/CHANGES.txt Fri Sep 2 19:59:52 2011
@@ -7,6 +7,8 @@ Release 0.20.205.0 - unreleased
HDFS-2202. Add a new DFSAdmin command to set balancer bandwidth of
datanodes without restarting. (Eric Payne via szetszwo)
+ HDFS-200. Support append and sync for hadoop 0.20 branch. (dhruba)
+
BUG FIXES
MAPREDUCE-2324. Removed usage of broken
Modified: hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/io/SequenceFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/io/SequenceFile.java?rev=1164680&r1=1164679&r2=1164680&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/io/SequenceFile.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/core/org/apache/hadoop/io/SequenceFile.java Fri Sep 2 19:59:52 2011
@@ -938,6 +938,13 @@ public class SequenceFile {
}
}
+ /**
+  * Pushes everything written so far through to the file system by calling
+  * sync() on the underlying output stream. This is a no-op when the writer
+  * has no output stream.
+  *
+  * @throws IOException if the underlying stream fails to sync
+  */
+ public void syncFs() throws IOException {
+   if (out == null) {
+     return;
+   }
+   out.sync(); // flush contents to file system
+ }
+
/** Returns the configuration of this file. */
Configuration getConf() { return conf; }
Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=1164680&r1=1164679&r2=1164680&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Fri Sep 2 19:59:52 2011
@@ -1604,7 +1604,10 @@ public class DFSClient implements FSCons
throw new FileNotFoundException("File does not exist: " + src);
}
- if (locatedBlocks != null) {
+ // I think this check is not correct. A file could have been appended to
+ // between two calls to openInfo().
+ if (locatedBlocks != null && !locatedBlocks.isUnderConstruction() &&
+ !newInfo.isUnderConstruction()) {
Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks().iterator();
Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator();
while (oldIter.hasNext() && newIter.hasNext()) {
@@ -1613,6 +1616,39 @@ public class DFSClient implements FSCons
}
}
}
+
+ // if the file is under construction, then fetch size of last block
+ // from datanode.
+ if (newInfo.isUnderConstruction() && newInfo.locatedBlockCount() > 0) {
+ LocatedBlock last = newInfo.get(newInfo.locatedBlockCount()-1);
+ boolean lastBlockInFile = (last.getStartOffset() +
+ last.getBlockSize() ==
+ newInfo.getFileLength());
+ if (lastBlockInFile && last.getLocations().length > 0) {
+ ClientDatanodeProtocol primary = null;
+ DatanodeInfo primaryNode = last.getLocations()[0];
+ try {
+ primary = createClientDatanodeProtocolProxy(primaryNode, conf,
+ last.getBlock(), last.getBlockToken(), socketTimeout);
+ Block newBlock = primary.getBlockInfo(last.getBlock());
+ long newBlockSize = newBlock.getNumBytes();
+ long delta = newBlockSize - last.getBlockSize();
+ // if the size of the block on the datanode is different
+ // from what the NN knows about, the datanode wins!
+ last.getBlock().setNumBytes(newBlockSize);
+ long newlength = newInfo.getFileLength() + delta;
+ newInfo.setFileLength(newlength);
+ LOG.debug("DFSClient setting last block " + last +
+ " to length " + newBlockSize +
+ " filesize is now " + newInfo.getFileLength());
+ } catch (IOException e) {
+ LOG.debug("DFSClient file " + src +
+ " is being concurrently append to" +
+ " but datanode " + primaryNode.getHostName() +
+ " probably does not have block " + last.getBlock());
+ }
+ }
+ }
this.locatedBlocks = newInfo;
this.currentNode = null;
}
Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java?rev=1164680&r1=1164679&r2=1164680&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java Fri Sep 2 19:59:52 2011
@@ -32,9 +32,9 @@ public interface ClientDatanodeProtocol
public static final Log LOG = LogFactory.getLog(ClientDatanodeProtocol.class);
/**
- * 4: never return null and always return a newly generated access token
+ * 5: added getBlockInfo
*/
- public static final long versionID = 4L;
+ public static final long versionID = 5L;
/** Start generation-stamp recovery for specified block
* @param block the specified block
@@ -47,4 +47,12 @@ public interface ClientDatanodeProtocol
*/
LocatedBlock recoverBlock(Block block, boolean keepLength,
DatanodeInfo[] targets) throws IOException;
+
+ /** Looks up the specified block on the datanode serving this call and
+ * returns the datanode's own record of it. For a block that is still being
+ * written, the returned length may differ from what the namenode has
+ * recorded.
+ * @param block the specified block
+ * @return the Block object as stored on the specified Datanode
+ * @throws IOException if the block does not exist
+ */
+ Block getBlockInfo(Block block) throws IOException;
}
Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java?rev=1164680&r1=1164679&r2=1164680&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/LocatedBlocks.java Fri Sep 2 19:59:52 2011
@@ -85,6 +85,13 @@ public class LocatedBlocks implements Wr
public boolean isUnderConstruction() {
return underConstruction;
}
+
+ /**
+  * Overrides the recorded length of the file. For example, a client may call
+  * this after learning the actual size of an under-construction last block
+  * from a datanode.
+  *
+  * @param length the new file length in bytes
+  */
+ public void setFileLength(long length) {
+   fileLength = length;
+ }
/**
* Find block containing specified offset.
Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1164680&r1=1164679&r2=1164680&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Fri Sep 2 19:59:52 2011
@@ -97,7 +97,7 @@ class BlockReceiver implements java.io.C
// Open local disk out
//
streams = datanode.data.writeToBlock(block, isRecovery);
- this.finalized = datanode.data.isValidBlock(block);
+ this.finalized = false;
if (streams != null) {
this.out = streams.dataOut;
this.checksumOut = new DataOutputStream(new BufferedOutputStream(
Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1164680&r1=1164679&r2=1164680&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/DataNode.java Fri Sep 2 19:59:52 2011
@@ -1841,6 +1841,12 @@ public class DataNode extends Configured
return recoverBlock(block, keepLength, targets, false);
}
+ /**
+  * Returns this datanode's stored record of the given block, looked up by
+  * block id. Clients use it to learn the current length of a replica whose
+  * file is still under construction.
+  *
+  * @param block the block to look up; only its id is used
+  * @return the stored block, never null
+  * @throws IOException if this datanode holds no replica with that block id
+  */
+ public Block getBlockInfo(Block block) throws IOException {
+   Block stored = data.getStoredBlock(block.getBlockId());
+   if (stored == null) {
+     // Honor the ClientDatanodeProtocol contract ("@throws IOException if
+     // the block does not exist") instead of returning null: the client
+     // dereferences the result and only catches IOException around the call.
+     throw new IOException("Block " + block + " not found on datanode");
+   }
+   // NOTE(review): no block-token check is done here; confirm whether the
+   // security branch requires one for this RPC.
+   return stored;
+ }
+
private static void logRecoverBlock(String who,
Block block, DatanodeID[] targets) {
StringBuilder msg = new StringBuilder(targets[0].getName());
Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=1164680&r1=1164679&r2=1164680&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Fri Sep 2 19:59:52 2011
@@ -1136,12 +1136,12 @@ public class FSDataset implements FSCons
v = volumes.getNextVolume(blockSize);
// create temporary file to hold block in the designated volume
f = createTmpFile(v, b);
- volumeMap.put(b, new DatanodeBlockInfo(v));
+ volumeMap.put(b, new DatanodeBlockInfo(v, f));
} else if (f != null) {
DataNode.LOG.info("Reopen already-open Block for append " + b);
// create or reuse temporary file to hold block in the designated volume
v = volumeMap.get(b).getVolume();
- volumeMap.put(b, new DatanodeBlockInfo(v));
+ volumeMap.put(b, new DatanodeBlockInfo(v, f));
} else {
// reopening block for appending to it.
DataNode.LOG.info("Reopen Block for append " + b);
@@ -1172,7 +1172,7 @@ public class FSDataset implements FSCons
" to tmp dir " + f);
}
}
- volumeMap.put(b, new DatanodeBlockInfo(v));
+ volumeMap.put(b, new DatanodeBlockInfo(v, f));
}
if (f == null) {
DataNode.LOG.warn("Block " + b + " reopen failed " +
Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java?rev=1164680&r1=1164679&r2=1164680&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/namenode/DatanodeDescriptor.java Fri Sep 2 19:59:52 2011
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdfs.protocol.B
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.namenode.BlocksMap.BlockInfo;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
@@ -391,11 +392,28 @@ public class DatanodeDescriptor extends
// Note we are taking special precaution to limit tmp blocks allocated
// as part this block report - which why block list is stored as longs
Block iblk = new Block(); // a fixed new'ed block to be reused with index i
+ Block oblk = new Block(); // for fixing genstamps
for (int i = 0; i < newReport.getNumberOfBlocks(); ++i) {
iblk.set(newReport.getBlockId(i), newReport.getBlockLen(i),
newReport.getBlockGenStamp(i));
BlockInfo storedBlock = blocksMap.getStoredBlock(iblk);
if(storedBlock == null) {
+ // if the block with a WILDCARD generation stamp matches
+ // then accept this block.
+ // This block has a diferent generation stamp on the datanode
+ // because of a lease-recovery-attempt.
+ oblk.set(newReport.getBlockId(i), newReport.getBlockLen(i),
+ GenerationStamp.WILDCARD_STAMP);
+ storedBlock = blocksMap.getStoredBlock(oblk);
+ if (storedBlock != null && storedBlock.getINode() != null &&
+ (storedBlock.getGenerationStamp() <= iblk.getGenerationStamp() ||
+ storedBlock.getINode().isUnderConstruction())) {
+ // accept block. It wil be cleaned up on cluster restart.
+ } else {
+ storedBlock = null;
+ }
+ }
+ if(storedBlock == null) {
// If block is not in blocksMap it does not belong to any file
toInvalidate.add(new Block(iblk));
continue;
Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1164680&r1=1164679&r2=1164680&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Sep 2 19:59:52 2011
@@ -1992,11 +1992,30 @@ public class FSNamesystem implements FSC
}
/**
+ * Invoked when a lease expires: every file written under that lease is
+ * recovered by handing it to internalReleaseLeaseOne. When lease.hasPath()
+ * is false, only src is recovered.
+ *
+ * @param lease the expired lease
+ * @param src the path to recover when the lease records no paths
+ * @throws IOException if recovering any of the files fails
+ */
+ void internalReleaseLease(Lease lease, String src) throws IOException {
+   if (!lease.hasPath()) {
+     internalReleaseLeaseOne(lease, src);
+     return;
+   }
+   // internalReleaseLeaseOne removes pathnames from the lease record, so
+   // walk a copy of the paths rather than the live collection.
+   String[] snapshot = new String[lease.getPaths().size()];
+   lease.getPaths().toArray(snapshot);
+   for (int i = 0; i < snapshot.length; i++) {
+     internalReleaseLeaseOne(lease, snapshot[i]);
+   }
+ }
+
+ /**
* Move a file that is being written to be immutable.
* @param src The filename
* @param lease The lease for the client creating the file
*/
- void internalReleaseLease(Lease lease, String src) throws IOException {
+ void internalReleaseLeaseOne(Lease lease, String src) throws IOException {
LOG.info("Recovering lease=" + lease + ", src=" + src);
INodeFile iFile = dir.getFileINode(src);
@@ -2104,20 +2123,11 @@ public class FSNamesystem implements FSC
descriptors = new DatanodeDescriptor[newtargets.length];
for(int i = 0; i < newtargets.length; i++) {
descriptors[i] = getDatanode(newtargets[i]);
- }
- }
- if (closeFile) {
- // the file is getting closed. Insert block locations into blocksMap.
- // Otherwise fsck will report these blocks as MISSING, especially if the
- // blocksReceived from Datanodes take a long time to arrive.
- for (int i = 0; i < descriptors.length; i++) {
descriptors[i].addBlock(newblockinfo);
}
- pendingFile.setLastBlock(newblockinfo, null);
- } else {
- // add locations into the INodeUnderConstruction
- pendingFile.setLastBlock(newblockinfo, descriptors);
}
+ // add locations into the INodeUnderConstruction
+ pendingFile.setLastBlock(newblockinfo, descriptors);
}
// If this commit does not want to close the file, persist
@@ -2546,9 +2556,11 @@ public class FSNamesystem implements FSC
LOG.warn("ReplicationMonitor thread received InterruptedException." + ie);
break;
} catch (IOException ie) {
- LOG.warn("ReplicationMonitor thread received exception. " + ie);
+ LOG.warn("ReplicationMonitor thread received exception. " + ie + " " +
+ StringUtils.stringifyException(ie));
} catch (Throwable t) {
- LOG.warn("ReplicationMonitor thread received Runtime exception. " + t);
+ LOG.warn("ReplicationMonitor thread received Runtime exception. " + t + " " +
+ StringUtils.stringifyException(t));
Runtime.getRuntime().exit(-1);
}
}
@@ -3231,6 +3243,24 @@ public class FSNamesystem implements FSC
DatanodeDescriptor node,
DatanodeDescriptor delNodeHint) {
BlockInfo storedBlock = blocksMap.getStoredBlock(block);
+ if (storedBlock == null) {
+ // if the block with a WILDCARD generation stamp matches and the
+ // corresponding file is under construction, then accept this block.
+ // This block has a diferent generation stamp on the datanode
+ // because of a lease-recovery-attempt.
+ Block nblk = new Block(block.getBlockId());
+ storedBlock = blocksMap.getStoredBlock(nblk);
+ if (storedBlock != null && storedBlock.getINode() != null &&
+ (storedBlock.getGenerationStamp() <= block.getGenerationStamp() ||
+ storedBlock.getINode().isUnderConstruction())) {
+ NameNode.stateChangeLog.info("BLOCK* NameSystem.addStoredBlock: "
+ + "addStoredBlock request received for " + block + " on "
+ + node.getName() + " size " + block.getNumBytes()
+ + " and it belongs to a file under construction. ");
+ } else {
+ storedBlock = null;
+ }
+ }
if(storedBlock == null || storedBlock.getINode() == null) {
// If this block does not belong to anyfile, then we are done.
NameNode.stateChangeLog.info("BLOCK* NameSystem.addStoredBlock: "
@@ -3251,6 +3281,8 @@ public class FSNamesystem implements FSC
if (block != storedBlock) {
if (block.getNumBytes() >= 0) {
long cursize = storedBlock.getNumBytes();
+ INodeFile file = (storedBlock != null) ? storedBlock.getINode() : null;
+ boolean underConstruction = (file == null ? false : file.isUnderConstruction());
if (cursize == 0) {
storedBlock.setNumBytes(block.getNumBytes());
} else if (cursize != block.getNumBytes()) {
@@ -3262,9 +3294,11 @@ public class FSNamesystem implements FSC
if (cursize > block.getNumBytes()) {
// new replica is smaller in size than existing block.
// Mark the new replica as corrupt.
- LOG.warn("Mark new replica " + block + " from " + node.getName() +
- "as corrupt because its length is shorter than existing ones");
- markBlockAsCorrupt(block, node);
+ if (!underConstruction) {
+ LOG.warn("Mark new replica " + block + " from " + node.getName() +
+ "as corrupt because its length is shorter than existing ones");
+ markBlockAsCorrupt(block, node);
+ }
} else {
// new replica is larger in size than existing block.
// Mark pre-existing replicas as corrupt.
@@ -3278,7 +3312,7 @@ public class FSNamesystem implements FSC
nodes[count++] = dd;
}
}
- for (int j = 0; j < count; j++) {
+ for (int j = 0; j < count && !underConstruction; j++) {
LOG.warn("Mark existing replica " + block + " from " + node.getName() +
" as corrupt because its length is shorter than the new one");
markBlockAsCorrupt(block, nodes[j]);
@@ -3301,7 +3335,6 @@ public class FSNamesystem implements FSC
}
//Updated space consumed if required.
- INodeFile file = (storedBlock != null) ? storedBlock.getINode() : null;
long diff = (file == null) ? 0 :
(file.getPreferredBlockSize() - storedBlock.getNumBytes());