You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2016/01/13 23:29:34 UTC

[06/17] hadoop git commit: HDFS-9289. Make DataStreamer#block thread safe and verify genStamp in commitBlock. Contributed by Chang Li.

HDFS-9289. Make DataStreamer#block thread safe and verify genStamp in commitBlock. Contributed by Chang Li.

(cherry picked from commit 397b554c36867724ca4167931270cd7af784e54a)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/09d3e476
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/09d3e476
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/09d3e476

Branch: refs/heads/branch-2.7.2
Commit: 09d3e476dc410ee203109c7a0131b9cacd07b3ed
Parents: 89f6716
Author: Kihwal Lee <ki...@apache.org>
Authored: Wed Nov 4 12:10:59 2015 -0600
Committer: Vinod Kumar Vavilapalli (I am also known as @tshooter.) <vi...@apache.org>
Committed: Wed Jan 13 11:35:37 2016 -0800

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../org/apache/hadoop/hdfs/DFSOutputStream.java |   2 +-
 .../BlockInfoContiguousUnderConstruction.java   |   2 +-
 .../server/blockmanagement/BlockManager.java    |   4 +
 .../org/apache/hadoop/hdfs/DFSTestUtil.java     |  69 +++++++++++++
 .../TestCommitBlockWithInvalidGenStamp.java     | 100 +++++++++++++++++++
 6 files changed, 178 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/09d3e476/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index f4a24a0..b4b58ae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -1153,6 +1153,9 @@ Release 2.6.3 - UNRELEASED
     HDFS-9470. Encryption zone on root not loaded from fsimage after NN
     restart. (Xiao Chen via wang)
 
