You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this copy.
Posted to hdfs-commits@hadoop.apache.org by ji...@apache.org on 2013/07/25 02:32:41 UTC

svn commit: r1506789 - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/server/namenode/ src/main/java/org/apache/hadoop/hdfs/server/protocol/ src/test/java/org/apache/hadoop/hdfs/server/namenode/

Author: jing9
Date: Thu Jul 25 00:32:40 2013
New Revision: 1506789

URL: http://svn.apache.org/r1506789
Log:
HDFS-5024. Make DatanodeProtocol#commitBlockSynchronization idempotent. Contributed by Arpit Agarwal.

Added:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java
Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1506789&r1=1506788&r2=1506789&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Thu Jul 25 00:32:40 2013
@@ -470,6 +470,9 @@ Release 2.1.0-beta - 2013-07-02
     HDFS-5020. Make DatanodeProtocol#blockReceivedAndDeleted idempotent. 
     (jing9)
 
+    HDFS-5024. Make DatanodeProtocol#commitBlockSynchronization idempotent. 
+    (Arpit Agarwal via jing9)
+
   OPTIMIZATIONS
 
     HDFS-4465. Optimize datanode ReplicasMap and ReplicaInfo. (atm)

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1506789&r1=1506788&r2=1506789&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Thu Jul 25 00:32:40 2013
@@ -3068,7 +3068,7 @@ public class FSNamesystem implements Nam
 
     for (Block b : blocks.getToDeleteList()) {
       if (trackBlockCounts) {
-        BlockInfo bi = blockManager.getStoredBlock(b);
+        BlockInfo bi = getStoredBlock(b);
         if (bi.isComplete()) {
           numRemovedComplete++;
           if (bi.numNodes() >= blockManager.minReplication) {
@@ -3509,6 +3509,11 @@ public class FSNamesystem implements Nam
     blockManager.checkReplication(newFile);
   }
 
+  @VisibleForTesting // separate method so tests can stub block-map lookups
+  BlockInfo getStoredBlock(Block block) {
+    return blockManager.getStoredBlock(block);
+  }
+
   void commitBlockSynchronization(ExtendedBlock lastblock,
       long newgenerationstamp, long newlength,
       boolean closeFile, boolean deleteblock, DatanodeID[] newtargets,
@@ -3534,16 +3539,28 @@ public class FSNamesystem implements Nam
           "Cannot commitBlockSynchronization while in safe mode",
           safeMode);
       }
-      final BlockInfo storedBlock = blockManager.getStoredBlock(ExtendedBlock
-        .getLocalBlock(lastblock));
+      final BlockInfo storedBlock = getStoredBlock(
+          ExtendedBlock.getLocalBlock(lastblock));
       if (storedBlock == null) {
-        throw new IOException("Block (=" + lastblock + ") not found");
+        if (deleteblock) {
+          // This may be a retry attempt so ignore the failure
+          // to locate the block.
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Block (=" + lastblock + ") not found");
+          }
+          return;
+        } else {
+          throw new IOException("Block (=" + lastblock + ") not found");
+        }
       }
       INodeFile iFile = ((INode)storedBlock.getBlockCollection()).asFile();
       if (!iFile.isUnderConstruction() || storedBlock.isComplete()) {
-        throw new IOException("Unexpected block (=" + lastblock
-                              + ") since the file (=" + iFile.getLocalName()
-                              + ") is not under construction");
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Unexpected block (=" + lastblock
+                    + ") since the file (=" + iFile.getLocalName()
+                    + ") is not under construction");
+        }
+        return;
       }
 
       long recoveryId =
@@ -3559,11 +3576,9 @@ public class FSNamesystem implements Nam
       if (deleteblock) {
         Block blockToDel = ExtendedBlock.getLocalBlock(lastblock);
         boolean remove = pendingFile.removeLastBlock(blockToDel);
-        if (!remove) {
-          throw new IOException("Trying to delete non-existant block "
-              + blockToDel);
+        if (remove) {
+          blockManager.removeBlockFromMap(storedBlock);
         }
-        blockManager.removeBlockFromMap(storedBlock);
       }
       else {
         // update last block
@@ -3593,17 +3608,11 @@ public class FSNamesystem implements Nam
         pendingFile.setLastBlock(storedBlock, descriptors);
       }
 
-      src = leaseManager.findPath(pendingFile);
       if (closeFile) {
-        // commit the last block and complete it if it has minimum replicas
-        commitOrCompleteLastBlock(pendingFile, storedBlock);
-
-        //remove lease, close file
-        finalizeINodeFileUnderConstruction(src, pendingFile,
-            Snapshot.findLatestSnapshot(pendingFile, null));
+        src = closeFileCommitBlocks(pendingFile, storedBlock);
       } else {
         // If this commit does not want to close the file, persist blocks
-        dir.persistBlocks(src, pendingFile);
+        src = persistBlocks(pendingFile);
       }
     } finally {
       writeUnlock();
@@ -3620,6 +3629,44 @@ public class FSNamesystem implements Nam
     }
   }
 
