Posted to common-commits@hadoop.apache.org by li...@apache.org on 2017/02/22 23:33:59 UTC
[13/50] [abbrv] hadoop git commit: HDFS-8498. Blocks can be committed with wrong size. Contributed by Jing Zhao.
HDFS-8498. Blocks can be committed with wrong size. Contributed by Jing Zhao.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/627da6f7
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/627da6f7
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/627da6f7
Branch: refs/heads/HADOOP-13345
Commit: 627da6f7178e18aa41996969c408b6f344e297d1
Parents: 0fc6f38
Author: Jing Zhao <ji...@apache.org>
Authored: Wed Feb 15 10:44:37 2017 -0800
Committer: Jing Zhao <ji...@apache.org>
Committed: Wed Feb 15 10:44:37 2017 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/hdfs/DataStreamer.java | 100 +++++++++++++------
.../apache/hadoop/hdfs/StripedDataStreamer.java | 8 +-
.../apache/hadoop/hdfs/TestDFSOutputStream.java | 3 +-
3 files changed, 72 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
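The heart of this patch, visible in the DataStreamer.java hunks below, is that the streamer's mutable ExtendedBlock field (whose length tracks the number of bytes acked) is wrapped in a synchronized BlockToWrite whose getter returns a defensive copy. BlockToWrite itself is package-private inside DataStreamer, so what follows is only a minimal standalone sketch of the same copy-on-read pattern, assuming nothing beyond ExtendedBlock from hadoop-hdfs-client; it shows how a reader's snapshot can no longer be clobbered while the streamer keeps advancing the acked length, which is the kind of interleaving this change guards against.

// Minimal, standalone sketch of the synchronized copy-on-read pattern that
// this patch introduces as DataStreamer.BlockToWrite (which is package-private
// in the real code). Only ExtendedBlock from hadoop-hdfs-client is assumed.
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;

public class BlockToWriteSketch {

  /** Copy-on-read holder mirroring the shape of DataStreamer.BlockToWrite. */
  static class SyncBlockHolder {
    private ExtendedBlock currentBlock;

    SyncBlockHolder(ExtendedBlock block) {
      setCurrentBlock(block);
    }

    synchronized ExtendedBlock getCurrentBlock() {
      // Defensive copy: callers never alias the internal state.
      return currentBlock == null ? null : new ExtendedBlock(currentBlock);
    }

    synchronized void setCurrentBlock(ExtendedBlock block) {
      currentBlock = (block == null || block.getLocalBlock() == null)
          ? null : new ExtendedBlock(block);
    }

    synchronized void setNumBytes(long numBytes) {
      currentBlock.setNumBytes(numBytes);
    }
  }

  public static void main(String[] args) {
    SyncBlockHolder holder = new SyncBlockHolder(
        new ExtendedBlock("bp-1", 1L, 0L, 1001L));

    // A reader snapshots the block, e.g. to build an RPC argument ...
    ExtendedBlock snapshot = holder.getCurrentBlock();

    // ... while the streamer thread keeps advancing the acked length.
    holder.setNumBytes(4096L);

    // The snapshot still reports 0 bytes; with a shared mutable
    // ExtendedBlock it would have silently become 4096 under the reader.
    System.out.println("snapshot length = " + snapshot.getNumBytes());
    System.out.println("current length  = "
        + holder.getCurrentBlock().getNumBytes());
  }
}

In the patched code the same idea shows up wherever the raw field used to escape: getBlock(), toString(), transferBlock() and the namenode RPCs all go through block.getCurrentBlock() now.
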
http://git-wip-us.apache.org/repos/asf/hadoop/blob/627da6f7/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index 8e6eb63..0268537 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -142,8 +142,6 @@ class DataStreamer extends Daemon {
/**
* Record a connection exception.
- * @param e
- * @throws InvalidEncryptionKeyException
*/
void recordFailure(final InvalidEncryptionKeyException e)
throws InvalidEncryptionKeyException {
@@ -178,9 +176,8 @@ class DataStreamer extends Daemon {
final StorageType[] targetStorageTypes,
final Token<BlockTokenIdentifier> blockToken) throws IOException {
//send the TRANSFER_BLOCK request
- new Sender(out)
- .transferBlock(block, blockToken, dfsClient.clientName, targets,
- targetStorageTypes);
+ new Sender(out).transferBlock(block.getCurrentBlock(), blockToken,
+ dfsClient.clientName, targets, targetStorageTypes);
out.flush();
//ack
BlockOpResponseProto transferResponse = BlockOpResponseProto
@@ -199,6 +196,42 @@ class DataStreamer extends Daemon {
}
}
+ static class BlockToWrite {
+ private ExtendedBlock currentBlock;
+
+ BlockToWrite(ExtendedBlock block) {
+ setCurrentBlock(block);
+ }
+
+ synchronized ExtendedBlock getCurrentBlock() {
+ return currentBlock == null ? null : new ExtendedBlock(currentBlock);
+ }
+
+ synchronized long getNumBytes() {
+ return currentBlock == null ? 0 : currentBlock.getNumBytes();
+ }
+
+ synchronized void setCurrentBlock(ExtendedBlock block) {
+ currentBlock = (block == null || block.getLocalBlock() == null) ?
+ null : new ExtendedBlock(block);
+ }
+
+ synchronized void setNumBytes(long numBytes) {
+ assert currentBlock != null;
+ currentBlock.setNumBytes(numBytes);
+ }
+
+ synchronized void setGenerationStamp(long generationStamp) {
+ assert currentBlock != null;
+ currentBlock.setGenerationStamp(generationStamp);
+ }
+
+ @Override
+ public synchronized String toString() {
+ return currentBlock == null ? "null" : currentBlock.toString();
+ }
+ }
+
/**
* Create a socket for a write pipeline
*
@@ -440,7 +473,7 @@ class DataStreamer extends Daemon {
}
private volatile boolean streamerClosed = false;
- protected volatile ExtendedBlock block; // its length is number of bytes acked
+ protected final BlockToWrite block; // its length is number of bytes acked
protected Token<BlockTokenIdentifier> accessToken;
private DataOutputStream blockStream;
private DataInputStream blockReplyStream;
@@ -508,7 +541,7 @@ class DataStreamer extends Daemon {
ByteArrayManager byteArrayManage,
boolean isAppend, String[] favoredNodes,
EnumSet<AddBlockFlag> flags) {
- this.block = block;
+ this.block = new BlockToWrite(block);
this.dfsClient = dfsClient;
this.src = src;
this.progress = progress;
@@ -1322,7 +1355,7 @@ class DataStreamer extends Daemon {
LocatedBlock lb;
//get a new datanode
lb = dfsClient.namenode.getAdditionalDatanode(
- src, stat.getFileId(), block, nodes, storageIDs,
+ src, stat.getFileId(), block.getCurrentBlock(), nodes, storageIDs,
exclude.toArray(new DatanodeInfo[exclude.size()]),
1, dfsClient.clientName);
// a new node was allocated by the namenode. Update nodes.
@@ -1440,7 +1473,7 @@ class DataStreamer extends Daemon {
} // while
if (success) {
- block = updatePipeline(newGS);
+ updatePipeline(newGS);
}
}
@@ -1536,21 +1569,22 @@ class DataStreamer extends Daemon {
}
private LocatedBlock updateBlockForPipeline() throws IOException {
- return dfsClient.namenode.updateBlockForPipeline(block,
+ return dfsClient.namenode.updateBlockForPipeline(block.getCurrentBlock(),
dfsClient.clientName);
}
- static ExtendedBlock newBlock(ExtendedBlock b, final long newGS) {
- return new ExtendedBlock(b.getBlockPoolId(), b.getBlockId(),
- b.getNumBytes(), newGS);
+ void updateBlockGS(final long newGS) {
+ block.setGenerationStamp(newGS);
}
/** update pipeline at the namenode */
- ExtendedBlock updatePipeline(long newGS) throws IOException {
- final ExtendedBlock newBlock = newBlock(block, newGS);
- dfsClient.namenode.updatePipeline(dfsClient.clientName, block, newBlock,
- nodes, storageIDs);
- return newBlock;
+ private void updatePipeline(long newGS) throws IOException {
+ final ExtendedBlock oldBlock = block.getCurrentBlock();
+ // the new GS has been propagated to all DN, it should be ok to update the
+ // local block state
+ updateBlockGS(newGS);
+ dfsClient.namenode.updatePipeline(dfsClient.clientName, oldBlock,
+ block.getCurrentBlock(), nodes, storageIDs);
}
DatanodeInfo[] getExcludedNodes() {
@@ -1570,31 +1604,29 @@ class DataStreamer extends Daemon {
StorageType[] storageTypes;
int count = dfsClient.getConf().getNumBlockWriteRetry();
boolean success;
- ExtendedBlock oldBlock = block;
+ final ExtendedBlock oldBlock = block.getCurrentBlock();
do {
errorState.resetInternalError();
lastException.clear();
DatanodeInfo[] excluded = getExcludedNodes();
- block = oldBlock;
- lb = locateFollowingBlock(excluded.length > 0 ? excluded : null);
- block = lb.getBlock();
+ lb = locateFollowingBlock(
+ excluded.length > 0 ? excluded : null, oldBlock);
+ block.setCurrentBlock(lb.getBlock());
block.setNumBytes(0);
bytesSent = 0;
accessToken = lb.getBlockToken();
nodes = lb.getLocations();
storageTypes = lb.getStorageTypes();
- //
// Connect to first DataNode in the list.
- //
success = createBlockOutputStream(nodes, storageTypes, 0L, false);
if (!success) {
LOG.warn("Abandoning " + block);
- dfsClient.namenode.abandonBlock(block, stat.getFileId(), src,
- dfsClient.clientName);
- block = null;
+ dfsClient.namenode.abandonBlock(block.getCurrentBlock(),
+ stat.getFileId(), src, dfsClient.clientName);
+ block.setCurrentBlock(null);
final DatanodeInfo badNode = nodes[errorState.getBadNodeIndex()];
LOG.warn("Excluding datanode " + badNode);
excludedNodes.put(badNode, badNode);
@@ -1655,7 +1687,7 @@ class DataStreamer extends Daemon {
// We cannot change the block length in 'block' as it counts the number
// of bytes ack'ed.
- ExtendedBlock blockCopy = new ExtendedBlock(block);
+ ExtendedBlock blockCopy = block.getCurrentBlock();
blockCopy.setNumBytes(stat.getBlockSize());
boolean[] targetPinnings = getPinnings(nodes);
@@ -1765,9 +1797,9 @@ class DataStreamer extends Daemon {
}
}
- private LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes)
- throws IOException {
- return DFSOutputStream.addBlock(excludedNodes, dfsClient, src, block,
+ private LocatedBlock locateFollowingBlock(DatanodeInfo[] excluded,
+ ExtendedBlock oldBlock) throws IOException {
+ return DFSOutputStream.addBlock(excluded, dfsClient, src, oldBlock,
stat.getFileId(), favoredNodes, addBlockFlags);
}
@@ -1811,7 +1843,7 @@ class DataStreamer extends Daemon {
* @return the block this streamer is writing to
*/
ExtendedBlock getBlock() {
- return block;
+ return block.getCurrentBlock();
}
/**
@@ -2016,6 +2048,8 @@ class DataStreamer extends Daemon {
@Override
public String toString() {
- return block == null? "block==null": "" + block.getLocalBlock();
+ final ExtendedBlock extendedBlock = block.getCurrentBlock();
+ return extendedBlock == null ?
+ "block==null" : "" + extendedBlock.getLocalBlock();
}
}
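One subtlety in the rewritten updatePipeline(long newGS) above: the old block is snapshotted first, the local generation stamp is bumped second, and only then are the old and new blocks reported to the namenode; because getCurrentBlock() returns a copy, the snapshot is not disturbed by the bump. A hedged sketch of that ordering follows; reportPipelineUpdate() is a hypothetical stand-in for dfsClient.namenode.updatePipeline(...), not an API from the patch.

// Hedged sketch of the snapshot-then-mutate-then-report ordering used by the
// new private updatePipeline(long newGS).
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;

class PipelineUpdateOrderSketch {
  private ExtendedBlock currentBlock =
      new ExtendedBlock("bp-1", 1L, 2048L, 1001L);

  private synchronized ExtendedBlock snapshot() {
    return new ExtendedBlock(currentBlock);      // copy-on-read
  }

  private synchronized void bumpGenerationStamp(long newGS) {
    // Per the patch comment: the new GS has already been propagated to all
    // datanodes, so the local block state can be updated before the RPC.
    currentBlock.setGenerationStamp(newGS);
  }

  void updatePipeline(long newGS) {
    ExtendedBlock oldBlock = snapshot();         // unaffected by the bump below
    bumpGenerationStamp(newGS);
    reportPipelineUpdate(oldBlock, snapshot());  // old GS and new GS both intact
  }

  // Hypothetical stand-in for dfsClient.namenode.updatePipeline(...).
  private void reportPipelineUpdate(ExtendedBlock oldBlock,
      ExtendedBlock newBlock) {
    System.out.println("updatePipeline: gs " + oldBlock.getGenerationStamp()
        + " -> " + newBlock.getGenerationStamp());
  }

  public static void main(String[] args) {
    new PipelineUpdateOrderSketch().updatePipeline(1002L);
  }
}
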
http://git-wip-us.apache.org/repos/asf/hadoop/blob/627da6f7/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
index 89ab6a3..b457edb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
@@ -71,7 +71,7 @@ public class StripedDataStreamer extends DataStreamer {
@Override
protected void endBlock() {
- coordinator.offerEndBlock(index, block);
+ coordinator.offerEndBlock(index, block.getCurrentBlock());
super.endBlock();
}
@@ -93,7 +93,7 @@ public class StripedDataStreamer extends DataStreamer {
protected LocatedBlock nextBlockOutputStream() throws IOException {
boolean success;
LocatedBlock lb = getFollowingBlock();
- block = lb.getBlock();
+ block.setCurrentBlock(lb.getBlock());
block.setNumBytes(0);
bytesSent = 0;
accessToken = lb.getBlockToken();
@@ -105,7 +105,7 @@ public class StripedDataStreamer extends DataStreamer {
success = createBlockOutputStream(nodes, storageTypes, 0L, false);
if (!success) {
- block = null;
+ block.setCurrentBlock(null);
final DatanodeInfo badNode = nodes[getErrorState().getBadNodeIndex()];
LOG.warn("Excluding datanode " + badNode);
excludedNodes.put(badNode, badNode);
@@ -161,7 +161,7 @@ public class StripedDataStreamer extends DataStreamer {
success = coordinator.takeStreamerUpdateResult(index);
if (success) {
// if all succeeded, update its block using the new GS
- block = newBlock(block, newGS);
+ updateBlockGS(newGS);
} else {
// otherwise close the block stream and restart the recovery process
closeStream();
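
The StripedDataStreamer hunks above follow the same discipline: the coordinator is now offered block.getCurrentBlock(), i.e. a snapshot, so the length recorded at end-of-block time cannot drift if the streamer's internal state later changes. Below is a rough sketch of that hand-off; the map-based coordinator is a stand-in for illustration only, not the real striped-write coordinator.

// Rough sketch of why offerEndBlock() now passes a snapshot rather than the
// live block. The map-based "coordinator" here is illustrative only.
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;

class EndBlockSnapshotSketch {
  // Remembers the block each streamer reported at end-of-block time,
  // keyed by streamer index.
  private final ConcurrentMap<Integer, ExtendedBlock> endBlocks =
      new ConcurrentHashMap<>();

  void offerEndBlock(int index, ExtendedBlock snapshot) {
    endBlocks.put(index, snapshot);              // a snapshot, not live state
  }

  public static void main(String[] args) {
    EndBlockSnapshotSketch coordinator = new EndBlockSnapshotSketch();
    ExtendedBlock live = new ExtendedBlock("bp-1", 7L, 65536L, 1001L);

    // Offer what block.getCurrentBlock() would return: a copy.
    coordinator.offerEndBlock(0, new ExtendedBlock(live));

    // The streamer's own state moving on does not change what was offered.
    live.setNumBytes(0L);
    System.out.println("offered length = "
        + coordinator.endBlocks.get(0).getNumBytes());
  }
}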
http://git-wip-us.apache.org/repos/asf/hadoop/blob/627da6f7/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
index 750103d..9ec01b6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
@@ -110,8 +110,7 @@ public class TestDFSOutputStream {
* packet size < 64kB. See HDFS-7308 for details.
*/
@Test
- public void testComputePacketChunkSize()
- throws Exception {
+ public void testComputePacketChunkSize() throws Exception {
DistributedFileSystem fs = cluster.getFileSystem();
FSDataOutputStream os = fs.create(new Path("/test"));
DFSOutputStream dos = (DFSOutputStream) Whitebox.getInternalState(os,