You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by ji...@apache.org on 2014/05/13 20:22:33 UTC
svn commit: r1594314 - in
/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/org/apache/hadoop/hdfs/
src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/
src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/
Author: jing9
Date: Tue May 13 18:22:33 2014
New Revision: 1594314
URL: http://svn.apache.org/r1594314
Log:
HDFS-6186. Pause deletion of blocks when the namenode starts up. Contributed by Jing Zhao.
Added:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingInvalidateBlock.java
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1594314&r1=1594313&r2=1594314&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Tue May 13 18:22:33 2014
@@ -354,6 +354,8 @@ Release 2.5.0 - UNRELEASED
HDFS-6230. Expose upgrade status through NameNode web UI.
(Mit Desai via wheat9)
+ HDFS-6186. Pause deletion of blocks when the namenode starts up. (jing9)
+
OPTIMIZATIONS
HDFS-6214. Webhdfs has poor throughput for files >2GB (daryn)
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java?rev=1594314&r1=1594313&r2=1594314&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java Tue May 13 18:22:33 2014
@@ -245,6 +245,10 @@ public class DFSConfigKeys extends Commo
"dfs.namenode.path.based.cache.refresh.interval.ms";
public static final long DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS_DEFAULT = 30000L;
+ /** How long (in ms) block deletion is delayed after NameNode startup */
+ public static final String DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_MS_KEY = "dfs.namenode.startup.delay.block.deletion.ms";
+ public static final long DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_MS_DEFAULT = 0L;
+
// Whether to enable datanode's stale state detection and usage for reads
public static final String DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_KEY = "dfs.namenode.avoid.read.stale.datanode";
public static final boolean DFS_NAMENODE_AVOID_STALE_DATANODE_FOR_READ_DEFAULT = false;
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1594314&r1=1594313&r2=1594314&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Tue May 13 18:22:33 2014
@@ -261,7 +261,11 @@ public class BlockManager {
this.namesystem = namesystem;
datanodeManager = new DatanodeManager(this, namesystem, conf);
heartbeatManager = datanodeManager.getHeartbeatManager();
- invalidateBlocks = new InvalidateBlocks(datanodeManager);
+
+ final long pendingPeriod = conf.getLong(
+ DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_MS_KEY,
+ DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_MS_DEFAULT);
+ invalidateBlocks = new InvalidateBlocks(datanodeManager, pendingPeriod);
// Compute the map capacity by allocating 2% of total memory
blocksMap = new BlocksMap(
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java?rev=1594314&r1=1594313&r2=1594314&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/InvalidateBlocks.java Tue May 13 18:22:33 2014
@@ -18,16 +18,24 @@
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.io.PrintWriter;
+import java.text.SimpleDateFormat;
import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.GregorianCalendar;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
+import org.apache.commons.logging.Log;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.util.LightWeightHashSet;
+import org.apache.hadoop.util.Time;
+
+import com.google.common.annotations.VisibleForTesting;
/**
* Keeps a Collection for every named machine containing blocks
@@ -44,8 +52,28 @@ class InvalidateBlocks {
private final DatanodeManager datanodeManager;
- InvalidateBlocks(final DatanodeManager datanodeManager) {
+ /**
+ * Length of time, in milliseconds, after NameNode startup during which
+ * block invalidation is held back
+ * (see the configured startup delay)
+ private final long pendingPeriodInMs;
+ /** the startup time */
+ private final long startupTime = Time.monotonicNow();
+
+ InvalidateBlocks(final DatanodeManager datanodeManager, long pendingPeriodInMs) {
this.datanodeManager = datanodeManager;
+ this.pendingPeriodInMs = pendingPeriodInMs;
+ printBlockDeletionTime(BlockManager.LOG);
+ }
+
+ private void printBlockDeletionTime(final Log log) {
+ log.info(DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_MS_KEY
+ + " is set to " + pendingPeriodInMs + " ms.");
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy MMM dd HH:mm:ss");
+ Calendar calendar = new GregorianCalendar();
+ calendar.add(Calendar.SECOND, (int) (this.pendingPeriodInMs / 1000));
+ log.info("The block deletion will start around "
+ + sdf.format(calendar.getTime()));
}
/** @return the number of blocks to be invalidated . */
@@ -134,8 +162,25 @@ class InvalidateBlocks {
return new ArrayList<String>(node2blocks.keySet());
}
+ /**
+ * @return the remaining pending time
+ */
+ @VisibleForTesting
+ long getInvalidationDelay() {
+ return pendingPeriodInMs - (Time.monotonicNow() - startupTime);
+ }
+
synchronized List<Block> invalidateWork(
final String storageId, final DatanodeDescriptor dn) {
+ final long delay = getInvalidationDelay();
+ if (delay > 0) {
+ if (BlockManager.LOG.isDebugEnabled()) {
+ BlockManager.LOG
+ .debug("Block deletion is delayed during NameNode startup. "
+ + "The deletion will start after " + delay + " ms.");
+ }
+ return null;
+ }
final LightWeightHashSet<Block> set = node2blocks.get(storageId);
if (set == null) {
return null;
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingInvalidateBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingInvalidateBlock.java?rev=1594314&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingInvalidateBlock.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestPendingInvalidateBlock.java Tue May 13 18:22:33 2014
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import org.apache.commons.logging.impl.Log4JLogger;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.log4j.Level;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.internal.util.reflection.Whitebox;
+
+/**
+ * Test if we can correctly delay the deletion of blocks.
+ */
+public class TestPendingInvalidateBlock {
+ {
+ ((Log4JLogger)BlockManager.LOG).getLogger().setLevel(Level.DEBUG);
+ }
+
+ private static final int BLOCKSIZE = 1024;
+ private static final short REPLICATION = 2;
+
+ private Configuration conf;
+ private MiniDFSCluster cluster;
+ private DistributedFileSystem dfs;
+
+ @Before
+ public void setUp() throws Exception {
+ conf = new Configuration();
+ conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, BLOCKSIZE);
+ // block deletion pending period
+ conf.setLong(DFSConfigKeys.DFS_NAMENODE_STARTUP_DELAY_BLOCK_DELETION_MS_KEY, 1000 * 5);
+ // set the block report interval to 2s
+ conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 2000);
+ conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
+ conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
+ // disable the RPC timeout for debug
+ conf.setLong(CommonConfigurationKeys.IPC_PING_INTERVAL_KEY, 0);
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(REPLICATION)
+ .build();
+ cluster.waitActive();
+ dfs = cluster.getFileSystem();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ @Test
+ public void testPendingDeletion() throws Exception {
+ final Path foo = new Path("/foo");
+ DFSTestUtil.createFile(dfs, foo, BLOCKSIZE, REPLICATION, 0);
+ // restart NN
+ cluster.restartNameNode(true);
+ dfs.delete(foo, true);
+ Assert.assertEquals(0, cluster.getNamesystem().getBlocksTotal());
+ Assert.assertEquals(REPLICATION, cluster.getNamesystem()
+ .getPendingDeletionBlocks());
+ Thread.sleep(6000);
+ Assert.assertEquals(0, cluster.getNamesystem().getBlocksTotal());
+ Assert.assertEquals(0, cluster.getNamesystem().getPendingDeletionBlocks());
+ }
+
+ /**
+ * Test whether we can delay the deletion of unknown blocks in DataNode's
+ * first several block reports.
+ */
+ @Test
+ public void testPendingDeleteUnknownBlocks() throws Exception {
+ final int fileNum = 5; // 5 files
+ final Path[] files = new Path[fileNum];
+ final DataNodeProperties[] dnprops = new DataNodeProperties[REPLICATION];
+ // create a group of files, each file contains 1 block
+ for (int i = 0; i < fileNum; i++) {
+ files[i] = new Path("/file" + i);
+ DFSTestUtil.createFile(dfs, files[i], BLOCKSIZE, REPLICATION, i);
+ }
+ // wait until all DataNodes have replicas
+ waitForReplication();
+ for (int i = REPLICATION - 1; i >= 0; i--) {
+ dnprops[i] = cluster.stopDataNode(i);
+ }
+ Thread.sleep(2000);
+ // delete 2 files, we still have 3 files remaining so that we can cover
+ // every DN storage
+ for (int i = 0; i < 2; i++) {
+ dfs.delete(files[i], true);
+ }
+
+ // restart NameNode
+ cluster.restartNameNode(false);
+ InvalidateBlocks invalidateBlocks = (InvalidateBlocks) Whitebox
+ .getInternalState(cluster.getNamesystem().getBlockManager(),
+ "invalidateBlocks");
+ InvalidateBlocks mockIb = Mockito.spy(invalidateBlocks);
+ Mockito.doReturn(1L).when(mockIb).getInvalidationDelay();
+ Whitebox.setInternalState(cluster.getNamesystem().getBlockManager(),
+ "invalidateBlocks", mockIb);
+
+ Assert.assertEquals(0L, cluster.getNamesystem().getPendingDeletionBlocks());
+ // restart DataNodes
+ for (int i = 0; i < REPLICATION; i++) {
+ cluster.restartDataNode(dnprops[i], true);
+ }
+ cluster.waitActive();
+
+ for (int i = 0; i < REPLICATION; i++) {
+ DataNodeTestUtils.triggerBlockReport(cluster.getDataNodes().get(i));
+ }
+ Thread.sleep(2000);
+ // make sure we have received block reports by checking the total block #
+ Assert.assertEquals(3, cluster.getNamesystem().getBlocksTotal());
+ Assert.assertEquals(4, cluster.getNamesystem().getPendingDeletionBlocks());
+
+ cluster.restartNameNode(true);
+ Thread.sleep(6000);
+ Assert.assertEquals(3, cluster.getNamesystem().getBlocksTotal());
+ Assert.assertEquals(0, cluster.getNamesystem().getPendingDeletionBlocks());
+ }
+
+ private long waitForReplication() throws Exception {
+ for (int i = 0; i < 10; i++) {
+ long ur = cluster.getNamesystem().getUnderReplicatedBlocks();
+ if (ur == 0) {
+ return 0;
+ } else {
+ Thread.sleep(1000);
+ }
+ }
+ return cluster.getNamesystem().getUnderReplicatedBlocks();
+ }
+
+}