+  /**
+   * Commit the file's last block, then remove the lease and close the file.
+   * @param pendingFile the under-construction file to close
+   * @param storedBlock the file's last block, to be committed
+   * @return Path of the file that was closed.
+   * @throws IOException if path lookup, block commit or finalization fails
+   */
+  @VisibleForTesting
+  String closeFileCommitBlocks(INodeFileUnderConstruction pendingFile,
+                                       BlockInfo storedBlock)
+      throws IOException {
+
+    String src = leaseManager.findPath(pendingFile);
+
+    // commit the last block and complete it if it has minimum replicas
+    commitOrCompleteLastBlock(pendingFile, storedBlock);
+
+    //remove lease, close file
+    finalizeINodeFileUnderConstruction(src, pendingFile,
+                                       Snapshot.findLatestSnapshot(pendingFile, null));
+
+    return src;
+  }
+
+  /**
+   * Persist the block list for the given file.
+   *
+   * @param pendingFile the under-construction file whose blocks are persisted
+   * @return Path to the given file, as resolved by the lease manager.
+   * @throws IOException if resolving the path or persisting the blocks fails
+   */
+  @VisibleForTesting
+  String persistBlocks(INodeFileUnderConstruction pendingFile)
+      throws IOException {
+    String src = leaseManager.findPath(pendingFile);
+    dir.persistBlocks(src, pendingFile);
+    return src;
+  }
 
   /**
    * Renew the lease(s) held by the given client
@@ -4662,7 +4709,7 @@ public class FSNamesystem implements Nam
     SafeModeInfo safeMode = this.safeMode;
     if (safeMode == null) // mostly true
       return;
-    BlockInfo storedBlock = blockManager.getStoredBlock(b);
+    BlockInfo storedBlock = getStoredBlock(b);
     if (storedBlock.isComplete()) {
       safeMode.decrementSafeBlockCount((short)blockManager.countNodes(b).liveReplicas());
     }
@@ -5279,7 +5326,7 @@ public class FSNamesystem implements Nam
     }
     
     // check stored block state
-    BlockInfo storedBlock = blockManager.getStoredBlock(ExtendedBlock.getLocalBlock(block));
+    BlockInfo storedBlock = getStoredBlock(ExtendedBlock.getLocalBlock(block));
     if (storedBlock == null || 
         storedBlock.getBlockUCState() != BlockUCState.UNDER_CONSTRUCTION) {
         throw new IOException(block + 

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java?rev=1506789&r1=1506788&r2=1506789&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java Thu Jul 25 00:32:40 2013
@@ -166,7 +166,7 @@ public interface DatanodeProtocol {
   /**
    * Commit block synchronization in lease recovery
    */
