Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2017/02/15 18:46:38 UTC

hadoop git commit: HDFS-8498. Blocks can be committed with wrong size. Contributed by Jing Zhao.

Repository: hadoop
Updated Branches:
  refs/heads/trunk 0fc6f3837 -> 627da6f71


HDFS-8498. Blocks can be committed with wrong size. Contributed by Jing Zhao.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/627da6f7
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/627da6f7
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/627da6f7

Branch: refs/heads/trunk
Commit: 627da6f7178e18aa41996969c408b6f344e297d1
Parents: 0fc6f38
Author: Jing Zhao <ji...@apache.org>
Authored: Wed Feb 15 10:44:37 2017 -0800
Committer: Jing Zhao <ji...@apache.org>
Committed: Wed Feb 15 10:44:37 2017 -0800

----------------------------------------------------------------------
 .../org/apache/hadoop/hdfs/DataStreamer.java    | 100 +++++++++++++------
 .../apache/hadoop/hdfs/StripedDataStreamer.java |   8 +-
 .../apache/hadoop/hdfs/TestDFSOutputStream.java |   3 +-
 3 files changed, 72 insertions(+), 39 deletions(-)
----------------------------------------------------------------------
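
For context on the fix itself: DataStreamer kept the block under construction in a volatile ExtendedBlock field whose length records the number of bytes acked (see the old field comment retained in the diff below). That length advances from the ack-processing side while pipeline recovery concurrently replaced the whole object via block = newBlock(block, newGS); an unlucky interleaving discards a length update, so the block could later be committed to the NameNode with the stale size of the snapshot. The patch makes the field final and wraps the block in a new synchronized holder, BlockToWrite, which updates the generation stamp and length in place and hands out only defensive copies. A minimal sketch of the same holder pattern, using simplified stand-in classes (SimpleBlock and BlockHolder are invented for illustration, not part of the patch):

    // SimpleBlock and BlockHolder are invented stand-ins for ExtendedBlock
    // and BlockToWrite, reduced to just the synchronization pattern.
    class SimpleBlock {
      long numBytes;
      long genStamp;

      SimpleBlock(long numBytes, long genStamp) {
        this.numBytes = numBytes;
        this.genStamp = genStamp;
      }

      SimpleBlock(SimpleBlock other) {          // copy constructor, like
        this(other.numBytes, other.genStamp);   // ExtendedBlock(ExtendedBlock)
      }
    }

    class BlockHolder {
      private SimpleBlock current = new SimpleBlock(0, 1);

      // Pre-patch shape: bump the GS by building a copy and swapping it in.
      // A concurrent setNumBytes() between the read of current.numBytes and
      // the swap is silently discarded -- the lost update behind HDFS-8498.
      void unsafeSetGenStamp(long newGS) {
        current = new SimpleBlock(current.numBytes, newGS);
      }

      // Post-patch shape, as in BlockToWrite: mutate in place under the
      // holder's monitor so length and GS updates serialize.
      synchronized void setNumBytes(long numBytes) { current.numBytes = numBytes; }
      synchronized void safeSetGenStamp(long newGS) { current.genStamp = newGS; }

      // Readers only ever get a defensive copy, never the shared instance.
      synchronized SimpleBlock snapshot() { return new SimpleBlock(current); }
    }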


http://git-wip-us.apache.org/repos/asf/hadoop/blob/627da6f7/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
index 8e6eb63..0268537 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DataStreamer.java
@@ -142,8 +142,6 @@ class DataStreamer extends Daemon {
 
     /**
      * Record a connection exception.
-     * @param e
-     * @throws InvalidEncryptionKeyException
      */
     void recordFailure(final InvalidEncryptionKeyException e)
         throws InvalidEncryptionKeyException {
@@ -178,9 +176,8 @@ class DataStreamer extends Daemon {
         final StorageType[] targetStorageTypes,
         final Token<BlockTokenIdentifier> blockToken) throws IOException {
       //send the TRANSFER_BLOCK request
-      new Sender(out)
-          .transferBlock(block, blockToken, dfsClient.clientName, targets,
-              targetStorageTypes);
+      new Sender(out).transferBlock(block.getCurrentBlock(), blockToken,
+          dfsClient.clientName, targets, targetStorageTypes);
       out.flush();
       //ack
       BlockOpResponseProto transferResponse = BlockOpResponseProto
@@ -199,6 +196,42 @@ class DataStreamer extends Daemon {
     }
   }
 
+  static class BlockToWrite {
+    private ExtendedBlock currentBlock;
+
+    BlockToWrite(ExtendedBlock block) {
+      setCurrentBlock(block);
+    }
+
+    synchronized ExtendedBlock getCurrentBlock() {
+      return currentBlock == null ? null : new ExtendedBlock(currentBlock);
+    }
+
+    synchronized long getNumBytes() {
+      return currentBlock == null ? 0 : currentBlock.getNumBytes();
+    }
+
+    synchronized void setCurrentBlock(ExtendedBlock block) {
+      currentBlock = (block == null || block.getLocalBlock() == null) ?
+          null : new ExtendedBlock(block);
+    }
+
+    synchronized void setNumBytes(long numBytes) {
+      assert currentBlock != null;
+      currentBlock.setNumBytes(numBytes);
+    }
+
+    synchronized void setGenerationStamp(long generationStamp) {
+      assert currentBlock != null;
+      currentBlock.setGenerationStamp(generationStamp);
+    }
+
+    @Override
+    public synchronized String toString() {
+      return currentBlock == null ? "null" : currentBlock.toString();
+    }
+  }
+
   /**
    * Create a socket for a write pipeline
    *
@@ -440,7 +473,7 @@ class DataStreamer extends Daemon {
   }
 
   private volatile boolean streamerClosed = false;
-  protected volatile ExtendedBlock block; // its length is number of bytes acked
+  protected final BlockToWrite block; // its length is number of bytes acked
   protected Token<BlockTokenIdentifier> accessToken;
   private DataOutputStream blockStream;
   private DataInputStream blockReplyStream;
@@ -508,7 +541,7 @@ class DataStreamer extends Daemon {
                        ByteArrayManager byteArrayManage,
                        boolean isAppend, String[] favoredNodes,
                        EnumSet<AddBlockFlag> flags) {
-    this.block = block;
+    this.block = new BlockToWrite(block);
     this.dfsClient = dfsClient;
     this.src = src;
     this.progress = progress;
@@ -1322,7 +1355,7 @@ class DataStreamer extends Daemon {
       LocatedBlock lb;
       //get a new datanode
       lb = dfsClient.namenode.getAdditionalDatanode(
-          src, stat.getFileId(), block, nodes, storageIDs,
+          src, stat.getFileId(), block.getCurrentBlock(), nodes, storageIDs,
           exclude.toArray(new DatanodeInfo[exclude.size()]),
           1, dfsClient.clientName);
       // a new node was allocated by the namenode. Update nodes.
@@ -1440,7 +1473,7 @@ class DataStreamer extends Daemon {
     } // while
 
     if (success) {
-      block = updatePipeline(newGS);
+      updatePipeline(newGS);
     }
   }
 
@@ -1536,21 +1569,22 @@ class DataStreamer extends Daemon {
   }
 
   private LocatedBlock updateBlockForPipeline() throws IOException {
-    return dfsClient.namenode.updateBlockForPipeline(block,
+    return dfsClient.namenode.updateBlockForPipeline(block.getCurrentBlock(),
         dfsClient.clientName);
   }
 
-  static ExtendedBlock newBlock(ExtendedBlock b, final long newGS) {
-    return new ExtendedBlock(b.getBlockPoolId(), b.getBlockId(),
-        b.getNumBytes(), newGS);
+  void updateBlockGS(final long newGS) {
+    block.setGenerationStamp(newGS);
   }
 
   /** update pipeline at the namenode */
-  ExtendedBlock updatePipeline(long newGS) throws IOException {
-    final ExtendedBlock newBlock = newBlock(block, newGS);
-    dfsClient.namenode.updatePipeline(dfsClient.clientName, block, newBlock,
-        nodes, storageIDs);
-    return newBlock;
+  private void updatePipeline(long newGS) throws IOException {
+    final ExtendedBlock oldBlock = block.getCurrentBlock();
+    // the new GS has been propagated to all DN, it should be ok to update the
+    // local block state
+    updateBlockGS(newGS);
+    dfsClient.namenode.updatePipeline(dfsClient.clientName, oldBlock,
+        block.getCurrentBlock(), nodes, storageIDs);
   }
 
   DatanodeInfo[] getExcludedNodes() {
@@ -1570,31 +1604,29 @@ class DataStreamer extends Daemon {
     StorageType[] storageTypes;
     int count = dfsClient.getConf().getNumBlockWriteRetry();
     boolean success;
-    ExtendedBlock oldBlock = block;
+    final ExtendedBlock oldBlock = block.getCurrentBlock();
     do {
       errorState.resetInternalError();
       lastException.clear();
 
       DatanodeInfo[] excluded = getExcludedNodes();
-      block = oldBlock;
-      lb = locateFollowingBlock(excluded.length > 0 ? excluded : null);
-      block = lb.getBlock();
+      lb = locateFollowingBlock(
+          excluded.length > 0 ? excluded : null, oldBlock);
+      block.setCurrentBlock(lb.getBlock());
       block.setNumBytes(0);
       bytesSent = 0;
       accessToken = lb.getBlockToken();
       nodes = lb.getLocations();
       storageTypes = lb.getStorageTypes();
 
-      //
       // Connect to first DataNode in the list.
-      //
       success = createBlockOutputStream(nodes, storageTypes, 0L, false);
 
       if (!success) {
         LOG.warn("Abandoning " + block);
-        dfsClient.namenode.abandonBlock(block, stat.getFileId(), src,
-            dfsClient.clientName);
-        block = null;
+        dfsClient.namenode.abandonBlock(block.getCurrentBlock(),
+            stat.getFileId(), src, dfsClient.clientName);
+        block.setCurrentBlock(null);
         final DatanodeInfo badNode = nodes[errorState.getBadNodeIndex()];
         LOG.warn("Excluding datanode " + badNode);
         excludedNodes.put(badNode, badNode);
@@ -1655,7 +1687,7 @@ class DataStreamer extends Daemon {
 
         // We cannot change the block length in 'block' as it counts the number
         // of bytes ack'ed.
-        ExtendedBlock blockCopy = new ExtendedBlock(block);
+        ExtendedBlock blockCopy = block.getCurrentBlock();
         blockCopy.setNumBytes(stat.getBlockSize());
 
         boolean[] targetPinnings = getPinnings(nodes);
@@ -1765,9 +1797,9 @@ class DataStreamer extends Daemon {
     }
   }
 
-  private LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes)
-      throws IOException {
-    return DFSOutputStream.addBlock(excludedNodes, dfsClient, src, block,
+  private LocatedBlock locateFollowingBlock(DatanodeInfo[] excluded,
+      ExtendedBlock oldBlock) throws IOException {
+    return DFSOutputStream.addBlock(excluded, dfsClient, src, oldBlock,
         stat.getFileId(), favoredNodes, addBlockFlags);
   }
 
@@ -1811,7 +1843,7 @@ class DataStreamer extends Daemon {
    * @return the block this streamer is writing to
    */
   ExtendedBlock getBlock() {
-    return block;
+    return block.getCurrentBlock();
   }
 
   /**
@@ -2016,6 +2048,8 @@ class DataStreamer extends Daemon {
 
   @Override
   public String toString() {
-    return block == null? "block==null": "" + block.getLocalBlock();
+    final ExtendedBlock extendedBlock = block.getCurrentBlock();
+    return extendedBlock == null ?
+        "block==null" : "" + extendedBlock.getLocalBlock();
   }
 }
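
Note that getCurrentBlock() above always returns new ExtendedBlock(currentBlock), so callers can no longer mutate the shared state through an aliased reference; the pipeline-setup code, which previously had to copy the block explicitly before overwriting its length for the write request, now gets a copy automatically. A short usage sketch of those semantics (the pool id and numeric arguments are placeholder values, and BlockToWrite is package-private in the patch, so this is illustrative only):

    BlockToWrite block = new BlockToWrite(
        new ExtendedBlock("bp-1", 1001L, 0L, 1L));

    ExtendedBlock copy = block.getCurrentBlock();
    copy.setNumBytes(134217728);          // scratch edit for a write request
    assert block.getNumBytes() == 0;      // shared acked length is untouched

    block.setNumBytes(4096);              // only the synchronized setters mutate it
    assert block.getCurrentBlock().getNumBytes() == 4096;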

http://git-wip-us.apache.org/repos/asf/hadoop/blob/627da6f7/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
index 89ab6a3..b457edb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/StripedDataStreamer.java
@@ -71,7 +71,7 @@ public class StripedDataStreamer extends DataStreamer {
 
   @Override
   protected void endBlock() {
-    coordinator.offerEndBlock(index, block);
+    coordinator.offerEndBlock(index, block.getCurrentBlock());
     super.endBlock();
   }
 
@@ -93,7 +93,7 @@ public class StripedDataStreamer extends DataStreamer {
   protected LocatedBlock nextBlockOutputStream() throws IOException {
     boolean success;
     LocatedBlock lb = getFollowingBlock();
-    block = lb.getBlock();
+    block.setCurrentBlock(lb.getBlock());
     block.setNumBytes(0);
     bytesSent = 0;
     accessToken = lb.getBlockToken();
@@ -105,7 +105,7 @@ public class StripedDataStreamer extends DataStreamer {
     success = createBlockOutputStream(nodes, storageTypes, 0L, false);
 
     if (!success) {
-      block = null;
+      block.setCurrentBlock(null);
       final DatanodeInfo badNode = nodes[getErrorState().getBadNodeIndex()];
       LOG.warn("Excluding datanode " + badNode);
       excludedNodes.put(badNode, badNode);
@@ -161,7 +161,7 @@ public class StripedDataStreamer extends DataStreamer {
         success = coordinator.takeStreamerUpdateResult(index);
         if (success) {
           // if all succeeded, update its block using the new GS
-          block = newBlock(block, newGS);
+          updateBlockGS(newGS);
         } else {
           // otherwise close the block stream and restart the recovery process
           closeStream();
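
The striped streamers had the same copy-then-swap pattern (block = newBlock(block, newGS)) and now update the generation stamp in place through updateBlockGS(newGS). A small, self-contained demonstration of the difference, reusing the simplified BlockHolder sketched earlier (illustrative only; the race is probabilistic, so the unsafe variant only sometimes loses the ack):

    public class LostUpdateDemo {
      public static void main(String[] args) throws InterruptedException {
        BlockHolder holder = new BlockHolder();
        Thread acker = new Thread(() -> holder.setNumBytes(4096)); // ack side
        acker.start();
        holder.safeSetGenStamp(2);        // pipeline recovery bumps the GS
        acker.join();
        // Always prints 4096 here. With unsafeSetGenStamp(2) instead, the
        // copy-then-swap can discard the ack and print 0 -- the stale size
        // a block could then be committed with.
        System.out.println(holder.snapshot().numBytes);
      }
    }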

http://git-wip-us.apache.org/repos/asf/hadoop/blob/627da6f7/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
index 750103d..9ec01b6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
@@ -110,8 +110,7 @@ public class TestDFSOutputStream {
    * packet size < 64kB. See HDFS-7308 for details.
    */
   @Test
-  public void testComputePacketChunkSize()
-      throws Exception {
+  public void testComputePacketChunkSize() throws Exception {
     DistributedFileSystem fs = cluster.getFileSystem();
     FSDataOutputStream os = fs.create(new Path("/test"));
     DFSOutputStream dos = (DFSOutputStream) Whitebox.getInternalState(os,