+    HDFS-9289. Make DataStreamer#block thread safe and verify genStamp in
+    commitBlock. (Chang Li via kihwal)
+
 Release 2.6.2 - 2015-10-28
 
   INCOMPATIBLE CHANGES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/09d3e476/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index d6c239e..e8080ee 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -226,7 +226,7 @@ public class DFSOutputStream extends FSOutputSummer
   //
   class DataStreamer extends Daemon {
     private volatile boolean streamerClosed = false;
-    private ExtendedBlock block; // its length is number of bytes acked
+    private volatile ExtendedBlock block; // its length is number of bytes acked
     private Token<BlockTokenIdentifier> accessToken;
     private DataOutputStream blockStream;
     private DataInputStream blockReplyStream;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/09d3e476/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java
index 92153ab..4f315c7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java
@@ -274,7 +274,7 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous {
       throw new IOException("Trying to commit inconsistent block: id = "
           + block.getBlockId() + ", expected id = " + getBlockId());
     blockUCState = BlockUCState.COMMITTED;
-    this.set(getBlockId(), block.getNumBytes(), block.getGenerationStamp());
+    this.setNumBytes(block.getNumBytes());
     // Sort out invalid replicas.
     setGenerationStampAndVerifyReplicas(block.getGenerationStamp());
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/09d3e476/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 8dcfc4e..81367ea 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -596,6 +596,10 @@ public class BlockManager {
     assert block.getNumBytes() <= commitBlock.getNumBytes() :
       "commitBlock length is less than the stored one "
       + commitBlock.getNumBytes() + " vs. " + block.getNumBytes();
+    if(block.getGenerationStamp() != commitBlock.getGenerationStamp()) {
+      throw new IOException("Commit block with mismatching GS. NN has " +
+        block + ", client submits " + commitBlock);
+    }
     block.commitBlock(commitBlock);
     return true;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/09d3e476/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index 13a3ec3..d7e1d92 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -64,6 +64,8 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
@@ -73,12 +75,16 @@ import org.apache.hadoop.hdfs.server.datanode.TestTransferRbw;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.INodeFile;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.ha
         .ConfiguredFailoverProxyProvider;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
+import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.hdfs.tools.DFSAdmin;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.nativeio.NativeIO;
@@ -1742,4 +1748,67 @@ public class DFSTestUtil {
     dn.setLastUpdate(Time.now() + offset);
     dn.setLastUpdateMonotonic(Time.monotonicNow() + offset);
   }
+
+  public static StorageReceivedDeletedBlocks[] makeReportForReceivedBlock(
+      Block block, BlockStatus blockStatus, DatanodeStorage storage) {
+    ReceivedDeletedBlockInfo[] receivedBlocks = new ReceivedDeletedBlockInfo[1];
+    receivedBlocks[0] = new ReceivedDeletedBlockInfo(block, blockStatus, null);
+    StorageReceivedDeletedBlocks[] reports = new StorageReceivedDeletedBlocks[1];
+    reports[0] = new StorageReceivedDeletedBlocks(storage, receivedBlocks);
+    return reports;
+  }
+
+  /**
+   * Adds a block to a file.
+   * This method only manipulates NameNode
+   * states of the file and the block without injecting data to DataNode.
+   * It does mimic block reports.
+   * You should disable periodical heartbeat before use this.
+   * @param dataNodes List DataNodes to host the block
+   * @param previous Previous block in the file
+   * @param len block size
+   * @return The added block
+   */
+  public static Block addBlockToFile(
+      List<DataNode> dataNodes, DistributedFileSystem fs, FSNamesystem ns,
+      String file, INodeFile fileNode,
+      String clientName, ExtendedBlock previous, int len)
+      throws Exception {
+    fs.getClient().namenode.addBlock(file, clientName, previous, null,
+        fileNode.getId(), null);
+
+    final BlockInfoContiguous lastBlock =
+        fileNode.getLastBlock();
+    final int groupSize = fileNode.getBlockReplication();
+    assert dataNodes.size() >= groupSize;
+    // 1. RECEIVING_BLOCK IBR
+    for (int i = 0; i < groupSize; i++) {
+      DataNode dn = dataNodes.get(i);
+      final Block block = new Block(lastBlock.getBlockId() + i, 0,
+          lastBlock.getGenerationStamp());
+      DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString());
+      StorageReceivedDeletedBlocks[] reports = DFSTestUtil
+          .makeReportForReceivedBlock(block,
+              ReceivedDeletedBlockInfo.BlockStatus.RECEIVING_BLOCK, storage);
+      for (StorageReceivedDeletedBlocks report : reports) {
+        ns.processIncrementalBlockReport(dn.getDatanodeId(), report);
+      }
+    }
+
+    // 2. RECEIVED_BLOCK IBR
+    for (int i = 0; i < groupSize; i++) {
+      DataNode dn = dataNodes.get(i);
+      final Block block = new Block(lastBlock.getBlockId() + i,
+          len, lastBlock.getGenerationStamp());
+      DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString());
+      StorageReceivedDeletedBlocks[] reports = DFSTestUtil
+          .makeReportForReceivedBlock(block,
+              ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage);
+      for (StorageReceivedDeletedBlocks report : reports) {
+        ns.processIncrementalBlockReport(dn.getDatanodeId(), report);
+      }
+    }
+    lastBlock.setNumBytes(len);
+    return lastBlock;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/09d3e476/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockWithInvalidGenStamp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockWithInvalidGenStamp.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockWithInvalidGenStamp.java
new file mode 100644
index 0000000..30ea547
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockWithInvalidGenStamp.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.io.IOUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+
+public class TestCommitBlockWithInvalidGenStamp {
+  private static final int BLOCK_SIZE = 1024;
+  private MiniDFSCluster cluster;
+  private FSDirectory dir;
+  private DistributedFileSystem dfs;
+
+  @Before
+  public void setUp() throws IOException {
+    final Configuration conf = new Configuration();
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    cluster = new MiniDFSCluster.Builder(conf).build();
+    cluster.waitActive();
+
+    dir = cluster.getNamesystem().getFSDirectory();
+    dfs = cluster.getFileSystem();
+  }
+
+  @After
+  public void tearDown() {
+    if (cluster != null) {
+      cluster.shutdown();
+    }
+  }
+
+  @Test
+  public void testCommitWithInvalidGenStamp() throws Exception {
+    final Path file = new Path("/file");
+    FSDataOutputStream out = null;
+
+    try {
+      out = dfs.create(file, (short) 1);
+      INodeFile fileNode = dir.getINode4Write(file.toString()).asFile();
+      ExtendedBlock previous = null;
+
+      Block newBlock = DFSTestUtil.addBlockToFile(cluster.getDataNodes(),
+          dfs, cluster.getNamesystem(), file.toString(), fileNode,
+          dfs.getClient().getClientName(), previous, 100);
+      Block newBlockClone = new Block(newBlock);
+      previous = new ExtendedBlock(cluster.getNamesystem().getBlockPoolId(),
+          newBlockClone);
+
+      previous.setGenerationStamp(123);
+      try{
+        dfs.getClient().getNamenode().complete(file.toString(),
+            dfs.getClient().getClientName(), previous, fileNode.getId());
+        Assert.fail("should throw exception because invalid genStamp");
+      } catch (IOException e) {
+        Assert.assertTrue(e.toString().contains(
+            "Commit block with mismatching GS. NN has " +
+            newBlock + ", client submits " + newBlockClone));
+      }
+      previous = new ExtendedBlock(cluster.getNamesystem().getBlockPoolId(),
+          newBlock);
+      boolean complete =  dfs.getClient().getNamenode().complete(file.toString(),
+      dfs.getClient().getClientName(), previous, fileNode.getId());
+      Assert.assertTrue("should complete successfully", complete);
+    } finally {
+      IOUtils.cleanup(null, out);
+    }
+  }
+}