-  @AtMostOnce
+  @Idempotent
   public void commitBlockSynchronization(ExtendedBlock block,
       long newgenerationstamp, long newlength,
       boolean closeFile, boolean deleteblock, DatanodeID[] newtargets,

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java?rev=1506789&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java Thu Jul 25 00:32:40 2013
@@ -0,0 +1,172 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.namenode;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.protocol.*;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
+import org.junit.Test;
+
+/**
+ * Verify that FSNamesystem#commitBlockSynchronization is idempotent.
+ */
+public class TestCommitBlockSynchronization {
+  private static final long blockId = 100;
+  private static final long length = 200;
+  private static final long genStamp = 300;
+
+  private FSNamesystem makeNameSystemSpy(Block block,
+                                         INodeFileUnderConstruction file)
+      throws IOException {
+    Configuration conf = new Configuration();
+    FSImage image = new FSImage(conf);
+    DatanodeDescriptor[] targets = new DatanodeDescriptor[0];
+
+    FSNamesystem namesystem = new FSNamesystem(conf, image);
+    FSNamesystem namesystemSpy = spy(namesystem);
+    BlockInfoUnderConstruction blockInfo = new BlockInfoUnderConstruction(
+        block, 1, HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, targets);
+    blockInfo.setBlockCollection(file);
+    blockInfo.setGenerationStamp(genStamp);
+    blockInfo.initializeBlockRecovery(genStamp);
+    doReturn(true).when(file).removeLastBlock(any(Block.class));
+    // Stub block lookup, file close/persist and the edit log on the spy.
+    doReturn(blockInfo).when(namesystemSpy).getStoredBlock(any(Block.class));
+    doReturn("").when(namesystemSpy).closeFileCommitBlocks(
+        any(INodeFileUnderConstruction.class),
+        any(BlockInfo.class));
+    doReturn("").when(namesystemSpy).persistBlocks(
+        any(INodeFileUnderConstruction.class));
+    doReturn(mock(FSEditLog.class)).when(namesystemSpy).getEditLog();
+
+    return namesystemSpy;
+  }
+
+  @Test
+  public void testCommitBlockSynchronization() throws IOException {
+    INodeFileUnderConstruction file = mock(INodeFileUnderConstruction.class);
+    Block block = new Block(blockId, length, genStamp);
+    FSNamesystem namesystemSpy = makeNameSystemSpy(block, file);
+    DatanodeID[] newTargets = new DatanodeID[0];
+
+    ExtendedBlock lastBlock = new ExtendedBlock();
+    namesystemSpy.commitBlockSynchronization(
+        lastBlock, genStamp, length, false,
+        false, newTargets, null);
+
+    // Repeat the call to make sure it does not throw
+    namesystemSpy.commitBlockSynchronization(
+        lastBlock, genStamp, length, false, false, newTargets, null);
+
+    // Simulate 'completing' the block.
+    BlockInfo completedBlockInfo = new BlockInfo(block, 1);
+    completedBlockInfo.setBlockCollection(file);
+    completedBlockInfo.setGenerationStamp(genStamp);
+    doReturn(completedBlockInfo).when(namesystemSpy)
+        .getStoredBlock(any(Block.class));
+
+    // Repeat the call to make sure it does not throw
+    namesystemSpy.commitBlockSynchronization(
+        lastBlock, genStamp, length, false, false, newTargets, null);
+  }
+
+  @Test
+  public void testCommitBlockSynchronization2() throws IOException {
+    INodeFileUnderConstruction file = mock(INodeFileUnderConstruction.class);
+    Block block = new Block(blockId, length, genStamp);
+    FSNamesystem namesystemSpy = makeNameSystemSpy(block, file);
+    DatanodeID[] newTargets = new DatanodeID[0];
+
+    ExtendedBlock lastBlock = new ExtendedBlock();
+    namesystemSpy.commitBlockSynchronization(
+        lastBlock, genStamp, length, false,
+        false, newTargets, null);
+
+    // Make sure the call fails if the generation stamp does not match
+    // the block recovery ID.
+    try {
+      namesystemSpy.commitBlockSynchronization(
+          lastBlock, genStamp - 1, length, false, false, newTargets, null);
+      fail("Failed to get expected IOException on generation stamp/" +
+           "recovery ID mismatch");
+    } catch (IOException ioe) {
+      // Expected exception.
+    }
+  }
+
+  @Test
+  public void testCommitBlockSynchronizationWithDelete() throws IOException {
+    INodeFileUnderConstruction file = mock(INodeFileUnderConstruction.class);
+    Block block = new Block(blockId, length, genStamp);
+    FSNamesystem namesystemSpy = makeNameSystemSpy(block, file);
+    DatanodeID[] newTargets = new DatanodeID[0];
+
+    ExtendedBlock lastBlock = new ExtendedBlock();
+    namesystemSpy.commitBlockSynchronization(
+        lastBlock, genStamp, length, false,
+        true, newTargets, null);
+
+    // Simulate the first call having already removed the last block.
+    doReturn(false).when(file).removeLastBlock(any(Block.class));
+
+    // Repeat the call to make sure it does not throw
+    namesystemSpy.commitBlockSynchronization(
+        lastBlock, genStamp, length, false, true, newTargets, null);
+  }
+
+  @Test
+  public void testCommitBlockSynchronizationWithClose() throws IOException {
+    INodeFileUnderConstruction file = mock(INodeFileUnderConstruction.class);
+    Block block = new Block(blockId, length, genStamp);
+    FSNamesystem namesystemSpy = makeNameSystemSpy(block, file);
+    DatanodeID[] newTargets = new DatanodeID[0];
+
+    ExtendedBlock lastBlock = new ExtendedBlock();
+    namesystemSpy.commitBlockSynchronization(
+        lastBlock, genStamp, length, true,
+        false, newTargets, null);
+
+    // Repeat the call to make sure it does not throw
+    namesystemSpy.commitBlockSynchronization(
+        lastBlock, genStamp, length, true, false, newTargets, null);
+
+    // Simulate 'completing' the block.
+    BlockInfo completedBlockInfo = new BlockInfo(block, 1);
+    completedBlockInfo.setBlockCollection(file);
+    completedBlockInfo.setGenerationStamp(genStamp);
+    doReturn(completedBlockInfo).when(namesystemSpy)
+        .getStoredBlock(any(Block.class));
+
+    // Repeat the call to make sure it does not throw
+    namesystemSpy.commitBlockSynchronization(
+        lastBlock, genStamp, length, true, false, newTargets, null);
+  }
+}