Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2017/05/16 18:04:50 UTC
hadoop git commit: HDFS-8498. Blocks can be committed with wrong size. Contributed by Jing Zhao. Backport HDFS-11732 by Zhe Zhang.
Repository: hadoop
Updated Branches:
refs/heads/branch-2.7 300435863 -> 0823fb72a
HDFS-8498. Blocks can be committed with wrong size. Contributed by Jing Zhao. Backport HDFS-11732 by Zhe Zhang.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0823fb72
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0823fb72
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0823fb72
Branch: refs/heads/branch-2.7
Commit: 0823fb72a9afb696c357e0b611f2e04b367b8f24
Parents: 3004358
Author: Konstantin V Shvachko <sh...@apache.org>
Authored: Tue May 16 11:00:43 2017 -0700
Committer: Konstantin V Shvachko <sh...@apache.org>
Committed: Tue May 16 11:03:12 2017 -0700
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 9 +-
.../org/apache/hadoop/hdfs/DFSOutputStream.java | 104 +++++++++++++------
.../apache/hadoop/hdfs/TestDFSOutputStream.java | 3 +-
3 files changed, 80 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0823fb72/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 16e8aca..19b2996 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -278,9 +278,9 @@ Release 2.7.4 - UNRELEASED
HDFS-11795. Fix ASF License warnings in branch-2.7.
(Yiqun Lin via aajisaka)
- HDFS-11674. reserveSpaceForReplicas is not released if append request failed
- due to mirror down and replica recovered (vinayakumarb)
-
+ HDFS-11674. reserveSpaceForReplicas is not released if append request failed
+ due to mirror down and replica recovered (vinayakumarb)
+
HDFS-10987. Make Decommission less expensive when lot of blocks present.
(Brahma Reddy Battula)
@@ -288,6 +288,9 @@ Release 2.7.4 - UNRELEASED
(Contributed By Weiwei Yang via Eric Yang)
HDFS-11784. Backport HDFS-8312 contributed by Brahma Reddy Battula.
+ HDFS-8498. Blocks can be committed with wrong size. (Jing Zhao)
+ Backport HDFS-11732 by Zhe Zhang.
+
Release 2.7.3 - 2016-08-25
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0823fb72/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 1272e29..188502f 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -293,9 +293,8 @@ public class DFSOutputStream extends FSOutputSummer
final StorageType[] targetStorageTypes,
final Token<BlockTokenIdentifier> blockToken) throws IOException {
//send the TRANSFER_BLOCK request
- new Sender(out)
- .transferBlock(block, blockToken, dfsClient.clientName, targets,
- targetStorageTypes);
+ new Sender(out).transferBlock(block.getCurrentBlock(), blockToken,
+ dfsClient.clientName, targets, targetStorageTypes);
out.flush();
//ack
BlockOpResponseProto transferResponse = BlockOpResponseProto
@@ -315,7 +314,7 @@ public class DFSOutputStream extends FSOutputSummer
}
private volatile boolean streamerClosed = false;
- private volatile ExtendedBlock block; // its length is number of bytes acked
+ private final BlockToWrite block; // its length is number of bytes acked
private Token<BlockTokenIdentifier> accessToken;
private DataOutputStream blockStream;
private DataInputStream blockReplyStream;
@@ -364,7 +363,7 @@ public class DFSOutputStream extends FSOutputSummer
private DataStreamer(HdfsFileStatus stat, ExtendedBlock block) {
isAppend = false;
isLazyPersistFile = isLazyPersist(stat);
- this.block = block;
+ this.block = new BlockToWrite(block);
stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
}
@@ -379,7 +378,7 @@ public class DFSOutputStream extends FSOutputSummer
int bytesPerChecksum) throws IOException {
isAppend = true;
stage = BlockConstructionStage.PIPELINE_SETUP_APPEND;
- block = lastBlock.getBlock();
+ block = new BlockToWrite(lastBlock.getBlock());
bytesSent = block.getNumBytes();
accessToken = lastBlock.getBlockToken();
isLazyPersistFile = isLazyPersist(stat);
@@ -1082,7 +1081,7 @@ public class DFSOutputStream extends FSOutputSummer
LocatedBlock lb;
//get a new datanode
lb = dfsClient.namenode.getAdditionalDatanode(
- src, fileId, block, nodes, storageIDs,
+ src, fileId, block.getCurrentBlock(), nodes, storageIDs,
exclude.toArray(new DatanodeInfo[exclude.size()]),
1, dfsClient.clientName);
// a new node was allocated by the namenode. Update nodes.
@@ -1260,7 +1259,8 @@ public class DFSOutputStream extends FSOutputSummer
}
// get a new generation stamp and an access token
- LocatedBlock lb = dfsClient.namenode.updateBlockForPipeline(block, dfsClient.clientName);
+ LocatedBlock lb = dfsClient.namenode.
+ updateBlockForPipeline(block.getCurrentBlock(), dfsClient.clientName);
newGS = lb.getBlock().getGenerationStamp();
accessToken = lb.getBlockToken();
@@ -1308,16 +1308,21 @@ public class DFSOutputStream extends FSOutputSummer
if (success) {
// update pipeline at the namenode
- ExtendedBlock newBlock = new ExtendedBlock(
- block.getBlockPoolId(), block.getBlockId(), block.getNumBytes(), newGS);
- dfsClient.namenode.updatePipeline(dfsClient.clientName, block, newBlock,
- nodes, storageIDs);
- // update client side generation stamp
- block = newBlock;
+ final ExtendedBlock oldBlock = block.getCurrentBlock();
+ // the new GS has been propagated to all DN, it should be ok to update the
+ // local block state
+ block.setGenerationStamp(newGS);
+ dfsClient.namenode.updatePipeline(dfsClient.clientName, oldBlock,
+ block.getCurrentBlock(), nodes, storageIDs);
}
return false; // do not sleep, continue processing
}
+ DatanodeInfo[] getExcludedNodes() {
+ return excludedNodes.getAllPresent(excludedNodes.asMap().keySet())
+ .keySet().toArray(new DatanodeInfo[0]);
+ }
+
/**
* Open a DataOutputStream to a DataNode so that it can be written to.
* This happens when a file is created and each time a new block is allocated.
@@ -1330,20 +1335,17 @@ public class DFSOutputStream extends FSOutputSummer
StorageType[] storageTypes = null;
int count = dfsClient.getConf().nBlockWriteRetry;
boolean success = false;
- ExtendedBlock oldBlock = block;
+ ExtendedBlock oldBlock = block.getCurrentBlock();
do {
hasError = false;
lastException.set(null);
errorIndex = -1;
success = false;
- DatanodeInfo[] excluded =
- excludedNodes.getAllPresent(excludedNodes.asMap().keySet())
- .keySet()
- .toArray(new DatanodeInfo[0]);
- block = oldBlock;
- lb = locateFollowingBlock(excluded.length > 0 ? excluded : null);
- block = lb.getBlock();
+ DatanodeInfo[] excluded = getExcludedNodes();
+ lb = locateFollowingBlock(
+ excluded.length > 0 ? excluded : null, oldBlock);
+ block.setCurrentBlock(lb.getBlock());
block.setNumBytes(0);
bytesSent = 0;
accessToken = lb.getBlockToken();
@@ -1357,9 +1359,9 @@ public class DFSOutputStream extends FSOutputSummer
if (!success) {
DFSClient.LOG.info("Abandoning " + block);
- dfsClient.namenode.abandonBlock(block, fileId, src,
- dfsClient.clientName);
- block = null;
+ dfsClient.namenode.abandonBlock(block.getCurrentBlock(),
+ fileId, src, dfsClient.clientName);
+ block.setCurrentBlock(null);
DFSClient.LOG.info("Excluding datanode " + nodes[errorIndex]);
excludedNodes.put(nodes[errorIndex], nodes[errorIndex]);
}
@@ -1421,7 +1423,7 @@ public class DFSOutputStream extends FSOutputSummer
// We cannot change the block length in 'block' as it counts the number
// of bytes ack'ed.
- ExtendedBlock blockCopy = new ExtendedBlock(block);
+ ExtendedBlock blockCopy = block.getCurrentBlock();
blockCopy.setNumBytes(blockSize);
boolean[] targetPinnings = getPinnings(nodes, true);
@@ -1539,7 +1541,8 @@ public class DFSOutputStream extends FSOutputSummer
}
}
- private LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes) throws IOException {
+ protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excluded,
+ ExtendedBlock oldBlock) throws IOException {
int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry;
long sleeptime = 400;
while (true) {
@@ -1547,7 +1550,7 @@ public class DFSOutputStream extends FSOutputSummer
while (true) {
try {
return dfsClient.namenode.addBlock(src, dfsClient.clientName,
- block, excludedNodes, fileId, favoredNodes);
+ oldBlock, excluded, fileId, favoredNodes);
} catch (RemoteException e) {
IOException ue =
e.unwrapRemoteException(FileNotFoundException.class,
@@ -1591,7 +1594,7 @@ public class DFSOutputStream extends FSOutputSummer
}
ExtendedBlock getBlock() {
- return block;
+ return block.getCurrentBlock();
}
DatanodeInfo[] getNodes() {
@@ -1607,6 +1610,42 @@ public class DFSOutputStream extends FSOutputSummer
}
}
+ static class BlockToWrite {
+ private ExtendedBlock currentBlock;
+
+ BlockToWrite(ExtendedBlock block) {
+ setCurrentBlock(block);
+ }
+
+ synchronized ExtendedBlock getCurrentBlock() {
+ return currentBlock == null ? null : new ExtendedBlock(currentBlock);
+ }
+
+ synchronized long getNumBytes() {
+ return currentBlock == null ? 0 : currentBlock.getNumBytes();
+ }
+
+ synchronized void setCurrentBlock(ExtendedBlock block) {
+ currentBlock = (block == null || block.getLocalBlock() == null) ?
+ null : new ExtendedBlock(block);
+ }
+
+ synchronized void setNumBytes(long numBytes) {
+ assert currentBlock != null;
+ currentBlock.setNumBytes(numBytes);
+ }
+
+ synchronized void setGenerationStamp(long generationStamp) {
+ assert currentBlock != null;
+ currentBlock.setGenerationStamp(generationStamp);
+ }
+
+ @Override
+ public synchronized String toString() {
+ return currentBlock == null ? "null" : currentBlock.toString();
+ }
+ }
+
/**
* Create a socket for a write pipeline
* @param first the first datanode
@@ -2169,8 +2208,11 @@ public class DFSOutputStream extends FSOutputSummer
// update the block length first time irrespective of flag
if (updateLength || persistBlocks.get()) {
synchronized (this) {
- if (streamer != null && streamer.block != null) {
- lastBlockLength = streamer.block.getNumBytes();
+ if (streamer != null && !streamer.streamerClosed) {
+ final ExtendedBlock block = streamer.getBlock();
+ if (block != null) {
+ lastBlockLength = block.getNumBytes();
+ }
}
}
}
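----------------------------------------------------------------------
The core of this change is the new BlockToWrite wrapper above: every read
and write of the streamer's block now goes through a synchronized method,
and getCurrentBlock() hands out a defensive copy, so a concurrent reader
(e.g. the flush/sync path computing lastBlockLength in the final hunk) can
no longer observe a half-updated block or mutate the streamer's private
state. Below is a minimal, self-contained sketch of the same pattern.
SimpleBlock and BlockToWriteSketch are hypothetical illustrations, not part
of the HDFS API; SimpleBlock stands in for Hadoop's ExtendedBlock.

    // SimpleBlock: hypothetical stand-in for ExtendedBlock. It tracks only
    // the fields this sketch needs and provides a copy constructor, as
    // ExtendedBlock does.
    final class SimpleBlock {
      private final long blockId;
      private long numBytes;
      private long generationStamp;

      SimpleBlock(long blockId, long numBytes, long generationStamp) {
        this.blockId = blockId;
        this.numBytes = numBytes;
        this.generationStamp = generationStamp;
      }

      // Copy constructor, mirroring ExtendedBlock(ExtendedBlock).
      SimpleBlock(SimpleBlock other) {
        this(other.blockId, other.numBytes, other.generationStamp);
      }

      long getNumBytes() { return numBytes; }
      void setNumBytes(long n) { numBytes = n; }
      void setGenerationStamp(long gs) { generationStamp = gs; }
    }

    // BlockToWriteSketch: simplified version of the BlockToWrite class in
    // the diff. All access is synchronized, and getCurrentBlock() returns
    // a copy rather than the shared instance.
    final class BlockToWriteSketch {
      private SimpleBlock currentBlock;

      BlockToWriteSketch(SimpleBlock block) { setCurrentBlock(block); }

      synchronized SimpleBlock getCurrentBlock() {
        return currentBlock == null ? null : new SimpleBlock(currentBlock);
      }

      synchronized long getNumBytes() {
        return currentBlock == null ? 0 : currentBlock.getNumBytes();
      }

      synchronized void setCurrentBlock(SimpleBlock block) {
        currentBlock = (block == null) ? null : new SimpleBlock(block);
      }

      synchronized void setNumBytes(long numBytes) {
        assert currentBlock != null;
        currentBlock.setNumBytes(numBytes);
      }

      synchronized void setGenerationStamp(long generationStamp) {
        assert currentBlock != null;
        currentBlock.setGenerationStamp(generationStamp);
      }
    }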
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0823fb72/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
index 8c46564..6173b7b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
@@ -81,8 +81,7 @@ public class TestDFSOutputStream {
* packet size < 64kB. See HDFS-7308 for details.
*/
@Test
- public void testComputePacketChunkSize()
- throws Exception {
+ public void testComputePacketChunkSize() throws Exception {
DistributedFileSystem fs = cluster.getFileSystem();
FSDataOutputStream os = fs.create(new Path("/test"));
DFSOutputStream dos = (DFSOutputStream) Whitebox.getInternalState(os,
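----------------------------------------------------------------------
One consequence worth calling out: because getCurrentBlock() returns a
copy, a caller that mutates the block for the wire (as createBlockOutputStream
does in the blockCopy hunk above, where it sets the full block size before
writing the pipeline request) can no longer corrupt the streamer's
acked-length bookkeeping. A hypothetical driver against the sketch classes
above, assuming they are compiled together:

    // BlockCopyDemo: illustrative only. Mutating the copy returned by
    // getCurrentBlock() leaves the tracker's acked length untouched.
    class BlockCopyDemo {
      public static void main(String[] args) {
        BlockToWriteSketch tracker =
            new BlockToWriteSketch(new SimpleBlock(1L, 512L, 1001L));
        // Set the full block size on the wire copy, as the pipeline setup
        // path does on blockCopy.
        SimpleBlock wireCopy = tracker.getCurrentBlock();
        wireCopy.setNumBytes(128L * 1024 * 1024);
        // The streamer's view still reports only the 512 acked bytes.
        System.out.println(tracker.getNumBytes()); // prints 512
      }
    }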