You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by ji...@apache.org on 2013/07/25 02:32:41 UTC
svn commit: r1506789 - in
/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/org/apache/hadoop/hdfs/server/namenode/
src/main/java/org/apache/hadoop/hdfs/server/protocol/
src/test/java/org/apache/hadoop/hdfs/server/namenode/
Author: jing9
Date: Thu Jul 25 00:32:40 2013
New Revision: 1506789
URL: http://svn.apache.org/r1506789
Log:
HDFS-5024. Make DatanodeProtocol#commitBlockSynchronization idempotent. Contributed by Arpit Agarwal.
Added:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1506789&r1=1506788&r2=1506789&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Thu Jul 25 00:32:40 2013
@@ -470,6 +470,9 @@ Release 2.1.0-beta - 2013-07-02
HDFS-5020. Make DatanodeProtocol#blockReceivedAndDeleted idempotent.
(jing9)
+ HDFS-5024. Make DatanodeProtocol#commitBlockSynchronization idempotent.
+ (Arpit Agarwal via jing9)
+
OPTIMIZATIONS
HDFS-4465. Optimize datanode ReplicasMap and ReplicaInfo. (atm)
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1506789&r1=1506788&r2=1506789&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Thu Jul 25 00:32:40 2013
@@ -3068,7 +3068,7 @@ public class FSNamesystem implements Nam
for (Block b : blocks.getToDeleteList()) {
if (trackBlockCounts) {
- BlockInfo bi = blockManager.getStoredBlock(b);
+ BlockInfo bi = getStoredBlock(b);
if (bi.isComplete()) {
numRemovedComplete++;
if (bi.numNodes() >= blockManager.minReplication) {
@@ -3509,6 +3509,11 @@ public class FSNamesystem implements Nam
blockManager.checkReplication(newFile);
}
+ @VisibleForTesting
+ BlockInfo getStoredBlock(Block block) {
+ return blockManager.getStoredBlock(block);
+ }
+
void commitBlockSynchronization(ExtendedBlock lastblock,
long newgenerationstamp, long newlength,
boolean closeFile, boolean deleteblock, DatanodeID[] newtargets,
@@ -3534,16 +3539,28 @@ public class FSNamesystem implements Nam
"Cannot commitBlockSynchronization while in safe mode",
safeMode);
}
- final BlockInfo storedBlock = blockManager.getStoredBlock(ExtendedBlock
- .getLocalBlock(lastblock));
+ final BlockInfo storedBlock = getStoredBlock(
+ ExtendedBlock.getLocalBlock(lastblock));
if (storedBlock == null) {
- throw new IOException("Block (=" + lastblock + ") not found");
+ if (deleteblock) {
+ // This may be a retry attempt so ignore the failure
+ // to locate the block.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Block (=" + lastblock + ") not found");
+ }
+ return;
+ } else {
+ throw new IOException("Block (=" + lastblock + ") not found");
+ }
}
INodeFile iFile = ((INode)storedBlock.getBlockCollection()).asFile();
if (!iFile.isUnderConstruction() || storedBlock.isComplete()) {
- throw new IOException("Unexpected block (=" + lastblock
- + ") since the file (=" + iFile.getLocalName()
- + ") is not under construction");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Unexpected block (=" + lastblock
+ + ") since the file (=" + iFile.getLocalName()
+ + ") is not under construction");
+ }
+ return;
}
long recoveryId =
@@ -3559,11 +3576,9 @@ public class FSNamesystem implements Nam
if (deleteblock) {
Block blockToDel = ExtendedBlock.getLocalBlock(lastblock);
boolean remove = pendingFile.removeLastBlock(blockToDel);
- if (!remove) {
- throw new IOException("Trying to delete non-existant block "
- + blockToDel);
+ if (remove) {
+ blockManager.removeBlockFromMap(storedBlock);
}
- blockManager.removeBlockFromMap(storedBlock);
}
else {
// update last block
@@ -3593,17 +3608,11 @@ public class FSNamesystem implements Nam
pendingFile.setLastBlock(storedBlock, descriptors);
}
- src = leaseManager.findPath(pendingFile);
if (closeFile) {
- // commit the last block and complete it if it has minimum replicas
- commitOrCompleteLastBlock(pendingFile, storedBlock);
-
- //remove lease, close file
- finalizeINodeFileUnderConstruction(src, pendingFile,
- Snapshot.findLatestSnapshot(pendingFile, null));
+ src = closeFileCommitBlocks(pendingFile, storedBlock);
} else {
// If this commit does not want to close the file, persist blocks
- dir.persistBlocks(src, pendingFile);
+ src = persistBlocks(pendingFile);
}
} finally {
writeUnlock();
@@ -3620,6 +3629,44 @@ public class FSNamesystem implements Nam
}
}
+ /**
+ *
+ * @param pendingFile
+ * @param storedBlock
+ * @return Path of the file that was closed.
+ * @throws IOException
+ */
+ @VisibleForTesting
+ String closeFileCommitBlocks(INodeFileUnderConstruction pendingFile,
+ BlockInfo storedBlock)
+ throws IOException {
+
+ String src = leaseManager.findPath(pendingFile);
+
+ // commit the last block and complete it if it has minimum replicas
+ commitOrCompleteLastBlock(pendingFile, storedBlock);
+
+ //remove lease, close file
+ finalizeINodeFileUnderConstruction(src, pendingFile,
+ Snapshot.findLatestSnapshot(pendingFile, null));
+
+ return src;
+ }
+
+ /**
+ * Persist the block list for the given file.
+ *
+ * @param pendingFile
+ * @return Path to the given file.
+ * @throws IOException
+ */
+ @VisibleForTesting
+ String persistBlocks(INodeFileUnderConstruction pendingFile)
+ throws IOException {
+ String src = leaseManager.findPath(pendingFile);
+ dir.persistBlocks(src, pendingFile);
+ return src;
+ }
/**
* Renew the lease(s) held by the given client
@@ -4662,7 +4709,7 @@ public class FSNamesystem implements Nam
SafeModeInfo safeMode = this.safeMode;
if (safeMode == null) // mostly true
return;
- BlockInfo storedBlock = blockManager.getStoredBlock(b);
+ BlockInfo storedBlock = getStoredBlock(b);
if (storedBlock.isComplete()) {
safeMode.decrementSafeBlockCount((short)blockManager.countNodes(b).liveReplicas());
}
@@ -5279,7 +5326,7 @@ public class FSNamesystem implements Nam
}
// check stored block state
- BlockInfo storedBlock = blockManager.getStoredBlock(ExtendedBlock.getLocalBlock(block));
+ BlockInfo storedBlock = getStoredBlock(ExtendedBlock.getLocalBlock(block));
if (storedBlock == null ||
storedBlock.getBlockUCState() != BlockUCState.UNDER_CONSTRUCTION) {
throw new IOException(block +
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java?rev=1506789&r1=1506788&r2=1506789&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java Thu Jul 25 00:32:40 2013
@@ -166,7 +166,7 @@ public interface DatanodeProtocol {
/**
* Commit block synchronization in lease recovery
*/
- @AtMostOnce
+ @Idempotent
public void commitBlockSynchronization(ExtendedBlock block,
long newgenerationstamp, long newlength,
boolean closeFile, boolean deleteblock, DatanodeID[] newtargets,
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java?rev=1506789&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java Thu Jul 25 00:32:40 2013
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.namenode;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.junit.Test;
+
+/**
+ * Verify that TestCommitBlockSynchronization is idempotent.
+ */
+public class TestCommitBlockSynchronization {
+ private static final long blockId = 100;
+ private static final long length = 200;
+ private static final long genStamp = 300;
+
+ private FSNamesystem makeNameSystemSpy(Block block,
+ INodeFileUnderConstruction file)
+ throws IOException {
+ Configuration conf = new Configuration();
+ FSImage image = new FSImage(conf);
+ DatanodeDescriptor[] targets = new DatanodeDescriptor[0];
+
+ FSNamesystem namesystem = new FSNamesystem(conf, image);
+ FSNamesystem namesystemSpy = spy(namesystem);
+ BlockInfoUnderConstruction blockInfo = new BlockInfoUnderConstruction(
+ block, 1, HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, targets);
+ blockInfo.setBlockCollection(file);
+ blockInfo.setGenerationStamp(genStamp);
+ blockInfo.initializeBlockRecovery(genStamp);
+ doReturn(true).when(file).removeLastBlock(any(Block.class));
+
+ doReturn(blockInfo).when(namesystemSpy).getStoredBlock(any(Block.class));
+ doReturn("").when(namesystemSpy).closeFileCommitBlocks(
+ any(INodeFileUnderConstruction.class),
+ any(BlockInfo.class));
+ doReturn("").when(namesystemSpy).persistBlocks(
+ any(INodeFileUnderConstruction.class));
+ doReturn(mock(FSEditLog.class)).when(namesystemSpy).getEditLog();
+
+ return namesystemSpy;
+ }
+
+ @Test
+ public void testCommitBlockSynchronization() throws IOException {
+ INodeFileUnderConstruction file = mock(INodeFileUnderConstruction.class);
+ Block block = new Block(blockId, length, genStamp);
+ FSNamesystem namesystemSpy = makeNameSystemSpy(block, file);
+ DatanodeID[] newTargets = new DatanodeID[0];
+
+ ExtendedBlock lastBlock = new ExtendedBlock();
+ namesystemSpy.commitBlockSynchronization(
+ lastBlock, genStamp, length, false,
+ false, newTargets, null);
+
+ // Repeat the call to make sure it does not throw
+ namesystemSpy.commitBlockSynchronization(
+ lastBlock, genStamp, length, false, false, newTargets, null);
+
+ // Simulate 'completing' the block.
+ BlockInfo completedBlockInfo = new BlockInfo(block, 1);
+ completedBlockInfo.setBlockCollection(file);
+ completedBlockInfo.setGenerationStamp(genStamp);
+ doReturn(completedBlockInfo).when(namesystemSpy)
+ .getStoredBlock(any(Block.class));
+
+ // Repeat the call to make sure it does not throw
+ namesystemSpy.commitBlockSynchronization(
+ lastBlock, genStamp, length, false, false, newTargets, null);
+ }
+
+ @Test
+ public void testCommitBlockSynchronization2() throws IOException {
+ INodeFileUnderConstruction file = mock(INodeFileUnderConstruction.class);
+ Block block = new Block(blockId, length, genStamp);
+ FSNamesystem namesystemSpy = makeNameSystemSpy(block, file);
+ DatanodeID[] newTargets = new DatanodeID[0];
+
+ ExtendedBlock lastBlock = new ExtendedBlock();
+ namesystemSpy.commitBlockSynchronization(
+ lastBlock, genStamp, length, false,
+ false, newTargets, null);
+
+ // Make sure the call fails if the generation stamp does not match
+ // the block recovery ID.
+ try {
+ namesystemSpy.commitBlockSynchronization(
+ lastBlock, genStamp - 1, length, false, false, newTargets, null);
+ fail("Failed to get expected IOException on generation stamp/" +
+ "recovery ID mismatch");
+ } catch (IOException ioe) {
+ // Expected exception.
+ }
+ }
+
+ @Test
+ public void testCommitBlockSynchronizationWithDelete() throws IOException {
+ INodeFileUnderConstruction file = mock(INodeFileUnderConstruction.class);
+ Block block = new Block(blockId, length, genStamp);
+ FSNamesystem namesystemSpy = makeNameSystemSpy(block, file);
+ DatanodeDescriptor[] targets = new DatanodeDescriptor[0];
+ DatanodeID[] newTargets = new DatanodeID[0];
+
+ ExtendedBlock lastBlock = new ExtendedBlock();
+ namesystemSpy.commitBlockSynchronization(
+ lastBlock, genStamp, length, false,
+ true, newTargets, null);
+
+ // Simulate removing the last block from the file.
+ doReturn(false).when(file).removeLastBlock(any(Block.class));
+
+ // Repeat the call to make sure it does not throw
+ namesystemSpy.commitBlockSynchronization(
+ lastBlock, genStamp, length, false, true, newTargets, null);
+ }
+
+ @Test
+ public void testCommitBlockSynchronizationWithClose() throws IOException {
+ INodeFileUnderConstruction file = mock(INodeFileUnderConstruction.class);
+ Block block = new Block(blockId, length, genStamp);
+ FSNamesystem namesystemSpy = makeNameSystemSpy(block, file);
+ DatanodeDescriptor[] targets = new DatanodeDescriptor[0];
+ DatanodeID[] newTargets = new DatanodeID[0];
+
+ ExtendedBlock lastBlock = new ExtendedBlock();
+ namesystemSpy.commitBlockSynchronization(
+ lastBlock, genStamp, length, true,
+ false, newTargets, null);
+
+ // Repeat the call to make sure it returns true
+ namesystemSpy.commitBlockSynchronization(
+ lastBlock, genStamp, length, true, false, newTargets, null);
+
+ BlockInfo completedBlockInfo = new BlockInfo(block, 1);
+ completedBlockInfo.setBlockCollection(file);
+ completedBlockInfo.setGenerationStamp(genStamp);
+ doReturn(completedBlockInfo).when(namesystemSpy)
+ .getStoredBlock(any(Block.class));
+
+ namesystemSpy.commitBlockSynchronization(
+ lastBlock, genStamp, length, true, false, newTargets, null);
+ }
+}