Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2016/01/13 23:29:34 UTC
[06/17] hadoop git commit: HDFS-9289. Make DataStreamer#block thread safe and verify genStamp in commitBlock. Contributed by Chang Li.
HDFS-9289. Make DataStreamer#block thread safe and verify genStamp in commitBlock. Contributed by Chang Li.
(cherry picked from commit 397b554c36867724ca4167931270cd7af784e54a)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/09d3e476
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/09d3e476
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/09d3e476
Branch: refs/heads/branch-2.7.2
Commit: 09d3e476dc410ee203109c7a0131b9cacd07b3ed
Parents: 89f6716
Author: Kihwal Lee <ki...@apache.org>
Authored: Wed Nov 4 12:10:59 2015 -0600
Committer: Vinod Kumar Vavilapalli (I am also known as @tshooter.) <vi...@apache.org>
Committed: Wed Jan 13 11:35:37 2016 -0800
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../org/apache/hadoop/hdfs/DFSOutputStream.java | 2 +-
.../BlockInfoContiguousUnderConstruction.java | 2 +-
.../server/blockmanagement/BlockManager.java | 4 +
.../org/apache/hadoop/hdfs/DFSTestUtil.java | 69 +++++++++++++
.../TestCommitBlockWithInvalidGenStamp.java | 100 +++++++++++++++++++
6 files changed, 178 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/09d3e476/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index f4a24a0..b4b58ae 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -1153,6 +1153,9 @@ Release 2.6.3 - UNRELEASED
HDFS-9470. Encryption zone on root not loaded from fsimage after NN
restart. (Xiao Chen via wang)
+ HDFS-9289. Make DataStreamer#block thread safe and verify genStamp in
+ commitBlock. (Chang Li via kihwal)
+
Release 2.6.2 - 2015-10-28
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/09d3e476/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index d6c239e..e8080ee 100755
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -226,7 +226,7 @@ public class DFSOutputStream extends FSOutputSummer
//
class DataStreamer extends Daemon {
private volatile boolean streamerClosed = false;
- private ExtendedBlock block; // its length is number of bytes acked
+ private volatile ExtendedBlock block; // its length is number of bytes acked
private Token<BlockTokenIdentifier> accessToken;
private DataOutputStream blockStream;
private DataInputStream blockReplyStream;
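
The volatile qualifier matters here because block is written by the DataStreamer daemon thread, for example when pipeline recovery swaps in a block with a bumped generation stamp, while other threads (such as the one driving close() and file completion) read it. Without volatile, the Java memory model lets a reader observe a stale reference and report an outdated generation stamp to the NameNode. A minimal, self-contained sketch of the visibility guarantee, using hypothetical names rather than the HDFS classes:

// Safe publication via a volatile field (hypothetical names, not HDFS
// code). With volatile, the reader thread is guaranteed to observe the
// writer's update; without it, the JMM permits the reader to spin on a
// stale cached value indefinitely.
public class VolatileVisibilitySketch {
  private static volatile long publishedGenStamp = 1000L;

  public static void main(String[] args) throws InterruptedException {
    Thread reader = new Thread(() -> {
      while (publishedGenStamp == 1000L) {
        // Busy-wait; guaranteed to exit only because the field is volatile.
      }
      System.out.println("saw GS " + publishedGenStamp);
    });
    reader.start();
    Thread.sleep(50);          // let the reader start spinning
    publishedGenStamp = 1001L; // volatile write, visible to the reader
    reader.join();
  }
}
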
http://git-wip-us.apache.org/repos/asf/hadoop/blob/09d3e476/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java
index 92153ab..4f315c7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java
@@ -274,7 +274,7 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous {
throw new IOException("Trying to commit inconsistent block: id = "
+ block.getBlockId() + ", expected id = " + getBlockId());
blockUCState = BlockUCState.COMMITTED;
- this.set(getBlockId(), block.getNumBytes(), block.getGenerationStamp());
+ this.setNumBytes(block.getNumBytes());
// Sort out invalid replicas.
setGenerationStampAndVerifyReplicas(block.getGenerationStamp());
}
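
The one-line change above narrows what the client's view of the block can overwrite. The old this.set(...) call rewrote all three block fields at once, taking the length and generation stamp from the client's block; the new code copies only the byte count directly, leaving the generation stamp to flow through setGenerationStampAndVerifyReplicas, which also sorts out replicas recorded under a stale stamp. A simplified sketch of the resulting commit semantics, with hypothetical types standing in for the HDFS block classes:

// Commit-time update after the change (hypothetical types). Only the
// length is copied from the client's block; the generation stamp is
// applied through a verification step.
class CommitSketch {
  static class StoredBlock {
    long numBytes;
    long genStamp;

    void commit(long clientNumBytes, long clientGenStamp) {
      numBytes = clientNumBytes; // direct update: length only
      setGenerationStampAndVerifyReplicas(clientGenStamp);
    }

    void setGenerationStampAndVerifyReplicas(long gs) {
      genStamp = gs;
      // The real method also walks the replica list and discards replicas
      // whose recorded stamp disagrees with gs; elided in this sketch.
    }
  }

  public static void main(String[] args) {
    StoredBlock b = new StoredBlock();
    b.commit(100L, 1001L);
    System.out.println(b.numBytes + " bytes @ GS " + b.genStamp);
  }
}
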
http://git-wip-us.apache.org/repos/asf/hadoop/blob/09d3e476/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 8dcfc4e..81367ea 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -596,6 +596,10 @@ public class BlockManager {
assert block.getNumBytes() <= commitBlock.getNumBytes() :
"commitBlock length is less than the stored one "
+ commitBlock.getNumBytes() + " vs. " + block.getNumBytes();
+ if(block.getGenerationStamp() != commitBlock.getGenerationStamp()) {
+ throw new IOException("Commit block with mismatching GS. NN has " +
+ block + ", client submits " + commitBlock);
+ }
block.commitBlock(commitBlock);
return true;
}
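
With this guard, a commit whose generation stamp disagrees with the NameNode's record now fails fast with an IOException instead of silently succeeding, and that message is what the new test below asserts on. A standalone sketch of the check, with a deliberately simplified signature (the real code compares two Block objects inside BlockManager#commitBlock):

import java.io.IOException;

// Simplified, standalone version of the generation-stamp guard.
class GenStampGuardSketch {
  static void checkGenStamp(long nnGS, long clientGS, String block)
      throws IOException {
    if (nnGS != clientGS) {
      throw new IOException("Commit block with mismatching GS. NN has GS "
          + nnGS + ", client submits GS " + clientGS + " for " + block);
    }
  }

  public static void main(String[] args) {
    try {
      checkGenStamp(1001L, 123L, "blk_1073741825");
    } catch (IOException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
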
http://git-wip-us.apache.org/repos/asf/hadoop/blob/09d3e476/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index 13a3ec3..d7e1d92 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -64,6 +64,8 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
@@ -73,12 +75,16 @@ import org.apache.hadoop.hdfs.server.datanode.TestTransferRbw;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.namenode.FSEditLog;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.INodeFile;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.ha
.ConfiguredFailoverProxyProvider;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
+import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
+import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.tools.DFSAdmin;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
@@ -1742,4 +1748,67 @@ public class DFSTestUtil {
dn.setLastUpdate(Time.now() + offset);
dn.setLastUpdateMonotonic(Time.monotonicNow() + offset);
}
+
+ public static StorageReceivedDeletedBlocks[] makeReportForReceivedBlock(
+ Block block, BlockStatus blockStatus, DatanodeStorage storage) {
+ ReceivedDeletedBlockInfo[] receivedBlocks = new ReceivedDeletedBlockInfo[1];
+ receivedBlocks[0] = new ReceivedDeletedBlockInfo(block, blockStatus, null);
+ StorageReceivedDeletedBlocks[] reports = new StorageReceivedDeletedBlocks[1];
+ reports[0] = new StorageReceivedDeletedBlocks(storage, receivedBlocks);
+ return reports;
+ }
+
+ /**
+ * Adds a block to a file.
+ * This method only manipulates the NameNode state of the file and the
+ * block, without injecting any data into the DataNodes. It does,
+ * however, mimic the DataNodes' incremental block reports.
+ * Disable the periodic heartbeat before using this.
+ * @param dataNodes the DataNodes to host the block
+ * @param previous the previous block in the file
+ * @param len the block size
+ * @return the added block
+ */
+ public static Block addBlockToFile(
+ List<DataNode> dataNodes, DistributedFileSystem fs, FSNamesystem ns,
+ String file, INodeFile fileNode,
+ String clientName, ExtendedBlock previous, int len)
+ throws Exception {
+ fs.getClient().namenode.addBlock(file, clientName, previous, null,
+ fileNode.getId(), null);
+
+ final BlockInfoContiguous lastBlock =
+ fileNode.getLastBlock();
+ final int groupSize = fileNode.getBlockReplication();
+ assert dataNodes.size() >= groupSize;
+ // 1. RECEIVING_BLOCK IBR
+ for (int i = 0; i < groupSize; i++) {
+ DataNode dn = dataNodes.get(i);
+ final Block block = new Block(lastBlock.getBlockId() + i, 0,
+ lastBlock.getGenerationStamp());
+ DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString());
+ StorageReceivedDeletedBlocks[] reports = DFSTestUtil
+ .makeReportForReceivedBlock(block,
+ ReceivedDeletedBlockInfo.BlockStatus.RECEIVING_BLOCK, storage);
+ for (StorageReceivedDeletedBlocks report : reports) {
+ ns.processIncrementalBlockReport(dn.getDatanodeId(), report);
+ }
+ }
+
+ // 2. RECEIVED_BLOCK IBR
+ for (int i = 0; i < groupSize; i++) {
+ DataNode dn = dataNodes.get(i);
+ final Block block = new Block(lastBlock.getBlockId() + i,
+ len, lastBlock.getGenerationStamp());
+ DatanodeStorage storage = new DatanodeStorage(UUID.randomUUID().toString());
+ StorageReceivedDeletedBlocks[] reports = DFSTestUtil
+ .makeReportForReceivedBlock(block,
+ ReceivedDeletedBlockInfo.BlockStatus.RECEIVED_BLOCK, storage);
+ for (StorageReceivedDeletedBlocks report : reports) {
+ ns.processIncrementalBlockReport(dn.getDatanodeId(), report);
+ }
+ }
+ lastBlock.setNumBytes(len);
+ return lastBlock;
+ }
}
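
For reference, a hedged usage sketch of the new helper. The identifiers mirror the DFSTestUtil additions above; the MiniDFSCluster, DistributedFileSystem, and INodeFile wiring is assumed to come from a running fixture like the test added below. After the call returns, the NameNode considers the block received on every replica even though no DataNode holds a single byte of it.

import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.namenode.INodeFile;

class AddBlockToFileUsageSketch {
  // Fakes one fully reported 100-byte block for the given file without
  // writing any data; see the javadoc above for the precondition that
  // periodic heartbeats be disabled.
  static Block fakeOneBlock(MiniDFSCluster cluster, DistributedFileSystem dfs,
      INodeFile fileNode, String path) throws Exception {
    return DFSTestUtil.addBlockToFile(cluster.getDataNodes(), dfs,
        cluster.getNamesystem(), path, fileNode,
        dfs.getClient().getClientName(), null /* previous */, 100 /* len */);
  }
}
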
http://git-wip-us.apache.org/repos/asf/hadoop/blob/09d3e476/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockWithInvalidGenStamp.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockWithInvalidGenStamp.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockWithInvalidGenStamp.java
new file mode 100644
index 0000000..30ea547
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockWithInvalidGenStamp.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.io.IOUtils;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+
+public class TestCommitBlockWithInvalidGenStamp {
+ private static final int BLOCK_SIZE = 1024;
+ private MiniDFSCluster cluster;
+ private FSDirectory dir;
+ private DistributedFileSystem dfs;
+
+ @Before
+ public void setUp() throws IOException {
+ final Configuration conf = new Configuration();
+ conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+ cluster = new MiniDFSCluster.Builder(conf).build();
+ cluster.waitActive();
+
+ dir = cluster.getNamesystem().getFSDirectory();
+ dfs = cluster.getFileSystem();
+ }
+
+ @After
+ public void tearDown() {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ @Test
+ public void testCommitWithInvalidGenStamp() throws Exception {
+ final Path file = new Path("/file");
+ FSDataOutputStream out = null;
+
+ try {
+ out = dfs.create(file, (short) 1);
+ INodeFile fileNode = dir.getINode4Write(file.toString()).asFile();
+ ExtendedBlock previous = null;
+
+ Block newBlock = DFSTestUtil.addBlockToFile(cluster.getDataNodes(),
+ dfs, cluster.getNamesystem(), file.toString(), fileNode,
+ dfs.getClient().getClientName(), previous, 100);
+ Block newBlockClone = new Block(newBlock);
+ previous = new ExtendedBlock(cluster.getNamesystem().getBlockPoolId(),
+ newBlockClone);
+
+ previous.setGenerationStamp(123);
+ try {
+ dfs.getClient().getNamenode().complete(file.toString(),
+ dfs.getClient().getClientName(), previous, fileNode.getId());
+ Assert.fail("should throw an exception because of an invalid genStamp");
+ } catch (IOException e) {
+ Assert.assertTrue(e.toString().contains(
+ "Commit block with mismatching GS. NN has " +
+ newBlock + ", client submits " + newBlockClone));
+ }
+ previous = new ExtendedBlock(cluster.getNamesystem().getBlockPoolId(),
+ newBlock);
+ boolean complete = dfs.getClient().getNamenode().complete(file.toString(),
+ dfs.getClient().getClientName(), previous, fileNode.getId());
+ Assert.assertTrue("should complete successfully", complete);
+ } finally {
+ IOUtils.cleanup(null, out);
+ }
+ }
+}