Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2017/05/16 18:04:50 UTC

hadoop git commit: HDFS-8498. Blocks can be committed with wrong size. Contributed by Jing Zhao. Backport HDFS-11732 by Zhe Zhang.

Repository: hadoop
Updated Branches:
  refs/heads/branch-2.7 300435863 -> 0823fb72a


HDFS-8498. Blocks can be committed with wrong size. Contributed by Jing Zhao. Backport HDFS-11732 by Zhe Zhang.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0823fb72
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0823fb72
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0823fb72

Branch: refs/heads/branch-2.7
Commit: 0823fb72a9afb696c357e0b611f2e04b367b8f24
Parents: 3004358
Author: Konstantin V Shvachko <sh...@apache.org>
Authored: Tue May 16 11:00:43 2017 -0700
Committer: Konstantin V Shvachko <sh...@apache.org>
Committed: Tue May 16 11:03:12 2017 -0700

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   9 +-
 .../org/apache/hadoop/hdfs/DFSOutputStream.java | 104 +++++++++++++------
 .../apache/hadoop/hdfs/TestDFSOutputStream.java |   3 +-
 3 files changed, 80 insertions(+), 36 deletions(-)
----------------------------------------------------------------------
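
For context on the patch below: the core of the change is to stop exposing the streamer's mutable ExtendedBlock field directly and instead route every access through a small synchronized wrapper (the BlockToWrite class added to DFSOutputStream.java), whose getter hands back a defensive copy. That way a reader such as the hflush path can never observe the block while the streamer thread is half-way through bumping its generation stamp or swapping in a new block. The following is a minimal, self-contained sketch of that copy-on-read idea; SimpleBlock, BlockHolder, and CopyOnReadDemo are hypothetical stand-ins for illustration only and are not part of the patch or of HDFS.

    // Hedged sketch of the copy-on-read wrapper pattern used by the BlockToWrite
    // class in the diff below. SimpleBlock is a made-up minimal block holder, not
    // org.apache.hadoop.hdfs.protocol.ExtendedBlock.
    class SimpleBlock {
      private long numBytes;
      private long generationStamp;

      SimpleBlock(long numBytes, long generationStamp) {
        this.numBytes = numBytes;
        this.generationStamp = generationStamp;
      }

      SimpleBlock(SimpleBlock other) {             // copy constructor
        this(other.numBytes, other.generationStamp);
      }

      long getNumBytes()               { return numBytes; }
      void setNumBytes(long n)         { numBytes = n; }
      void setGenerationStamp(long gs) { generationStamp = gs; }

      @Override
      public String toString() {
        return "numBytes=" + numBytes + " genStamp=" + generationStamp;
      }
    }

    // Same shape as BlockToWrite: every accessor is synchronized, and
    // getCurrentBlock() returns a copy rather than the live instance.
    class BlockHolder {
      private SimpleBlock currentBlock;

      synchronized SimpleBlock getCurrentBlock() {
        return currentBlock == null ? null : new SimpleBlock(currentBlock);
      }

      synchronized void setCurrentBlock(SimpleBlock block) {
        currentBlock = block == null ? null : new SimpleBlock(block);
      }

      synchronized void setNumBytes(long n) {
        assert currentBlock != null;
        currentBlock.setNumBytes(n);
      }

      synchronized void setGenerationStamp(long gs) {
        assert currentBlock != null;
        currentBlock.setGenerationStamp(gs);
      }
    }

    public class CopyOnReadDemo {
      public static void main(String[] args) {
        BlockHolder block = new BlockHolder();
        block.setCurrentBlock(new SimpleBlock(0L, 1001L));

        // A reader (think: hflush computing the last block length) takes a snapshot.
        SimpleBlock snapshot = block.getCurrentBlock();

        // A concurrent writer (think: pipeline recovery) updates the live block.
        block.setGenerationStamp(1002L);
        block.setNumBytes(65536L);

        // The snapshot is unaffected; the holder reflects the update.
        System.out.println("snapshot: " + snapshot);                 // numBytes=0 genStamp=1001
        System.out.println("current:  " + block.getCurrentBlock());  // numBytes=65536 genStamp=1002
      }
    }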


http://git-wip-us.apache.org/repos/asf/hadoop/blob/0823fb72/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 16e8aca..19b2996 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -278,9 +278,9 @@ Release 2.7.4 - UNRELEASED
     HDFS-11795. Fix ASF License warnings in branch-2.7.
     (Yiqun Lin via aajisaka)
 
-	HDFS-11674. reserveSpaceForReplicas is not released if append request failed
-	due to mirror down and replica recovered (vinayakumarb)
-	
+    HDFS-11674. reserveSpaceForReplicas is not released if append request failed
+    due to mirror down and replica recovered (vinayakumarb)
+
     HDFS-10987. Make Decommission less expensive when lot of blocks present. 
     (Brahma Reddy Battula)
     
@@ -288,6 +288,9 @@ Release 2.7.4 - UNRELEASED
    (Contributed By Weiwei Yang via Eric Yang)
     HDFS-11784. Backport HDFS-8312 contributed by Brahma Reddy Battula.
 
+    HDFS-8498. Blocks can be committed with wrong size. (Jing Zhao)
+    Backport HDFS-11732 by Zhe Zhang.
+
 Release 2.7.3 - 2016-08-25
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0823fb72/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 1272e29..188502f 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -293,9 +293,8 @@ public class DFSOutputStream extends FSOutputSummer
           final StorageType[] targetStorageTypes,
           final Token<BlockTokenIdentifier> blockToken) throws IOException {
         //send the TRANSFER_BLOCK request
-        new Sender(out)
-            .transferBlock(block, blockToken, dfsClient.clientName, targets,
-                targetStorageTypes);
+        new Sender(out).transferBlock(block.getCurrentBlock(), blockToken,
+            dfsClient.clientName, targets, targetStorageTypes);
         out.flush();
         //ack
         BlockOpResponseProto transferResponse = BlockOpResponseProto
@@ -315,7 +314,7 @@ public class DFSOutputStream extends FSOutputSummer
     }
 
     private volatile boolean streamerClosed = false;
-    private volatile ExtendedBlock block; // its length is number of bytes acked
+    private final BlockToWrite block; // its length is number of bytes acked
     private Token<BlockTokenIdentifier> accessToken;
     private DataOutputStream blockStream;
     private DataInputStream blockReplyStream;
@@ -364,7 +363,7 @@ public class DFSOutputStream extends FSOutputSummer
     private DataStreamer(HdfsFileStatus stat, ExtendedBlock block) {
       isAppend = false;
       isLazyPersistFile = isLazyPersist(stat);
-      this.block = block;
+      this.block = new BlockToWrite(block);
       stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
     }
     
@@ -379,7 +378,7 @@ public class DFSOutputStream extends FSOutputSummer
         int bytesPerChecksum) throws IOException {
       isAppend = true;
       stage = BlockConstructionStage.PIPELINE_SETUP_APPEND;
-      block = lastBlock.getBlock();
+      block = new BlockToWrite(lastBlock.getBlock());
       bytesSent = block.getNumBytes();
       accessToken = lastBlock.getBlockToken();
       isLazyPersistFile = isLazyPersist(stat);
@@ -1082,7 +1081,7 @@ public class DFSOutputStream extends FSOutputSummer
         LocatedBlock lb;
         //get a new datanode
         lb = dfsClient.namenode.getAdditionalDatanode(
-            src, fileId, block, nodes, storageIDs,
+            src, fileId, block.getCurrentBlock(), nodes, storageIDs,
             exclude.toArray(new DatanodeInfo[exclude.size()]),
             1, dfsClient.clientName);
         // a new node was allocated by the namenode. Update nodes.
@@ -1260,7 +1259,8 @@ public class DFSOutputStream extends FSOutputSummer
         }
 
         // get a new generation stamp and an access token
-        LocatedBlock lb = dfsClient.namenode.updateBlockForPipeline(block, dfsClient.clientName);
+        LocatedBlock lb = dfsClient.namenode.
+            updateBlockForPipeline(block.getCurrentBlock(), dfsClient.clientName);
         newGS = lb.getBlock().getGenerationStamp();
         accessToken = lb.getBlockToken();
         
@@ -1308,16 +1308,21 @@ public class DFSOutputStream extends FSOutputSummer
 
       if (success) {
         // update pipeline at the namenode
-        ExtendedBlock newBlock = new ExtendedBlock(
-            block.getBlockPoolId(), block.getBlockId(), block.getNumBytes(), newGS);
-        dfsClient.namenode.updatePipeline(dfsClient.clientName, block, newBlock,
-            nodes, storageIDs);
-        // update client side generation stamp
-        block = newBlock;
+        final ExtendedBlock oldBlock = block.getCurrentBlock();
+        // the new GS has been propagated to all DN, it should be ok to update the
+        // local block state
+        block.setGenerationStamp(newGS);
+        dfsClient.namenode.updatePipeline(dfsClient.clientName, oldBlock,
+            block.getCurrentBlock(), nodes, storageIDs);
       }
       return false; // do not sleep, continue processing
     }
 
+    DatanodeInfo[] getExcludedNodes() {
+      return excludedNodes.getAllPresent(excludedNodes.asMap().keySet())
+          .keySet().toArray(new DatanodeInfo[0]);
+    }
+
     /**
      * Open a DataOutputStream to a DataNode so that it can be written to.
      * This happens when a file is created and each time a new block is allocated.
@@ -1330,20 +1335,17 @@ public class DFSOutputStream extends FSOutputSummer
       StorageType[] storageTypes = null;
       int count = dfsClient.getConf().nBlockWriteRetry;
       boolean success = false;
-      ExtendedBlock oldBlock = block;
+      ExtendedBlock oldBlock = block.getCurrentBlock();
       do {
         hasError = false;
         lastException.set(null);
         errorIndex = -1;
         success = false;
 
-        DatanodeInfo[] excluded =
-            excludedNodes.getAllPresent(excludedNodes.asMap().keySet())
-            .keySet()
-            .toArray(new DatanodeInfo[0]);
-        block = oldBlock;
-        lb = locateFollowingBlock(excluded.length > 0 ? excluded : null);
-        block = lb.getBlock();
+        DatanodeInfo[] excluded = getExcludedNodes();
+        lb = locateFollowingBlock(
+            excluded.length > 0 ? excluded : null, oldBlock);
+        block.setCurrentBlock(lb.getBlock());
         block.setNumBytes(0);
         bytesSent = 0;
         accessToken = lb.getBlockToken();
@@ -1357,9 +1359,9 @@ public class DFSOutputStream extends FSOutputSummer
 
         if (!success) {
           DFSClient.LOG.info("Abandoning " + block);
-          dfsClient.namenode.abandonBlock(block, fileId, src,
-              dfsClient.clientName);
-          block = null;
+          dfsClient.namenode.abandonBlock(block.getCurrentBlock(),
+              fileId, src, dfsClient.clientName);
+          block.setCurrentBlock(null);
           DFSClient.LOG.info("Excluding datanode " + nodes[errorIndex]);
           excludedNodes.put(nodes[errorIndex], nodes[errorIndex]);
         }
@@ -1421,7 +1423,7 @@ public class DFSOutputStream extends FSOutputSummer
 
           // We cannot change the block length in 'block' as it counts the number
           // of bytes ack'ed.
-          ExtendedBlock blockCopy = new ExtendedBlock(block);
+          ExtendedBlock blockCopy = block.getCurrentBlock();
           blockCopy.setNumBytes(blockSize);
 
           boolean[] targetPinnings = getPinnings(nodes, true);
@@ -1539,7 +1541,8 @@ public class DFSOutputStream extends FSOutputSummer
       }
     }
 
-    private LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes)  throws IOException {
+    protected LocatedBlock locateFollowingBlock(DatanodeInfo[] excluded,
+        ExtendedBlock oldBlock) throws IOException {
       int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry;
       long sleeptime = 400;
       while (true) {
@@ -1547,7 +1550,7 @@ public class DFSOutputStream extends FSOutputSummer
         while (true) {
           try {
             return dfsClient.namenode.addBlock(src, dfsClient.clientName,
-                block, excludedNodes, fileId, favoredNodes);
+                oldBlock, excluded, fileId, favoredNodes);
           } catch (RemoteException e) {
             IOException ue = 
               e.unwrapRemoteException(FileNotFoundException.class,
@@ -1591,7 +1594,7 @@ public class DFSOutputStream extends FSOutputSummer
     }
 
     ExtendedBlock getBlock() {
-      return block;
+      return block.getCurrentBlock();
     }
 
     DatanodeInfo[] getNodes() {
@@ -1607,6 +1610,42 @@ public class DFSOutputStream extends FSOutputSummer
     }
   }
 
+  static class BlockToWrite {
+    private ExtendedBlock currentBlock;
+
+    BlockToWrite(ExtendedBlock block) {
+      setCurrentBlock(block);
+    }
+
+    synchronized ExtendedBlock getCurrentBlock() {
+      return currentBlock == null ? null : new ExtendedBlock(currentBlock);
+    }
+
+    synchronized long getNumBytes() {
+      return currentBlock == null ? 0 : currentBlock.getNumBytes();
+    }
+
+    synchronized void setCurrentBlock(ExtendedBlock block) {
+      currentBlock = (block == null || block.getLocalBlock() == null) ?
+          null : new ExtendedBlock(block);
+    }
+
+    synchronized void setNumBytes(long numBytes) {
+      assert currentBlock != null;
+      currentBlock.setNumBytes(numBytes);
+    }
+
+    synchronized void setGenerationStamp(long generationStamp) {
+      assert currentBlock != null;
+      currentBlock.setGenerationStamp(generationStamp);
+    }
+
+    @Override
+    public synchronized String toString() {
+      return currentBlock == null ? "null" : currentBlock.toString();
+    }
+  }
+
   /**
    * Create a socket for a write pipeline
    * @param first the first datanode 
@@ -2169,8 +2208,11 @@ public class DFSOutputStream extends FSOutputSummer
       // update the block length first time irrespective of flag
       if (updateLength || persistBlocks.get()) {
         synchronized (this) {
-          if (streamer != null && streamer.block != null) {
-            lastBlockLength = streamer.block.getNumBytes();
+          if (streamer != null && !streamer.streamerClosed) {
+            final ExtendedBlock block = streamer.getBlock();
+            if (block != null) {
+              lastBlockLength = block.getNumBytes();
+            }
           }
         }
       }
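
The last hunk above changes how the hflush/hsync path picks up the length of the block being written: instead of reaching into streamer.block directly, the code skips closed streamers and reads through streamer.getBlock(), which after this patch returns a defensive copy. Below is a rough, self-contained model of that guard; StreamerView, BlockSnapshot, and the -1 "no length read" default are illustrative choices for the sketch, not the HDFS types or the exact surrounding method.

    // Hedged, simplified model of the guard in the hunk above. The real code
    // checks streamer.streamerClosed and calls streamer.getBlock(), which now
    // returns a copy of the block (or null when no block has been allocated yet).
    interface StreamerView {
      boolean isClosed();
      BlockSnapshot getBlock();
    }

    class BlockSnapshot {
      private final long numBytes;
      BlockSnapshot(long numBytes) { this.numBytes = numBytes; }
      long getNumBytes() { return numBytes; }
    }

    public class LastBlockLengthDemo {
      // Returns the acked length of the current block, or -1 if it cannot be
      // read (no streamer, streamer already closed, or no block allocated yet).
      static long lastBlockLength(StreamerView streamer) {
        long lastBlockLength = -1L;
        if (streamer != null && !streamer.isClosed()) {
          BlockSnapshot block = streamer.getBlock();
          if (block != null) {
            lastBlockLength = block.getNumBytes();
          }
        }
        return lastBlockLength;
      }

      public static void main(String[] args) {
        StreamerView open = new StreamerView() {
          public boolean isClosed() { return false; }
          public BlockSnapshot getBlock() { return new BlockSnapshot(4096L); }
        };
        StreamerView closed = new StreamerView() {
          public boolean isClosed() { return true; }
          public BlockSnapshot getBlock() { return new BlockSnapshot(4096L); }
        };
        System.out.println(lastBlockLength(open));   // 4096
        System.out.println(lastBlockLength(closed)); // -1: closed streamer not consulted
      }
    }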

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0823fb72/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
index 8c46564..6173b7b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
@@ -81,8 +81,7 @@ public class TestDFSOutputStream {
    * packet size < 64kB. See HDFS-7308 for details.
    */
   @Test
-  public void testComputePacketChunkSize()
-      throws Exception {
+  public void testComputePacketChunkSize() throws Exception {
     DistributedFileSystem fs = cluster.getFileSystem();
     FSDataOutputStream os = fs.create(new Path("/test"));
     DFSOutputStream dos = (DFSOutputStream) Whitebox.getInternalState(os,


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org