Posted to common-commits@hadoop.apache.org by cm...@apache.org on 2015/02/14 00:08:03 UTC
[1/3] hadoop git commit: HDFS-7430. Refactor the BlockScanner to use O(1) memory and use multiple threads (cmccabe)
Repository: hadoop
Updated Branches:
refs/heads/branch-2 ebf649949 -> a8c6a96f3
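
The change below deletes the tests built around the old DataBlockScanner / BlockPoolSliceScanner (TestDatanodeBlockScanner, TestMultipleNNDataBlockScanner) and adds TestBlockScanner, which exercises the new BlockScanner / VolumeScanner design: each volume gets its own scanner thread, and that thread walks the volume's blocks through an FsVolumeSpi.BlockIterator whose position can be saved to and reloaded from an on-disk "scanner.cursor" file, which is what keeps the scanner's memory footprint O(1). The sketch below is only an illustration pieced together from the calls visible in TestBlockScanner further down (newBlockIterator, setMaxStalenessMs, nextBlock, save, rewind, loadBlockIterator); it is not part of the commit, and the exact signatures and exception behavior are assumptions.

// Hedged sketch, not part of this commit: driving the new per-volume iterator,
// inferred solely from the usage in TestBlockScanner below.
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.BlockIterator;

class BlockIteratorSketch {
  static void scanVolumeOnce(FsVolumeSpi volume, String bpid) throws Exception {
    BlockIterator iter = volume.newBlockIterator(bpid, "sketch"); // named, cursor-backed iterator
    iter.setMaxStalenessMs(100);           // allow the cached block listing to be up to 100 ms stale
    ExtendedBlock block;
    while ((block = iter.nextBlock()) != null) {  // O(1) memory: only the cursor position is held
      // ... verify the block's checksums here ...
      iter.save();                         // persist the cursor (the "scanner.cursor" file)
    }
    // After a datanode restart, scanning resumes from the saved position:
    iter = volume.loadBlockIterator(bpid, "sketch");
  }
}
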
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8c6a96f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java
deleted file mode 100644
index 9e78c10..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeBlockScanner.java
+++ /dev/null
@@ -1,551 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.net.InetSocketAddress;
-import java.net.URL;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Random;
-import java.util.Set;
-import java.util.concurrent.TimeoutException;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.commons.logging.impl.Log4JLogger;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
-import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner;
-import org.apache.hadoop.hdfs.server.datanode.DataNode;
-import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
-import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
-import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.util.Time;
-import org.apache.log4j.Level;
-import org.junit.Test;
-
-/**
- * This test verifies that block verification occurs on the datanode
- */
-public class TestDatanodeBlockScanner {
-
- private static final Log LOG =
- LogFactory.getLog(TestDatanodeBlockScanner.class);
-
- private static final long TIMEOUT = 20000; // 20 sec.
-
- private static final Pattern pattern =
- Pattern.compile(".*?(blk_[-]*\\d+).*?scan time\\s*:\\s*(\\d+)");
-
- private static final Pattern pattern_blockVerify =
- Pattern.compile(".*?(SCAN_PERIOD)\\s*:\\s*(\\d+.*?)");
-
- static {
- ((Log4JLogger)FSNamesystem.auditLog).getLogger().setLevel(Level.WARN);
- }
- /**
- * This connects to datanode and fetches block verification data.
- * It repeats this until the given block has a verification time > newTime.
- * @param newTime - validation timestamps before newTime are "old", the
- * result of previous validations. This method waits until a "new"
- * validation timestamp is obtained. If no validator runs soon
- * enough, the method will time out.
- * @return - the new validation timestamp
- * @throws IOException
- * @throws TimeoutException
- */
- private static long waitForVerification(int infoPort, FileSystem fs,
- Path file, int blocksValidated,
- long newTime, long timeout)
- throws IOException, TimeoutException {
- URL url = new URL("http://localhost:" + infoPort +
- "/blockScannerReport?listblocks");
- long lastWarnTime = Time.monotonicNow();
- if (newTime <= 0) newTime = 1L;
- long verificationTime = 0;
-
- String block = DFSTestUtil.getFirstBlock(fs, file).getBlockName();
- long failtime = (timeout <= 0) ? Long.MAX_VALUE
- : Time.monotonicNow() + timeout;
- while (verificationTime < newTime) {
- if (failtime < Time.monotonicNow()) {
- throw new TimeoutException("failed to achieve block verification after "
- + timeout + " msec. Current verification timestamp = "
- + verificationTime + ", requested verification time > "
- + newTime);
- }
- String response = DFSTestUtil.urlGet(url);
- if(blocksValidated >= 0) {
- for(Matcher matcher = pattern_blockVerify.matcher(response); matcher.find();) {
- if (block.equals(matcher.group(1))) {
- assertEquals(1, blocksValidated);
- break;
- }
- }
- }
- for(Matcher matcher = pattern.matcher(response); matcher.find();) {
- if (block.equals(matcher.group(1))) {
- verificationTime = Long.parseLong(matcher.group(2));
- break;
- }
- }
-
- if (verificationTime < newTime) {
- long now = Time.monotonicNow();
- if ((now - lastWarnTime) >= 5*1000) {
- LOG.info("Waiting for verification of " + block);
- lastWarnTime = now;
- }
- try {
- Thread.sleep(500);
- } catch (InterruptedException ignored) {}
- }
- }
-
- return verificationTime;
- }
-
- @Test
- public void testDatanodeBlockScanner() throws IOException, TimeoutException {
- long startTime = Time.monotonicNow();
-
- Configuration conf = new HdfsConfiguration();
- MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
- cluster.waitActive();
-
- FileSystem fs = cluster.getFileSystem();
- Path file1 = new Path("/tmp/testBlockVerification/file1");
- Path file2 = new Path("/tmp/testBlockVerification/file2");
-
- /*
- * Write the first file and restart the cluster.
- */
- DFSTestUtil.createFile(fs, file1, 10, (short)1, 0);
- cluster.shutdown();
-
- cluster = new MiniDFSCluster.Builder(conf)
- .numDataNodes(1)
- .format(false).build();
- cluster.waitActive();
-
- DFSClient dfsClient = new DFSClient(new InetSocketAddress("localhost",
- cluster.getNameNodePort()), conf);
- fs = cluster.getFileSystem();
- DatanodeInfo dn = dfsClient.datanodeReport(DatanodeReportType.LIVE)[0];
-
- /*
- * The cluster restarted. The block should be verified by now.
- */
- assertTrue(waitForVerification(dn.getInfoPort(), fs, file1, 1, startTime,
- TIMEOUT) >= startTime);
-
- /*
- * Create a new file and read the block. The block should be marked
- * verified since the client reads the block and verifies checksum.
- */
- DFSTestUtil.createFile(fs, file2, 10, (short)1, 0);
- IOUtils.copyBytes(fs.open(file2), new IOUtils.NullOutputStream(),
- conf, true);
- assertTrue(waitForVerification(dn.getInfoPort(), fs, file2, 2, startTime,
- TIMEOUT) >= startTime);
-
- cluster.shutdown();
- }
-
- @Test
- public void testBlockCorruptionPolicy() throws Exception {
- Configuration conf = new HdfsConfiguration();
- conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L);
- Random random = new Random();
- FileSystem fs = null;
- int rand = random.nextInt(3);
-
- MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
- cluster.waitActive();
- fs = cluster.getFileSystem();
- Path file1 = new Path("/tmp/testBlockVerification/file1");
- DFSTestUtil.createFile(fs, file1, 1024, (short)3, 0);
- ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, file1);
-
- DFSTestUtil.waitReplication(fs, file1, (short)3);
- assertFalse(DFSTestUtil.allBlockReplicasCorrupt(cluster, file1, 0));
-
- // Corrupt random replica of block
- assertTrue(cluster.corruptReplica(rand, block));
-
- // Restart the datanode hoping the corrupt block to be reported
- cluster.restartDataNode(rand);
-
- // We have 2 good replicas and block is not corrupt
- DFSTestUtil.waitReplication(fs, file1, (short)2);
- assertFalse(DFSTestUtil.allBlockReplicasCorrupt(cluster, file1, 0));
-
- // Corrupt all replicas. Now, block should be marked as corrupt
- // and we should get all the replicas
- assertTrue(cluster.corruptReplica(0, block));
- assertTrue(cluster.corruptReplica(1, block));
- assertTrue(cluster.corruptReplica(2, block));
-
- // Trigger each of the DNs to scan this block immediately.
- // The block pool scanner doesn't run frequently enough on its own
- // to notice these, and due to HDFS-1371, the client won't report
- // bad blocks to the NN when all replicas are bad.
- for (DataNode dn : cluster.getDataNodes()) {
- DataNodeTestUtils.runBlockScannerForBlock(dn, block);
- }
-
- // We now have the blocks to be marked as corrupt and we get back all
- // its replicas
- DFSTestUtil.waitReplication(fs, file1, (short)3);
- assertTrue(DFSTestUtil.allBlockReplicasCorrupt(cluster, file1, 0));
- cluster.shutdown();
- }
-
- /**
- * testBlockCorruptionRecoveryPolicy.
- * This tests recovery of corrupt replicas, first for one corrupt replica
- * then for two. The test invokes blockCorruptionRecoveryPolicy which
- * 1. Creates a block with desired number of replicas
- * 2. Corrupts the desired number of replicas and restarts the datanodes
- * containing the corrupt replica. Additionally we also read the block
- * in case restarting does not report corrupt replicas.
- * Restarting or reading from the datanode would trigger reportBadBlocks
- * to namenode.
- * NameNode adds it to corruptReplicasMap and neededReplication
- * 3. Test waits until all corrupt replicas are reported, meanwhile
- * Re-replication brings the block back to healthy state
- * 4. Test again waits until the block is reported with expected number
- * of good replicas.
- */
- @Test
- public void testBlockCorruptionRecoveryPolicy1() throws Exception {
- // Test recovery of 1 corrupt replica
- LOG.info("Testing corrupt replica recovery for one corrupt replica");
- blockCorruptionRecoveryPolicy(4, (short)3, 1);
- }
-
- @Test
- public void testBlockCorruptionRecoveryPolicy2() throws Exception {
- // Test recovery of 2 corrupt replicas
- LOG.info("Testing corrupt replica recovery for two corrupt replicas");
- blockCorruptionRecoveryPolicy(5, (short)3, 2);
- }
-
- private void blockCorruptionRecoveryPolicy(int numDataNodes,
- short numReplicas,
- int numCorruptReplicas)
- throws Exception {
- Configuration conf = new HdfsConfiguration();
- conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 30L);
- conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 3);
- conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 3L);
- conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, false);
- conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY, 5L);
-
- MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numDataNodes).build();
- cluster.waitActive();
- FileSystem fs = cluster.getFileSystem();
- Path file1 = new Path("/tmp/testBlockCorruptRecovery/file");
- DFSTestUtil.createFile(fs, file1, 1024, numReplicas, 0);
- ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, file1);
- final int ITERATIONS = 10;
-
- // Wait until block is replicated to numReplicas
- DFSTestUtil.waitReplication(fs, file1, numReplicas);
-
- for (int k = 0; ; k++) {
- // Corrupt numCorruptReplicas replicas of block
- int[] corruptReplicasDNIDs = new int[numCorruptReplicas];
- for (int i=0, j=0; (j != numCorruptReplicas) && (i < numDataNodes); i++) {
- if (cluster.corruptReplica(i, block)) {
- corruptReplicasDNIDs[j++] = i;
- LOG.info("successfully corrupted block " + block + " on node "
- + i + " " + cluster.getDataNodes().get(i).getDisplayName());
- }
- }
-
- // Restart the datanodes containing corrupt replicas
- // so they would be reported to namenode and re-replicated
- // They MUST be restarted in reverse order from highest to lowest index,
- // because the act of restarting them removes them from the ArrayList
- // and causes the indexes of all nodes above them in the list to change.
- for (int i = numCorruptReplicas - 1; i >= 0 ; i--) {
- LOG.info("restarting node with corrupt replica: position "
- + i + " node " + corruptReplicasDNIDs[i] + " "
- + cluster.getDataNodes().get(corruptReplicasDNIDs[i]).getDisplayName());
- cluster.restartDataNode(corruptReplicasDNIDs[i]);
- }
-
- // Loop until all corrupt replicas are reported
- try {
- DFSTestUtil.waitCorruptReplicas(fs, cluster.getNamesystem(), file1,
- block, numCorruptReplicas);
- } catch(TimeoutException e) {
- if (k > ITERATIONS) {
- throw e;
- }
- LOG.info("Timed out waiting for corrupt replicas, trying again, iteration " + k);
- continue;
- }
- break;
- }
-
- // Loop until the block recovers after replication
- DFSTestUtil.waitReplication(fs, file1, numReplicas);
- assertFalse(DFSTestUtil.allBlockReplicasCorrupt(cluster, file1, 0));
-
- // Make sure the corrupt replica is invalidated and removed from
- // corruptReplicasMap
- DFSTestUtil.waitCorruptReplicas(fs, cluster.getNamesystem(), file1,
- block, 0);
- cluster.shutdown();
- }
-
- /** Test if NameNode handles truncated blocks in block report */
- @Test
- public void testTruncatedBlockReport() throws Exception {
- final Configuration conf = new HdfsConfiguration();
- final short REPLICATION_FACTOR = (short)2;
- final Path fileName = new Path("/file1");
-
- conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 3L);
- conf.setLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 3);
- conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 3L);
- conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, false);
-
- long startTime = Time.monotonicNow();
- MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
- .numDataNodes(REPLICATION_FACTOR)
- .build();
- cluster.waitActive();
-
- ExtendedBlock block;
- try {
- FileSystem fs = cluster.getFileSystem();
- DFSTestUtil.createFile(fs, fileName, 1, REPLICATION_FACTOR, 0);
- DFSTestUtil.waitReplication(fs, fileName, REPLICATION_FACTOR);
- block = DFSTestUtil.getFirstBlock(fs, fileName);
- } finally {
- cluster.shutdown();
- }
-
- // Restart cluster and confirm block is verified on datanode 0,
- // then truncate it on datanode 0.
- cluster = new MiniDFSCluster.Builder(conf)
- .numDataNodes(REPLICATION_FACTOR)
- .format(false)
- .build();
- cluster.waitActive();
- try {
- FileSystem fs = cluster.getFileSystem();
- int infoPort = cluster.getDataNodes().get(0).getInfoPort();
- assertTrue(waitForVerification(infoPort, fs, fileName, 1, startTime, TIMEOUT) >= startTime);
-
- // Truncate replica of block
- if (!changeReplicaLength(cluster, block, 0, -1)) {
- throw new IOException(
- "failed to find or change length of replica on node 0 "
- + cluster.getDataNodes().get(0).getDisplayName());
- }
- } finally {
- cluster.shutdown();
- }
-
- // Restart the cluster, add a node, and check that the truncated block is
- // handled correctly
- cluster = new MiniDFSCluster.Builder(conf)
- .numDataNodes(REPLICATION_FACTOR)
- .format(false)
- .build();
- cluster.startDataNodes(conf, 1, true, null, null);
- cluster.waitActive(); // now we have 3 datanodes
-
- // Assure the cluster has left safe mode.
- cluster.waitClusterUp();
- assertFalse("failed to leave safe mode",
- cluster.getNameNode().isInSafeMode());
-
- try {
- // wait for truncated block be detected by block scanner,
- // and the block to be replicated
- DFSTestUtil.waitReplication(
- cluster.getFileSystem(), fileName, REPLICATION_FACTOR);
-
- // Make sure that truncated block will be deleted
- waitForBlockDeleted(cluster, block, 0, TIMEOUT);
- } finally {
- cluster.shutdown();
- }
- }
-
- /**
- * Change the length of a block at datanode dnIndex
- */
- static boolean changeReplicaLength(MiniDFSCluster cluster, ExtendedBlock blk,
- int dnIndex, int lenDelta) throws IOException {
- File blockFile = cluster.getBlockFile(dnIndex, blk);
- if (blockFile != null && blockFile.exists()) {
- RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
- raFile.setLength(raFile.length()+lenDelta);
- raFile.close();
- return true;
- }
- LOG.info("failed to change length of block " + blk);
- return false;
- }
-
- private static void waitForBlockDeleted(MiniDFSCluster cluster,
- ExtendedBlock blk, int dnIndex, long timeout) throws TimeoutException,
- InterruptedException {
- File blockFile = cluster.getBlockFile(dnIndex, blk);
- long failtime = Time.monotonicNow()
- + ((timeout > 0) ? timeout : Long.MAX_VALUE);
- while (blockFile != null && blockFile.exists()) {
- if (failtime < Time.monotonicNow()) {
- throw new TimeoutException("waited too long for blocks to be deleted: "
- + blockFile.getPath() + (blockFile.exists() ? " still exists; " : " is absent; "));
- }
- Thread.sleep(100);
- blockFile = cluster.getBlockFile(dnIndex, blk);
- }
- }
-
- private static final String BASE_PATH = (new File("/data/current/finalized"))
- .getAbsolutePath();
-
- @Test
- public void testReplicaInfoParsing() throws Exception {
- testReplicaInfoParsingSingle(BASE_PATH);
- testReplicaInfoParsingSingle(BASE_PATH + "/subdir1");
- testReplicaInfoParsingSingle(BASE_PATH + "/subdir1/subdir2/subdir3");
- }
-
- private static void testReplicaInfoParsingSingle(String subDirPath) {
- File testFile = new File(subDirPath);
- assertEquals(BASE_PATH, ReplicaInfo.parseBaseDir(testFile).baseDirPath);
- }
-
- @Test
- public void testDuplicateScans() throws Exception {
- long startTime = Time.monotonicNow();
- MiniDFSCluster cluster = new MiniDFSCluster.Builder(new Configuration())
- .numDataNodes(1).build();
- FileSystem fs = null;
- try {
- fs = cluster.getFileSystem();
- DataNode dataNode = cluster.getDataNodes().get(0);
- int infoPort = dataNode.getInfoPort();
- long scanTimeBefore = 0, scanTimeAfter = 0;
- for (int i = 1; i < 10; i++) {
- Path fileName = new Path("/test" + i);
- DFSTestUtil.createFile(fs, fileName, 1024, (short) 1, 1000L);
- waitForVerification(infoPort, fs, fileName, i, startTime, TIMEOUT);
- if (i > 1) {
- scanTimeAfter = DataNodeTestUtils.getLatestScanTime(dataNode,
- DFSTestUtil.getFirstBlock(fs, new Path("/test" + (i - 1))));
- assertFalse("scan time should not be 0", scanTimeAfter == 0);
- assertEquals("There should not be duplicate scan", scanTimeBefore,
- scanTimeAfter);
- }
-
- scanTimeBefore = DataNodeTestUtils.getLatestScanTime(dataNode,
- DFSTestUtil.getFirstBlock(fs, new Path("/test" + i)));
- }
- cluster.restartDataNode(0);
- Thread.sleep(10000);
- dataNode = cluster.getDataNodes().get(0);
- scanTimeAfter = DataNodeTestUtils.getLatestScanTime(dataNode,
- DFSTestUtil.getFirstBlock(fs, new Path("/test" + (9))));
- assertEquals("There should not be duplicate scan", scanTimeBefore,
- scanTimeAfter);
- } finally {
- IOUtils.closeStream(fs);
- cluster.shutdown();
- }
- }
-
-/**
- * This test verifies whether block is added to the first location of
- * BlockPoolSliceScanner#blockInfoSet
- */
- @Test
- public void testAddBlockInfoToFirstLocation() throws Exception {
- MiniDFSCluster cluster = new MiniDFSCluster.Builder(new Configuration())
- .numDataNodes(1).build();
- FileSystem fs = null;
- try {
- fs = cluster.getFileSystem();
- DataNode dataNode = cluster.getDataNodes().get(0);
- // Creating a bunch of blocks
- for (int i = 1; i < 10; i++) {
- Path fileName = new Path("/test" + i);
- DFSTestUtil.createFile(fs, fileName, 1024, (short) 1, 1000L);
- }
- // Get block of the first file created (file1)
- ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, new Path("/test1"));
- dataNode.getBlockScanner().setLastScanTimeDifference(block, 0);
- // Let it sleep for more than 5 seconds so that BlockPoolSliceScanner can
- // scan the first set of blocks
- Thread.sleep(10000);
- Long scanTime1Fortest1Block = DataNodeTestUtils.getLatestScanTime(
- dataNode, block);
- // Create another set of blocks
- for (int i = 10; i < 20; i++) {
- Path fileName = new Path("/test" + i);
- DFSTestUtil.createFile(fs, fileName, 1024, (short) 1, 1000L);
- }
- dataNode.getBlockScanner().addBlock(block, true);
- // Sleep so that BlockPoolSliceScanner can scan the second set of blocks
- // and one block which we scheduled to rescan
- Thread.sleep(10000);
- // Get the lastScanTime of all of the second set of blocks
- Set<Long> lastScanTimeSet = new HashSet<Long>();
- for (int i = 10; i < 20; i++) {
- long lastScanTime = DataNodeTestUtils.getLatestScanTime(dataNode,
- DFSTestUtil.getFirstBlock(fs, new Path("/test" + i)));
- lastScanTimeSet.add(lastScanTime);
- }
- Long scanTime2Fortest1Block = DataNodeTestUtils.getLatestScanTime(
- dataNode, DFSTestUtil.getFirstBlock(fs, new Path("/test1")));
- Long minimumLastScanTime = Collections.min(lastScanTimeSet);
- assertTrue("The second scanTime for test1 block should be greater than "
- + "first scanTime", scanTime2Fortest1Block > scanTime1Fortest1Block);
- assertTrue("The second scanTime for test1 block should be less than or"
- + " equal to minimum of the lastScanTime of second set of blocks",
- scanTime2Fortest1Block <= minimumLastScanTime);
- } finally {
- IOUtils.closeStream(fs);
- cluster.shutdown();
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8c6a96f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplication.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplication.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplication.java
index 877f769..281c0ea 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplication.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReplication.java
@@ -453,8 +453,7 @@ public class TestReplication {
// Change the length of a replica
for (int i=0; i<cluster.getDataNodes().size(); i++) {
- if (TestDatanodeBlockScanner.changeReplicaLength(cluster, block, i,
- lenDelta)) {
+ if (DFSTestUtil.changeReplicaLength(cluster, block, i, lenDelta)) {
break;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8c6a96f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java
index f8f476d..2942d0f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
-import org.apache.hadoop.hdfs.TestDatanodeBlockScanner;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -54,6 +53,7 @@ public class TestOverReplicatedBlocks {
@Test
public void testProcesOverReplicateBlock() throws Exception {
Configuration conf = new HdfsConfiguration();
+ conf.setLong(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 100L);
conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L);
conf.set(
DFSConfigKeys.DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_KEY,
@@ -71,13 +71,14 @@ public class TestOverReplicatedBlocks {
assertTrue(cluster.corruptReplica(0, block));
DataNodeProperties dnProps = cluster.stopDataNode(0);
// remove block scanner log to trigger block scanning
- File scanLog = new File(MiniDFSCluster.getFinalizedDir(
+ File scanCursor = new File(new File(MiniDFSCluster.getFinalizedDir(
cluster.getInstanceStorageDir(0, 0),
- cluster.getNamesystem().getBlockPoolId()).getParent().toString()
- + "/../dncp_block_verification.log.prev");
+ cluster.getNamesystem().getBlockPoolId()).getParent()).getParent(),
+ "scanner.cursor");
//wait for one minute for deletion to succeed;
- for(int i=0; !scanLog.delete(); i++) {
- assertTrue("Could not delete log file in one minute", i < 60);
+ for(int i = 0; !scanCursor.delete(); i++) {
+ assertTrue("Could not delete " + scanCursor.getAbsolutePath() +
+ " in one minute", i < 60);
try {
Thread.sleep(1000);
} catch (InterruptedException ignored) {}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8c6a96f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
index 8eaad11..a22a71e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/BlockReportTestBase.java
@@ -30,6 +30,7 @@ import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
@@ -81,8 +82,8 @@ public abstract class BlockReportTestBase {
private static short REPL_FACTOR = 1;
private static final int RAND_LIMIT = 2000;
- private static final long DN_RESCAN_INTERVAL = 5000;
- private static final long DN_RESCAN_EXTRA_WAIT = 2 * DN_RESCAN_INTERVAL;
+ private static final long DN_RESCAN_INTERVAL = 1;
+ private static final long DN_RESCAN_EXTRA_WAIT = 3 * DN_RESCAN_INTERVAL;
private static final int DN_N0 = 0;
private static final int FILE_START = 0;
@@ -293,7 +294,7 @@ public abstract class BlockReportTestBase {
}
}
- waitTil(DN_RESCAN_EXTRA_WAIT);
+ waitTil(TimeUnit.SECONDS.toMillis(DN_RESCAN_EXTRA_WAIT));
// all blocks belong to the same file, hence same BP
String poolId = cluster.getNamesystem().getBlockPoolId();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8c6a96f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java
index f50afd4..fd51e52 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java
@@ -113,30 +113,6 @@ public class DataNodeTestUtils {
return DataNode.createInterDataNodeProtocolProxy(datanodeid, conf,
dn.getDnConf().socketTimeout, dn.getDnConf().connectToDnViaHostname);
}
-
- public static void runBlockScannerForBlock(DataNode dn, ExtendedBlock b) {
- BlockPoolSliceScanner bpScanner = getBlockPoolScanner(dn, b);
- bpScanner.verifyBlock(new ExtendedBlock(b.getBlockPoolId(),
- new BlockPoolSliceScanner.BlockScanInfo(b.getLocalBlock())));
- }
-
- private static BlockPoolSliceScanner getBlockPoolScanner(DataNode dn,
- ExtendedBlock b) {
- DataBlockScanner scanner = dn.getBlockScanner();
- BlockPoolSliceScanner bpScanner = scanner.getBPScanner(b.getBlockPoolId());
- return bpScanner;
- }
-
- public static long getLatestScanTime(DataNode dn, ExtendedBlock b) {
- BlockPoolSliceScanner scanner = getBlockPoolScanner(dn, b);
- return scanner.getLastScanTime(b.getLocalBlock());
- }
-
- public static void shutdownBlockScanner(DataNode dn) {
- if (dn.blockScanner != null) {
- dn.blockScanner.shutdown();
- }
- }
/**
* This method is used for testing.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8c6a96f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 5c543a7..36b62e5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -50,7 +50,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
@@ -485,6 +484,22 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
@Override
public void releaseReservedSpace(long bytesToRelease) {
}
+
+ @Override
+ public BlockIterator newBlockIterator(String bpid, String name) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public BlockIterator loadBlockIterator(String bpid, String name)
+ throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public FsDatasetSpi getDataset() {
+ throw new UnsupportedOperationException();
+ }
}
private final Map<String, Map<Block, BInfo>> blockMap
@@ -1239,11 +1254,6 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
}
@Override
- public RollingLogs createRollingLogs(String bpid, String prefix) {
- throw new UnsupportedOperationException();
- }
-
- @Override
public FsVolumeSpi getVolume(ExtendedBlock b) {
return volume;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8c6a96f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java
new file mode 100644
index 0000000..7eaa2bf
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockScanner.java
@@ -0,0 +1,680 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND;
+import static org.apache.hadoop.hdfs.server.datanode.BlockScanner.Conf.INTERNAL_DFS_DATANODE_SCAN_PERIOD_MS;
+import static org.apache.hadoop.hdfs.server.datanode.BlockScanner.Conf.INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER;
+import static org.apache.hadoop.hdfs.server.datanode.BlockScanner.Conf.INTERNAL_DFS_BLOCK_SCANNER_CURSOR_SAVE_INTERVAL_MS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Semaphore;
+
+import com.google.common.base.Supplier;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.VolumeScanner.ScanResultHandler;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.BlockIterator;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
+import org.apache.hadoop.hdfs.server.datanode.VolumeScanner.Statistics;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.Time;
+import org.apache.log4j.Level;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestBlockScanner {
+ public static final Logger LOG =
+ LoggerFactory.getLogger(TestBlockScanner.class);
+
+ @Before
+ public void before() {
+ BlockScanner.Conf.allowUnitTestSettings = true;
+ GenericTestUtils.setLogLevel(BlockScanner.LOG, Level.ALL);
+ GenericTestUtils.setLogLevel(VolumeScanner.LOG, Level.ALL);
+ GenericTestUtils.setLogLevel(FsVolumeImpl.LOG, Level.ALL);
+ }
+
+ private static void disableBlockScanner(Configuration conf) {
+ conf.setLong(DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND, 0L);
+ }
+
+ private static class TestContext implements Closeable {
+ final int numNameServices;
+ final MiniDFSCluster cluster;
+ final DistributedFileSystem[] dfs;
+ final String[] bpids;
+ final DataNode datanode;
+ final BlockScanner blockScanner;
+ final FsDatasetSpi<? extends FsVolumeSpi> data;
+ final List<? extends FsVolumeSpi> volumes;
+
+ TestContext(Configuration conf, int numNameServices) throws Exception {
+ this.numNameServices = numNameServices;
+ MiniDFSCluster.Builder bld = new MiniDFSCluster.Builder(conf).
+ numDataNodes(1).
+ storagesPerDatanode(1);
+ if (numNameServices > 1) {
+ bld.nnTopology(MiniDFSNNTopology.
+ simpleFederatedTopology(numNameServices));
+ }
+ cluster = bld.build();
+ cluster.waitActive();
+ dfs = new DistributedFileSystem[numNameServices];
+ for (int i = 0; i < numNameServices; i++) {
+ dfs[i] = cluster.getFileSystem(i);
+ }
+ bpids = new String[numNameServices];
+ for (int i = 0; i < numNameServices; i++) {
+ bpids[i] = cluster.getNamesystem(i).getBlockPoolId();
+ }
+ datanode = cluster.getDataNodes().get(0);
+ blockScanner = datanode.getBlockScanner();
+ for (int i = 0; i < numNameServices; i++) {
+ dfs[i].mkdirs(new Path("/test"));
+ }
+ data = datanode.getFSDataset();
+ volumes = data.getVolumes();
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (cluster != null) {
+ for (int i = 0; i < numNameServices; i++) {
+ dfs[i].delete(new Path("/test"), true);
+ }
+ cluster.shutdown();
+ }
+ }
+
+ public void createFiles(int nsIdx, int numFiles, int length)
+ throws Exception {
+ for (int blockIdx = 0; blockIdx < numFiles; blockIdx++) {
+ DFSTestUtil.createFile(dfs[nsIdx], getPath(blockIdx), length,
+ (short)1, 123L);
+ }
+ }
+
+ public Path getPath(int fileIdx) {
+ return new Path("/test/" + fileIdx);
+ }
+
+ public ExtendedBlock getFileBlock(int nsIdx, int fileIdx)
+ throws Exception {
+ return DFSTestUtil.getFirstBlock(dfs[nsIdx], getPath(fileIdx));
+ }
+ }
+
+ /**
+ * Test iterating through a bunch of blocks in a volume using a volume
+ * iterator.<p/>
+ *
+ * We will rewind the iterator when about halfway through the blocks.
+ *
+ * @param numFiles The number of files to create.
+ * @param maxStaleness The maximum staleness to allow with the iterator.
+ * @throws Exception
+ */
+ private void testVolumeIteratorImpl(int numFiles,
+ long maxStaleness) throws Exception {
+ Configuration conf = new Configuration();
+ disableBlockScanner(conf);
+ TestContext ctx = new TestContext(conf, 1);
+ ctx.createFiles(0, numFiles, 1);
+ assertEquals(1, ctx.volumes.size());
+ FsVolumeSpi volume = ctx.volumes.get(0);
+ ExtendedBlock savedBlock = null, loadedBlock = null;
+ boolean testedRewind = false, testedSave = false, testedLoad = false;
+ int blocksProcessed = 0, savedBlocksProcessed = 0;
+ try {
+ BPOfferService bpos[] = ctx.datanode.getAllBpOs();
+ assertEquals(1, bpos.length);
+ BlockIterator iter = volume.newBlockIterator(ctx.bpids[0], "test");
+ assertEquals(ctx.bpids[0], iter.getBlockPoolId());
+ iter.setMaxStalenessMs(maxStaleness);
+ while (true) {
+ HashSet<ExtendedBlock> blocks = new HashSet<ExtendedBlock>();
+ for (int blockIdx = 0; blockIdx < numFiles; blockIdx++) {
+ blocks.add(ctx.getFileBlock(0, blockIdx));
+ }
+ while (true) {
+ ExtendedBlock block = iter.nextBlock();
+ if (block == null) {
+ break;
+ }
+ blocksProcessed++;
+ LOG.info("BlockIterator for {} found block {}, blocksProcessed = {}",
+ volume, block, blocksProcessed);
+ if (testedSave && (savedBlock == null)) {
+ savedBlock = block;
+ }
+ if (testedLoad && (loadedBlock == null)) {
+ loadedBlock = block;
+ // The block that we get back right after loading the iterator
+ // should be the same block we got back right after saving
+ // the iterator.
+ assertEquals(savedBlock, loadedBlock);
+ }
+ boolean blockRemoved = blocks.remove(block);
+ assertTrue("Found unknown block " + block, blockRemoved);
+ if (blocksProcessed > (numFiles / 3)) {
+ if (!testedSave) {
+ LOG.info("Processed {} blocks out of {}. Saving iterator.",
+ blocksProcessed, numFiles);
+ iter.save();
+ testedSave = true;
+ savedBlocksProcessed = blocksProcessed;
+ }
+ }
+ if (blocksProcessed > (numFiles / 2)) {
+ if (!testedRewind) {
+ LOG.info("Processed {} blocks out of {}. Rewinding iterator.",
+ blocksProcessed, numFiles);
+ iter.rewind();
+ break;
+ }
+ }
+ if (blocksProcessed > ((2 * numFiles) / 3)) {
+ if (!testedLoad) {
+ LOG.info("Processed {} blocks out of {}. Loading iterator.",
+ blocksProcessed, numFiles);
+ iter = volume.loadBlockIterator(ctx.bpids[0], "test");
+ iter.setMaxStalenessMs(maxStaleness);
+ break;
+ }
+ }
+ }
+ if (!testedRewind) {
+ testedRewind = true;
+ blocksProcessed = 0;
+ LOG.info("Starting again at the beginning...");
+ continue;
+ }
+ if (!testedLoad) {
+ testedLoad = true;
+ blocksProcessed = savedBlocksProcessed;
+ LOG.info("Starting again at the load point...");
+ continue;
+ }
+ assertEquals(numFiles, blocksProcessed);
+ break;
+ }
+ } finally {
+ ctx.close();
+ }
+ }
+
+ @Test(timeout=60000)
+ public void testVolumeIteratorWithoutCaching() throws Exception {
+ testVolumeIteratorImpl(5, 0);
+ }
+
+ @Test(timeout=60000)
+ public void testVolumeIteratorWithCaching() throws Exception {
+ testVolumeIteratorImpl(600, 100);
+ }
+
+ @Test(timeout=60000)
+ public void testDisableVolumeScanner() throws Exception {
+ Configuration conf = new Configuration();
+ disableBlockScanner(conf);
+ TestContext ctx = new TestContext(conf, 1);
+ try {
+ Assert.assertFalse(ctx.datanode.getBlockScanner().isEnabled());
+ } finally {
+ ctx.close();
+ }
+ }
+
+ public static class TestScanResultHandler extends ScanResultHandler {
+ static class Info {
+ boolean shouldRun = false;
+ final Set<ExtendedBlock> badBlocks = new HashSet<ExtendedBlock>();
+ final Set<ExtendedBlock> goodBlocks = new HashSet<ExtendedBlock>();
+ long blocksScanned = 0;
+ Semaphore sem = null;
+ }
+
+ private VolumeScanner scanner;
+
+ final static ConcurrentHashMap<String, Info> infos =
+ new ConcurrentHashMap<String, Info>();
+
+ static Info getInfo(FsVolumeSpi volume) {
+ Info newInfo = new Info();
+ Info prevInfo = infos.
+ putIfAbsent(volume.getStorageID(), newInfo);
+ return prevInfo == null ? newInfo : prevInfo;
+ }
+
+ @Override
+ public void setup(VolumeScanner scanner) {
+ this.scanner = scanner;
+ Info info = getInfo(scanner.volume);
+ LOG.info("about to start scanning.");
+ synchronized (info) {
+ while (!info.shouldRun) {
+ try {
+ info.wait();
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+ LOG.info("starting scanning.");
+ }
+
+ @Override
+ public void handle(ExtendedBlock block, IOException e) {
+ LOG.info("handling block {} (exception {})", block, e);
+ Info info = getInfo(scanner.volume);
+ Semaphore sem;
+ synchronized (info) {
+ sem = info.sem;
+ }
+ if (sem != null) {
+ try {
+ sem.acquire();
+ } catch (InterruptedException ie) {
+ throw new RuntimeException("interrupted");
+ }
+ }
+ synchronized (info) {
+ if (!info.shouldRun) {
+ throw new RuntimeException("stopping volumescanner thread.");
+ }
+ if (e == null) {
+ info.goodBlocks.add(block);
+ } else {
+ info.badBlocks.add(block);
+ }
+ info.blocksScanned++;
+ }
+ }
+ }
+
+ private void testScanAllBlocksImpl(final boolean rescan) throws Exception {
+ Configuration conf = new Configuration();
+ conf.setLong(DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND, 1048576L);
+ if (rescan) {
+ conf.setLong(INTERNAL_DFS_DATANODE_SCAN_PERIOD_MS, 100L);
+ } else {
+ conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 100L);
+ }
+ conf.set(INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER,
+ TestScanResultHandler.class.getName());
+ final TestContext ctx = new TestContext(conf, 1);
+ final int NUM_EXPECTED_BLOCKS = 10;
+ ctx.createFiles(0, NUM_EXPECTED_BLOCKS, 1);
+ final Set<ExtendedBlock> expectedBlocks = new HashSet<ExtendedBlock>();
+ for (int i = 0; i < NUM_EXPECTED_BLOCKS; i++) {
+ expectedBlocks.add(ctx.getFileBlock(0, i));
+ }
+ TestScanResultHandler.Info info =
+ TestScanResultHandler.getInfo(ctx.volumes.get(0));
+ synchronized (info) {
+ info.shouldRun = true;
+ info.notify();
+ }
+ GenericTestUtils.waitFor(new Supplier<Boolean>(){
+ @Override
+ public Boolean get() {
+ TestScanResultHandler.Info info =
+ TestScanResultHandler.getInfo(ctx.volumes.get(0));
+ int numFoundBlocks = 0;
+ StringBuilder foundBlocksBld = new StringBuilder();
+ String prefix = "";
+ synchronized (info) {
+ for (ExtendedBlock block : info.goodBlocks) {
+ assertTrue(expectedBlocks.contains(block));
+ numFoundBlocks++;
+ foundBlocksBld.append(prefix).append(block);
+ prefix = ", ";
+ }
+ LOG.info("numFoundBlocks = {}. blocksScanned = {}. Found blocks {}",
+ numFoundBlocks, info.blocksScanned, foundBlocksBld.toString());
+ if (rescan) {
+ return (numFoundBlocks == NUM_EXPECTED_BLOCKS) &&
+ (info.blocksScanned >= 2 * NUM_EXPECTED_BLOCKS);
+ } else {
+ return numFoundBlocks == NUM_EXPECTED_BLOCKS;
+ }
+ }
+ }
+ }, 10, 60000);
+ if (!rescan) {
+ synchronized (info) {
+ assertEquals(NUM_EXPECTED_BLOCKS, info.blocksScanned);
+ }
+ Statistics stats = ctx.blockScanner.getVolumeStats(
+ ctx.volumes.get(0).getStorageID());
+ assertEquals(5 * NUM_EXPECTED_BLOCKS, stats.bytesScannedInPastHour);
+ assertEquals(NUM_EXPECTED_BLOCKS, stats.blocksScannedSinceRestart);
+ assertEquals(NUM_EXPECTED_BLOCKS, stats.blocksScannedInCurrentPeriod);
+ assertEquals(0, stats.scanErrorsSinceRestart);
+ assertEquals(1, stats.scansSinceRestart);
+ }
+ ctx.close();
+ }
+
+ /**
+ * Test scanning all blocks. Set the scan period high enough that
+ * we shouldn't rescan any block during this test.
+ */
+ @Test(timeout=60000)
+ public void testScanAllBlocksNoRescan() throws Exception {
+ testScanAllBlocksImpl(false);
+ }
+
+ /**
+ * Test scanning all blocks. Set the scan period high enough that
+ * we should rescan all blocks at least twice during this test.
+ */
+ @Test(timeout=60000)
+ public void testScanAllBlocksWithRescan() throws Exception {
+ testScanAllBlocksImpl(true);
+ }
+
+ /**
+ * Test that we don't scan too many blocks per second.
+ */
+ @Test(timeout=120000)
+ public void testScanRateLimit() throws Exception {
+ Configuration conf = new Configuration();
+ // Limit scan bytes per second dramatically
+ conf.setLong(DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND, 4096L);
+ // Scan continuously
+ conf.setLong(INTERNAL_DFS_DATANODE_SCAN_PERIOD_MS, 1L);
+ conf.set(INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER,
+ TestScanResultHandler.class.getName());
+ final TestContext ctx = new TestContext(conf, 1);
+ final int NUM_EXPECTED_BLOCKS = 5;
+ ctx.createFiles(0, NUM_EXPECTED_BLOCKS, 4096);
+ final TestScanResultHandler.Info info =
+ TestScanResultHandler.getInfo(ctx.volumes.get(0));
+ long startMs = Time.monotonicNow();
+ synchronized (info) {
+ info.shouldRun = true;
+ info.notify();
+ }
+ Thread.sleep(5000);
+ synchronized (info) {
+ long endMs = Time.monotonicNow();
+ // Should scan no more than one block a second.
+ long maxBlocksScanned = ((endMs + 999 - startMs) / 1000);
+ assertTrue(info.blocksScanned < maxBlocksScanned);
+ }
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ synchronized (info) {
+ return info.blocksScanned > 0;
+ }
+ }
+ }, 1, 30000);
+ ctx.close();
+ }
+
+ @Test(timeout=120000)
+ public void testCorruptBlockHandling() throws Exception {
+ Configuration conf = new Configuration();
+ conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 100L);
+ conf.set(INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER,
+ TestScanResultHandler.class.getName());
+ final TestContext ctx = new TestContext(conf, 1);
+ final int NUM_EXPECTED_BLOCKS = 5;
+ final int CORRUPT_INDEX = 3;
+ ctx.createFiles(0, NUM_EXPECTED_BLOCKS, 4);
+ ExtendedBlock badBlock = ctx.getFileBlock(0, CORRUPT_INDEX);
+ ctx.cluster.corruptBlockOnDataNodes(badBlock);
+ final TestScanResultHandler.Info info =
+ TestScanResultHandler.getInfo(ctx.volumes.get(0));
+ synchronized (info) {
+ info.shouldRun = true;
+ info.notify();
+ }
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ synchronized (info) {
+ return info.blocksScanned == NUM_EXPECTED_BLOCKS;
+ }
+ }
+ }, 3, 30000);
+ synchronized (info) {
+ assertTrue(info.badBlocks.contains(badBlock));
+ for (int i = 0; i < NUM_EXPECTED_BLOCKS; i++) {
+ if (i != CORRUPT_INDEX) {
+ ExtendedBlock block = ctx.getFileBlock(0, i);
+ assertTrue(info.goodBlocks.contains(block));
+ }
+ }
+ }
+ ctx.close();
+ }
+
+ /**
+ * Test that we save the scan cursor when shutting down the datanode, and
+ * restart scanning from there when the datanode is restarted.
+ */
+ @Test(timeout=120000)
+ public void testDatanodeCursor() throws Exception {
+ Configuration conf = new Configuration();
+ conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 100L);
+ conf.set(INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER,
+ TestScanResultHandler.class.getName());
+ conf.setLong(INTERNAL_DFS_BLOCK_SCANNER_CURSOR_SAVE_INTERVAL_MS, 0L);
+ final TestContext ctx = new TestContext(conf, 1);
+ final int NUM_EXPECTED_BLOCKS = 10;
+ ctx.createFiles(0, NUM_EXPECTED_BLOCKS, 1);
+ final TestScanResultHandler.Info info =
+ TestScanResultHandler.getInfo(ctx.volumes.get(0));
+ synchronized (info) {
+ info.sem = new Semaphore(5);
+ info.shouldRun = true;
+ info.notify();
+ }
+ // Scan the first 5 blocks
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ synchronized (info) {
+ return info.blocksScanned == 5;
+ }
+ }
+ }, 3, 30000);
+ synchronized (info) {
+ assertEquals(5, info.goodBlocks.size());
+ assertEquals(5, info.blocksScanned);
+ info.shouldRun = false;
+ }
+ ctx.datanode.shutdown();
+ String vPath = ctx.volumes.get(0).getBasePath();
+ File cursorPath = new File(new File(new File(vPath, "current"),
+ ctx.bpids[0]), "scanner.cursor");
+ assertTrue("Failed to find cursor save file in " +
+ cursorPath.getAbsolutePath(), cursorPath.exists());
+ Set<ExtendedBlock> prevGoodBlocks = new HashSet<ExtendedBlock>();
+ synchronized (info) {
+ info.sem = new Semaphore(4);
+ prevGoodBlocks.addAll(info.goodBlocks);
+ info.goodBlocks.clear();
+ }
+
+ // The block that we were scanning when we shut down the DN won't get
+ // recorded.
+ // After restarting the datanode, we should scan the next 4 blocks.
+ ctx.cluster.restartDataNode(0);
+ synchronized (info) {
+ info.shouldRun = true;
+ info.notify();
+ }
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ synchronized (info) {
+ if (info.blocksScanned != 9) {
+ LOG.info("Waiting for blocksScanned to reach 9. It is at {}",
+ info.blocksScanned);
+ }
+ return info.blocksScanned == 9;
+ }
+ }
+ }, 3, 30000);
+ synchronized (info) {
+ assertEquals(4, info.goodBlocks.size());
+ info.goodBlocks.addAll(prevGoodBlocks);
+ assertEquals(9, info.goodBlocks.size());
+ assertEquals(9, info.blocksScanned);
+ }
+ ctx.datanode.shutdown();
+
+ // After restarting the datanode, we should not scan any more blocks.
+ // This is because we reached the end of the block pool earlier, and
+ // the scan period is much, much longer than the test time.
+ synchronized (info) {
+ info.sem = null;
+ info.shouldRun = false;
+ info.goodBlocks.clear();
+ }
+ ctx.cluster.restartDataNode(0);
+ synchronized (info) {
+ info.shouldRun = true;
+ info.notify();
+ }
+ Thread.sleep(3000);
+ synchronized (info) {
+ assertTrue(info.goodBlocks.isEmpty());
+ }
+ ctx.close();
+ }
+
+ @Test(timeout=120000)
+ public void testMultipleBlockPoolScanning() throws Exception {
+ Configuration conf = new Configuration();
+ conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 100L);
+ conf.set(INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER,
+ TestScanResultHandler.class.getName());
+ final TestContext ctx = new TestContext(conf, 3);
+
+ // We scan 5 bytes per file (1 byte in file, 4 bytes of checksum)
+ final int BYTES_SCANNED_PER_FILE = 5;
+ final int NUM_FILES[] = new int[] { 1, 5, 10 };
+ int TOTAL_FILES = 0;
+ for (int i = 0; i < NUM_FILES.length; i++) {
+ TOTAL_FILES += NUM_FILES[i];
+ }
+ ctx.createFiles(0, NUM_FILES[0], 1);
+ ctx.createFiles(0, NUM_FILES[1], 1);
+ ctx.createFiles(0, NUM_FILES[2], 1);
+
+ // start scanning
+ final TestScanResultHandler.Info info =
+ TestScanResultHandler.getInfo(ctx.volumes.get(0));
+ synchronized (info) {
+ info.shouldRun = true;
+ info.notify();
+ }
+
+ // Wait for all the block pools to be scanned.
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ synchronized (info) {
+ Statistics stats = ctx.blockScanner.getVolumeStats(
+ ctx.volumes.get(0).getStorageID());
+ if (stats.scansSinceRestart < 3) {
+ LOG.info("Waiting for scansSinceRestart to reach 3 (it is {})",
+ stats.scansSinceRestart);
+ return false;
+ }
+ if (!stats.eof) {
+ LOG.info("Waiting for eof.");
+ return false;
+ }
+ return true;
+ }
+ }
+ }, 3, 30000);
+
+ Statistics stats = ctx.blockScanner.getVolumeStats(
+ ctx.volumes.get(0).getStorageID());
+ assertEquals(TOTAL_FILES, stats.blocksScannedSinceRestart);
+ assertEquals(BYTES_SCANNED_PER_FILE * TOTAL_FILES,
+ stats.bytesScannedInPastHour);
+ ctx.close();
+ }
+
+ @Test(timeout=120000)
+ public void testNextSorted() throws Exception {
+ List<String> arr = new LinkedList<String>();
+ arr.add("1");
+ arr.add("3");
+ arr.add("5");
+ arr.add("7");
+ Assert.assertEquals("3", FsVolumeImpl.nextSorted(arr, "2"));
+ Assert.assertEquals("3", FsVolumeImpl.nextSorted(arr, "1"));
+ Assert.assertEquals("1", FsVolumeImpl.nextSorted(arr, ""));
+ Assert.assertEquals("1", FsVolumeImpl.nextSorted(arr, null));
+ Assert.assertEquals(null, FsVolumeImpl.nextSorted(arr, "9"));
+ }
+
+ @Test(timeout=120000)
+ public void testCalculateNeededBytesPerSec() throws Exception {
+ // If we didn't check anything the last hour, we should scan now.
+ Assert.assertTrue(
+ VolumeScanner.calculateShouldScan(100, 0));
+
+ // If, on average, we checked 101 bytes/s checked during the last hour,
+ // stop checking now.
+ Assert.assertFalse(
+ VolumeScanner.calculateShouldScan(100, 101 * 3600));
+
+ // Target is 1 byte / s, but we didn't scan anything in the last minute.
+ // Should scan now.
+ Assert.assertTrue(
+ VolumeScanner.calculateShouldScan(1, 3540));
+
+ // Target is 1000000 byte / s, but we didn't scan anything in the last
+ // minute. Should scan now.
+ Assert.assertTrue(
+ VolumeScanner.calculateShouldScan(100000L, 354000000L));
+
+ Assert.assertFalse(
+ VolumeScanner.calculateShouldScan(100000L, 365000000L));
+ }
+}
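
The expected values in testCalculateNeededBytesPerSec above all reduce to one comparison: with a target of N bytes per second, a volume should keep scanning only while fewer than N * 3600 bytes have been scanned in the past hour (100 B/s gives a budget of 360,000 B/h, so 101 * 3600 = 363,600 already-scanned bytes means "stop"). The snippet below is a hedged reconstruction of that arithmetic from the test's assertions only; the real VolumeScanner.calculateShouldScan may be implemented differently.

class ScanRateSketch {
  // Hedged reconstruction of the decision the assertions above exercise.
  static boolean shouldScan(long targetBytesPerSec, long bytesScannedInPastHour) {
    long hourlyBudget = targetBytesPerSec * 3600L;   // e.g. 100 B/s -> 360,000 bytes per hour
    return bytesScannedInPastHour < hourlyBudget;    // keep scanning while under budget
  }
}
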
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8c6a96f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
index 33675c7..d16a831 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
@@ -589,6 +589,22 @@ public class TestDirectoryScanner {
public boolean isTransientStorage() {
return false;
}
+
+ @Override
+ public BlockIterator newBlockIterator(String bpid, String name) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public BlockIterator loadBlockIterator(String bpid, String name)
+ throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public FsDatasetSpi getDataset() {
+ throw new UnsupportedOperationException();
+ }
}
private final static TestFsVolumeSpi TEST_VOLUME = new TestFsVolumeSpi();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8c6a96f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestMultipleNNDataBlockScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestMultipleNNDataBlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestMultipleNNDataBlockScanner.java
deleted file mode 100644
index 55b1739..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestMultipleNNDataBlockScanner.java
+++ /dev/null
@@ -1,245 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs.server.datanode;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.commons.logging.impl.Log4JLogger;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSTestUtil;
-import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.MiniDFSNNTopology;
-import static org.apache.hadoop.hdfs.server.datanode.DataBlockScanner.SLEEP_PERIOD_MS;
-import org.apache.log4j.Level;
-import org.junit.Assert;
-import org.junit.Test;
-import static org.junit.Assert.fail;
-
-
-public class TestMultipleNNDataBlockScanner {
- private static final Log LOG =
- LogFactory.getLog(TestMultipleNNDataBlockScanner.class);
- Configuration conf;
- MiniDFSCluster cluster = null;
- final String[] bpids = new String[3];
- final FileSystem[] fs = new FileSystem[3];
-
- public void setUp() throws IOException {
- conf = new HdfsConfiguration();
- conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 100);
- conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 100);
- cluster = new MiniDFSCluster.Builder(conf)
- .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(3))
- .build();
- for (int i = 0; i < 3; i++) {
- cluster.waitActive(i);
- }
- for (int i = 0; i < 3; i++) {
- bpids[i] = cluster.getNamesystem(i).getBlockPoolId();
- }
- for (int i = 0; i < 3; i++) {
- fs[i] = cluster.getFileSystem(i);
- }
- // Create 2 files on each namenode with 10 blocks each
- for (int i = 0; i < 3; i++) {
- DFSTestUtil.createFile(fs[i], new Path("file1"), 1000, (short) 1, 0);
- DFSTestUtil.createFile(fs[i], new Path("file2"), 1000, (short) 1, 1);
- }
- }
-
- @Test(timeout=120000)
- public void testDataBlockScanner() throws IOException, InterruptedException {
- setUp();
- try {
- DataNode dn = cluster.getDataNodes().get(0);
- for (int i = 0; i < 3; i++) {
- long blocksScanned = 0;
- while (blocksScanned != 20) {
- blocksScanned = dn.blockScanner.getBlocksScannedInLastRun(bpids[i]);
- LOG.info("Waiting for all blocks to be scanned for bpid=" + bpids[i]
- + "; Scanned so far=" + blocksScanned);
- Thread.sleep(5000);
- }
- }
-
- StringBuilder buffer = new StringBuilder();
- dn.blockScanner.printBlockReport(buffer, false);
- LOG.info("Block Report\n" + buffer.toString());
- } finally {
- cluster.shutdown();
- }
- }
-
- @Test(timeout=120000)
- public void testBlockScannerAfterRefresh() throws IOException,
- InterruptedException {
- setUp();
- try {
- Configuration dnConf = cluster.getDataNodes().get(0).getConf();
- Configuration conf = new HdfsConfiguration(dnConf);
- StringBuilder namenodesBuilder = new StringBuilder();
-
- String bpidToShutdown = cluster.getNamesystem(2).getBlockPoolId();
- for (int i = 0; i < 2; i++) {
- String nsId = DFSUtil.getNamenodeNameServiceId(cluster
- .getConfiguration(i));
- namenodesBuilder.append(nsId);
- namenodesBuilder.append(",");
- }
-
- conf.set(DFSConfigKeys.DFS_NAMESERVICES, namenodesBuilder
- .toString());
- DataNode dn = cluster.getDataNodes().get(0);
- dn.refreshNamenodes(conf);
-
- try {
- while (true) {
- dn.blockScanner.getBlocksScannedInLastRun(bpidToShutdown);
- Thread.sleep(1000);
- }
- } catch (IOException ex) {
- // Expected
- LOG.info(ex.getMessage());
- }
-
- namenodesBuilder.append(DFSUtil.getNamenodeNameServiceId(cluster
- .getConfiguration(2)));
- conf.set(DFSConfigKeys.DFS_NAMESERVICES, namenodesBuilder
- .toString());
- dn.refreshNamenodes(conf);
-
- for (int i = 0; i < 3; i++) {
- long blocksScanned = 0;
- while (blocksScanned != 20) {
- blocksScanned = dn.blockScanner.getBlocksScannedInLastRun(bpids[i]);
- LOG.info("Waiting for all blocks to be scanned for bpid=" + bpids[i]
- + "; Scanned so far=" + blocksScanned);
- Thread.sleep(5000);
- }
- }
- } finally {
- cluster.shutdown();
- }
- }
-
- @Test(timeout=120000)
- public void testBlockScannerAfterRestart() throws IOException,
- InterruptedException {
- setUp();
- try {
- cluster.restartDataNode(0);
- cluster.waitActive();
- DataNode dn = cluster.getDataNodes().get(0);
- for (int i = 0; i < 3; i++) {
- while (!dn.blockScanner.isInitialized(bpids[i])) {
- Thread.sleep(1000);
- }
- long blocksScanned = 0;
- while (blocksScanned != 20) {
- if (dn.blockScanner != null) {
- blocksScanned = dn.blockScanner.getBlocksScannedInLastRun(bpids[i]);
- LOG.info("Waiting for all blocks to be scanned for bpid="
- + bpids[i] + "; Scanned so far=" + blocksScanned);
- }
- Thread.sleep(5000);
- }
- }
- } finally {
- cluster.shutdown();
- }
- }
-
- @Test(timeout=120000)
- public void test2NNBlockRescanInterval() throws IOException {
- ((Log4JLogger)BlockPoolSliceScanner.LOG).getLogger().setLevel(Level.ALL);
- Configuration conf = new HdfsConfiguration();
- cluster = new MiniDFSCluster.Builder(conf)
- .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(3))
- .build();
-
- try {
- FileSystem fs = cluster.getFileSystem(1);
- Path file2 = new Path("/test/testBlockScanInterval");
- DFSTestUtil.createFile(fs, file2, 30, (short) 1, 0);
-
- fs = cluster.getFileSystem(0);
- Path file1 = new Path("/test/testBlockScanInterval");
- DFSTestUtil.createFile(fs, file1, 30, (short) 1, 0);
- for (int i = 0; i < 8; i++) {
- LOG.info("Verifying that the blockscanner scans exactly once");
- waitAndScanBlocks(1, 1);
- }
- } finally {
- cluster.shutdown();
- }
- }
-
- /**
- * HDFS-3828: DN rescans blocks too frequently
- *
- * @throws Exception
- */
- @Test(timeout=120000)
- public void testBlockRescanInterval() throws IOException {
- ((Log4JLogger)BlockPoolSliceScanner.LOG).getLogger().setLevel(Level.ALL);
- Configuration conf = new HdfsConfiguration();
- cluster = new MiniDFSCluster.Builder(conf).build();
-
- try {
- FileSystem fs = cluster.getFileSystem();
- Path file1 = new Path("/test/testBlockScanInterval");
- DFSTestUtil.createFile(fs, file1, 30, (short) 1, 0);
- for (int i = 0; i < 4; i++) {
- LOG.info("Verifying that the blockscanner scans exactly once");
- waitAndScanBlocks(1, 1);
- }
- } finally {
- cluster.shutdown();
- }
- }
-
- void waitAndScanBlocks(long scansLastRun, long scansTotal)
- throws IOException {
- // DataBlockScanner will run for every 5 seconds so we are checking for
- // every 5 seconds
- int n = 5;
- String bpid = cluster.getNamesystem(0).getBlockPoolId();
- DataNode dn = cluster.getDataNodes().get(0);
- long blocksScanned, total;
- do {
- try {
- Thread.sleep(SLEEP_PERIOD_MS);
- } catch (InterruptedException e) {
- fail("Interrupted: " + e);
- }
- blocksScanned = dn.blockScanner.getBlocksScannedInLastRun(bpid);
- total = dn.blockScanner.getTotalScans(bpid);
- LOG.info("bpid = " + bpid + " blocksScanned = " + blocksScanned + " total=" + total);
- } while (n-- > 0 && (blocksScanned != scansLastRun || scansTotal != total));
- Assert.assertEquals(scansTotal, total);
- Assert.assertEquals(scansLastRun, blocksScanned);
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8c6a96f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
index bc91625..cff8ca8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalDatasetImpl.java
@@ -35,7 +35,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
@@ -51,12 +50,6 @@ public class ExternalDatasetImpl implements FsDatasetSpi<ExternalVolumeImpl> {
StorageType.DEFAULT);
@Override
- public RollingLogs createRollingLogs(String bpid, String prefix)
- throws IOException {
- return new ExternalRollingLogs();
- }
-
- @Override
public List<ExternalVolumeImpl> getVolumes() {
return null;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8c6a96f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalRollingLogs.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalRollingLogs.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalRollingLogs.java
deleted file mode 100644
index c9fb7c8..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalRollingLogs.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs.server.datanode.extdataset;
-
-import java.io.IOException;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;
-
-public class ExternalRollingLogs implements RollingLogs {
-
- private class ExternalLineIterator implements LineIterator {
- @Override
- public boolean isPrevious() {
- return false;
- }
-
- @Override
- public boolean isLastReadFromPrevious() {
- return false;
- }
-
- @Override
- public boolean hasNext() {
- return false;
- }
-
- @Override
- public String next() {
- return null;
- }
-
- @Override
- public void remove() {
- }
-
- @Override
- public void close() throws IOException {
- }
- }
-
- private class ExternalAppender implements Appender {
- @Override
- public Appendable append(CharSequence cs) throws IOException {
- return null;
- }
-
- @Override
- public Appendable append(CharSequence cs, int i, int i1)
- throws IOException {
- return null;
- }
-
- @Override
- public Appendable append(char c) throws IOException {
- return null;
- }
-
- @Override
- public void close() throws IOException {
- }
- }
-
- @Override
- public LineIterator iterator(boolean skipPrevious) throws IOException {
- return new ExternalLineIterator();
- }
-
- @Override
- public Appender appender() {
- return new ExternalAppender();
- }
-
- @Override
- public boolean roll() throws IOException {
- return false;
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8c6a96f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java
index 9938980..c8383e4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/ExternalVolumeImpl.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
@@ -80,4 +81,20 @@ public class ExternalVolumeImpl implements FsVolumeSpi {
@Override
public void releaseReservedSpace(long bytesToRelease) {
}
+
+ @Override
+ public BlockIterator newBlockIterator(String bpid, String name) {
+ return null;
+ }
+
+ @Override
+ public BlockIterator loadBlockIterator(String bpid, String name)
+ throws IOException {
+ return null;
+ }
+
+ @Override
+ public FsDatasetSpi getDataset() {
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8c6a96f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/TestExternalDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/TestExternalDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/TestExternalDataset.java
index 791bb76..82a6951 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/TestExternalDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/extdataset/TestExternalDataset.java
@@ -22,7 +22,6 @@ import org.apache.hadoop.hdfs.server.datanode.Replica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;
import org.junit.Test;
/**
@@ -80,14 +79,6 @@ public class TestExternalDataset {
}
/**
- * Tests instantiating a RollingLogs subclass.
- */
- @Test
- public void testInstantiateRollingLogs() throws Throwable {
- RollingLogs inst = new ExternalRollingLogs();
- }
-
- /**
* Tests instantiating an FsVolumeSpi subclass.
*/
@Test
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8c6a96f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index 5c18495..8f87f57 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -22,17 +22,17 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.StorageType;
-import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
+import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
import org.apache.hadoop.hdfs.server.datanode.DNConf;
-import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.ReplicaHandler;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.test.GenericTestUtils;
@@ -51,19 +51,17 @@ import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyListOf;
-import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -81,7 +79,6 @@ public class TestFsDatasetImpl {
private Configuration conf;
private DataNode datanode;
private DataStorage storage;
- private DataBlockScanner scanner;
private FsDatasetImpl dataset;
private static Storage.StorageDirectory createStorageDirectory(File root) {
@@ -112,13 +109,14 @@ public class TestFsDatasetImpl {
public void setUp() throws IOException {
datanode = mock(DataNode.class);
storage = mock(DataStorage.class);
- scanner = mock(DataBlockScanner.class);
this.conf = new Configuration();
+ this.conf.setLong(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 0);
final DNConf dnConf = new DNConf(conf);
when(datanode.getConf()).thenReturn(conf);
when(datanode.getDnConf()).thenReturn(dnConf);
- when(datanode.getBlockScanner()).thenReturn(scanner);
+ final BlockScanner disabledBlockScanner = new BlockScanner(datanode, conf);
+ when(datanode.getBlockScanner()).thenReturn(disabledBlockScanner);
createStorageDirs(storage, conf, NUM_INIT_VOLUMES);
dataset = new FsDatasetImpl(datanode, storage, conf);
@@ -208,10 +206,6 @@ public class TestFsDatasetImpl {
assertEquals("The replica infos on this volume has been removed from the "
+ "volumeMap.", NUM_BLOCKS / NUM_INIT_VOLUMES,
totalNumReplicas);
-
- // Verify that every BlockPool deletes the removed blocks from the volume.
- verify(scanner, times(BLOCK_POOL_IDS.length))
- .deleteBlocks(anyString(), any(Block[].class));
}
@Test(timeout = 5000)
@@ -245,7 +239,9 @@ public class TestFsDatasetImpl {
public void testChangeVolumeWithRunningCheckDirs() throws IOException {
RoundRobinVolumeChoosingPolicy<FsVolumeImpl> blockChooser =
new RoundRobinVolumeChoosingPolicy<>();
- final FsVolumeList volumeList = new FsVolumeList(0, blockChooser);
+ final BlockScanner blockScanner = new BlockScanner(datanode, conf);
+ final FsVolumeList volumeList =
+ new FsVolumeList(0, blockScanner, blockChooser);
final List<FsVolumeImpl> oldVolumes = new ArrayList<>();
// Initialize FsVolumeList with 5 mock volumes.
@@ -254,19 +250,23 @@ public class TestFsDatasetImpl {
FsVolumeImpl volume = mock(FsVolumeImpl.class);
oldVolumes.add(volume);
when(volume.getBasePath()).thenReturn("data" + i);
- volumeList.addVolume(volume);
+ FsVolumeReference ref = mock(FsVolumeReference.class);
+ when(ref.getVolume()).thenReturn(volume);
+ volumeList.addVolume(ref);
}
// When call checkDirs() on the 2nd volume, anther "thread" removes the 5th
// volume and add another volume. It does not affect checkDirs() running.
final FsVolumeImpl newVolume = mock(FsVolumeImpl.class);
+ final FsVolumeReference newRef = mock(FsVolumeReference.class);
+ when(newRef.getVolume()).thenReturn(newVolume);
FsVolumeImpl blockedVolume = volumeList.getVolumes().get(1);
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocationOnMock)
throws Throwable {
volumeList.removeVolume(new File("data4"));
- volumeList.addVolume(newVolume);
+ volumeList.addVolume(newRef);
return null;
}
}).when(blockedVolume).checkDirs();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8c6a96f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java
index 3609684..6cc3d7e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestInterDatanodeProtocol.java
@@ -184,8 +184,8 @@ public class TestInterDatanodeProtocol {
InterDatanodeProtocol idp = DataNodeTestUtils.createInterDatanodeProtocolProxy(
datanode, datanodeinfo[0], conf, useDnHostname);
- //stop block scanner, so we could compare lastScanTime
- DataNodeTestUtils.shutdownBlockScanner(datanode);
+ // Stop the block scanners.
+ datanode.getBlockScanner().removeAllVolumeScanners();
//verify BlockMetaDataInfo
ExtendedBlock b = locatedblock.getBlock();
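The two-line change above swaps the old DataNodeTestUtils.shutdownBlockScanner() helper for the new removeAllVolumeScanners() call. As a minimal sketch only, assuming an already-started MiniDFSCluster: the helper class and method names below are hypothetical, and only getBlockScanner().removeAllVolumeScanners() comes from this patch.

import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.datanode.DataNode;

class StopVolumeScannersSketch {
  // Hypothetical helper; `cluster` is assumed to be a running MiniDFSCluster.
  static void stopScanners(MiniDFSCluster cluster) {
    DataNode dn = cluster.getDataNodes().get(0);
    // API introduced by this patch: shuts down every per-volume scanner
    // thread so that lastScanTime values stay stable for the rest of a test.
    dn.getBlockScanner().removeAllVolumeScanners();
  }
}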
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8c6a96f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotTestHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotTestHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotTestHelper.java
index 918a2d5..11b19f3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotTestHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotTestHelper.java
@@ -47,7 +47,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.datanode.BlockPoolSliceStorage;
-import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner;
+import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DirectoryScanner;
import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
@@ -87,7 +87,7 @@ public class SnapshotTestHelper {
GenericTestUtils.disableLog(LogFactory.getLog(DirectoryScanner.class));
GenericTestUtils.disableLog(LogFactory.getLog(MetricsSystemImpl.class));
- GenericTestUtils.disableLog(DataBlockScanner.LOG);
+ GenericTestUtils.disableLog(BlockScanner.LOG);
GenericTestUtils.disableLog(HttpServer2.LOG);
GenericTestUtils.disableLog(DataNode.LOG);
GenericTestUtils.disableLog(BlockPoolSliceStorage.LOG);
[3/3] hadoop git commit: HDFS-7430. Refactor the BlockScanner to use
O(1) memory and use multiple threads (cmccabe)
Posted by cm...@apache.org.
HDFS-7430. Refactor the BlockScanner to use O(1) memory and use multiple threads (cmccabe)
(cherry picked from commit 6e62a1a6728b1f782f64065424f92b292c3f163a)
Conflicts:
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeListTest.java
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a8c6a96f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a8c6a96f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a8c6a96f
Branch: refs/heads/branch-2
Commit: a8c6a96f3b47e15822b2a37b3c19dfc4e18254cb
Parents: ebf6499
Author: Colin Patrick Mccabe <cm...@cloudera.com>
Authored: Wed Dec 17 11:27:48 2014 -0800
Committer: Colin Patrick Mccabe <cm...@cloudera.com>
Committed: Fri Feb 13 15:04:06 2015 -0800
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 2 +
.../hdfs/server/datanode/BPOfferService.java | 3 -
.../hdfs/server/datanode/BPServiceActor.java | 6 -
.../server/datanode/BlockPoolSliceScanner.java | 872 -------------------
.../hdfs/server/datanode/BlockReceiver.java | 8 -
.../hdfs/server/datanode/BlockScanner.java | 308 +++++++
.../hdfs/server/datanode/BlockSender.java | 3 -
.../hdfs/server/datanode/DataBlockScanner.java | 339 -------
.../hadoop/hdfs/server/datanode/DataNode.java | 72 +-
.../hdfs/server/datanode/VolumeScanner.java | 652 ++++++++++++++
.../server/datanode/fsdataset/FsDatasetSpi.java | 28 +-
.../server/datanode/fsdataset/FsVolumeSpi.java | 110 +++
.../server/datanode/fsdataset/RollingLogs.java | 73 --
.../datanode/fsdataset/impl/FsDatasetImpl.java | 44 +-
.../datanode/fsdataset/impl/FsVolumeImpl.java | 347 ++++++++
.../datanode/fsdataset/impl/FsVolumeList.java | 24 +-
.../fsdataset/impl/RollingLogsImpl.java | 241 -----
.../src/main/resources/hdfs-default.xml | 20 +
.../org/apache/hadoop/hdfs/DFSTestUtil.java | 16 +
.../hadoop/hdfs/TestDatanodeBlockScanner.java | 551 ------------
.../org/apache/hadoop/hdfs/TestReplication.java | 3 +-
.../TestOverReplicatedBlocks.java | 13 +-
.../server/datanode/BlockReportTestBase.java | 7 +-
.../hdfs/server/datanode/DataNodeTestUtils.java | 24 -
.../server/datanode/SimulatedFSDataset.java | 22 +-
.../hdfs/server/datanode/TestBlockScanner.java | 680 +++++++++++++++
.../server/datanode/TestDirectoryScanner.java | 16 +
.../TestMultipleNNDataBlockScanner.java | 245 ------
.../extdataset/ExternalDatasetImpl.java | 7 -
.../extdataset/ExternalRollingLogs.java | 92 --
.../datanode/extdataset/ExternalVolumeImpl.java | 17 +
.../extdataset/TestExternalDataset.java | 9 -
.../fsdataset/impl/TestFsDatasetImpl.java | 30 +-
.../impl/TestInterDatanodeProtocol.java | 4 +-
.../namenode/snapshot/SnapshotTestHelper.java | 4 +-
36 files changed, 2273 insertions(+), 2622 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8c6a96f/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 39f37e1..284f1e6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -335,6 +335,9 @@ Release 2.7.0 - UNRELEASED
HDFS-316. Balancer should run for a configurable # of iterations (Xiaoyu
Yao via aw)
+ HDFS-7430. Refactor the BlockScanner to use O(1) memory and use multiple
+ threads (cmccabe)
+
OPTIMIZATIONS
HDFS-7454. Reduce memory footprint for AclEntries in NameNode.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8c6a96f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 9c87b73..6fdf304 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -445,6 +445,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final int DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT = 4096;
public static final String DFS_DATANODE_SCAN_PERIOD_HOURS_KEY = "dfs.datanode.scan.period.hours";
public static final int DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT = 0;
+ public static final String DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND = "dfs.block.scanner.volume.bytes.per.second";
+ public static final long DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND_DEFAULT = 1048576L;
public static final String DFS_DATANODE_TRANSFERTO_ALLOWED_KEY = "dfs.datanode.transferTo.allowed";
public static final boolean DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT = true;
public static final String DFS_HEARTBEAT_INTERVAL_KEY = "dfs.heartbeat.interval";
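The two keys added above introduce a per-volume scan throttle alongside the existing scan-period setting. A minimal sketch of tuning them programmatically, assuming an ordinary HdfsConfiguration: the class name and the chosen values are illustrative, and only the key names and the 1048576 (1 MiB/s) default come from this patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class BlockScannerConfigSketch {
  public static void main(String[] args) {
    Configuration conf = new HdfsConfiguration();
    // Illustrative value: rescan each block roughly every 504 hours (three
    // weeks); the TestFsDatasetImpl hunk earlier in this patch sets this key
    // to 0 to obtain a disabled scanner for its mocks.
    conf.setLong(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 504);
    // Illustrative value: cap the scanner at ~4 MiB/s per volume; the default
    // added by this patch is 1048576 bytes per second (1 MiB/s).
    conf.setLong(DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND,
        4L * 1024 * 1024);
    System.out.println("scan period (h) = "
        + conf.getLong(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, 0));
  }
}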
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8c6a96f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
index ca00941..47b8e4a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
@@ -661,9 +661,6 @@ class BPOfferService {
//
Block toDelete[] = bcmd.getBlocks();
try {
- if (dn.blockScanner != null) {
- dn.blockScanner.deleteBlocks(bcmd.getBlockPoolId(), toDelete);
- }
// using global fsdataset
dn.getFSDataset().invalidate(bcmd.getBlockPoolId(), toDelete);
} catch(IOException e) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8c6a96f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index 6529a6b..3703b5c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -719,12 +719,6 @@ class BPServiceActor implements Runnable {
DatanodeCommand cmd = cacheReport();
processCommand(new DatanodeCommand[]{ cmd });
- // Now safe to start scanning the block pool.
- // If it has already been started, this is a no-op.
- if (dn.blockScanner != null) {
- dn.blockScanner.addBlockPool(bpos.getBlockPoolId());
- }
-
//
// There is no work to do; sleep until hearbeat timer elapses,
// or work arrives, and then iterate again.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8c6a96f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
deleted file mode 100644
index f36fea1..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
+++ /dev/null
@@ -1,872 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs.server.datanode;
-
-import java.io.DataOutputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.server.common.GenerationStamp;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;
-import org.apache.hadoop.hdfs.util.DataTransferThrottler;
-import org.apache.hadoop.util.GSet;
-import org.apache.hadoop.util.LightWeightGSet;
-import org.apache.hadoop.util.LightWeightGSet.LinkedElement;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.util.Time;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * Scans the block files under a block pool and verifies that the
- * files are not corrupt.
- * This keeps track of blocks and their last verification times.
- * Currently it does not modify the metadata for block.
- */
-
-class BlockPoolSliceScanner {
-
- public static final Log LOG = LogFactory.getLog(BlockPoolSliceScanner.class);
-
- private static final String DATA_FORMAT = "yyyy-MM-dd HH:mm:ss,SSS";
-
- private static final int MAX_SCAN_RATE = 8 * 1024 * 1024; // 8MB per sec
- private static final int MIN_SCAN_RATE = 1 * 1024 * 1024; // 1MB per sec
- private static final long DEFAULT_SCAN_PERIOD_HOURS = 21*24L; // three weeks
-
- private static final String VERIFICATION_PREFIX = "dncp_block_verification.log";
-
- private final String blockPoolId;
- private final long scanPeriod;
- private final AtomicLong lastScanTime = new AtomicLong();
-
- private final DataNode datanode;
- private final FsDatasetSpi<? extends FsVolumeSpi> dataset;
-
- private final SortedSet<BlockScanInfo> blockInfoSet
- = new TreeSet<BlockScanInfo>(BlockScanInfo.LAST_SCAN_TIME_COMPARATOR);
-
- private final SortedSet<BlockScanInfo> newBlockInfoSet =
- new TreeSet<BlockScanInfo>(BlockScanInfo.LAST_SCAN_TIME_COMPARATOR);
-
- private final GSet<Block, BlockScanInfo> blockMap
- = new LightWeightGSet<Block, BlockScanInfo>(
- LightWeightGSet.computeCapacity(0.5, "BlockMap"));
-
- // processedBlocks keeps track of which blocks are scanned
- // since the last run.
- private volatile HashMap<Long, Integer> processedBlocks;
-
- private long totalScans = 0;
- private long totalScanErrors = 0;
- private long totalTransientErrors = 0;
- private final AtomicInteger totalBlocksScannedInLastRun = new AtomicInteger(); // Used for test only
-
- private long currentPeriodStart = Time.monotonicNow();
- private long bytesLeft = 0; // Bytes to scan in this period
- private long totalBytesToScan = 0;
- private boolean isNewPeriod = true;
- private int lastScanTimeDifference = 5*60*1000;
-
- private final LogFileHandler verificationLog;
-
- private final DataTransferThrottler throttler = new DataTransferThrottler(
- 200, MAX_SCAN_RATE);
-
- private static enum ScanType {
- IMMEDIATE_SCAN,
- VERIFICATION_SCAN, // scanned as part of periodic verfication
- NONE,
- }
-
- // Extend Block because in the DN process there's a 1-to-1 correspondence of
- // BlockScanInfo to Block instances, so by extending rather than containing
- // Block, we can save a bit of Object overhead (about 24 bytes per block
- // replica.)
- static class BlockScanInfo extends Block
- implements LightWeightGSet.LinkedElement {
-
- /** Compare the info by the last scan time. */
- static final Comparator<BlockScanInfo> LAST_SCAN_TIME_COMPARATOR
- = new Comparator<BlockPoolSliceScanner.BlockScanInfo>() {
-
- @Override
- public int compare(BlockScanInfo left, BlockScanInfo right) {
- final ScanType leftNextScanType = left.nextScanType;
- final ScanType rightNextScanType = right.nextScanType;
- final long l = left.lastScanTime;
- final long r = right.lastScanTime;
- // Compare by nextScanType if they are same then compare by
- // lastScanTimes
- // compare blocks itself if scantimes are same to avoid.
- // because TreeMap uses comparator if available to check existence of
- // the object.
- int compareByNextScanType = leftNextScanType.compareTo(rightNextScanType);
- return compareByNextScanType < 0? -1: compareByNextScanType > 0? 1: l < r? -1: l > r? 1: left.compareTo(right);
- }
- };
-
- long lastScanTime = 0;
- ScanType lastScanType = ScanType.NONE;
- boolean lastScanOk = true;
- private LinkedElement next;
- ScanType nextScanType = ScanType.VERIFICATION_SCAN;
-
- BlockScanInfo(Block block) {
- super(block);
- }
-
- @Override
- public int hashCode() {
- return super.hashCode();
- }
-
- @Override
- public boolean equals(Object that) {
- if (this == that) {
- return true;
- }
- return super.equals(that);
- }
-
- long getLastScanTime() {
- return (lastScanType == ScanType.NONE) ? 0 : lastScanTime;
- }
-
- @Override
- public void setNext(LinkedElement next) {
- this.next = next;
- }
-
- @Override
- public LinkedElement getNext() {
- return next;
- }
- }
-
- BlockPoolSliceScanner(String bpid, DataNode datanode,
- FsDatasetSpi<? extends FsVolumeSpi> dataset, Configuration conf) {
- this.datanode = datanode;
- this.dataset = dataset;
- this.blockPoolId = bpid;
-
- long hours = conf.getInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY,
- DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT);
- if (hours <= 0) {
- hours = DEFAULT_SCAN_PERIOD_HOURS;
- }
- this.scanPeriod = hours * 3600 * 1000;
- LOG.info("Periodic Block Verification Scanner initialized with interval "
- + hours + " hours for block pool " + bpid);
-
- // get the list of blocks and arrange them in random order
- List<FinalizedReplica> arr = dataset.getFinalizedBlocksOnPersistentStorage(blockPoolId);
- Collections.shuffle(arr);
-
- long scanTime = -1;
- for (Block block : arr) {
- BlockScanInfo info = new BlockScanInfo( block );
- info.lastScanTime = scanTime--;
- //still keep 'info.lastScanType' to NONE.
- addBlockInfo(info, false);
- }
-
- RollingLogs rollingLogs = null;
- try {
- rollingLogs = dataset.createRollingLogs(blockPoolId, VERIFICATION_PREFIX);
- } catch (IOException e) {
- LOG.warn("Could not open verfication log. " +
- "Verification times are not stored.");
- }
- verificationLog = rollingLogs == null? null: new LogFileHandler(rollingLogs);
- }
-
- String getBlockPoolId() {
- return blockPoolId;
- }
-
- private void updateBytesToScan(long len, long lastScanTime) {
- // len could be negative when a block is deleted.
- totalBytesToScan += len;
- if ( lastScanTime < currentPeriodStart ) {
- bytesLeft += len;
- }
- // Should we change throttler bandwidth every time bytesLeft changes?
- // not really required.
- }
-
- /**
- * Add the BlockScanInfo to sorted set of blockScanInfo
- * @param info BlockScanInfo to be added
- * @param isNewBlock true if the block is the new Block, false if
- * BlockScanInfo is being updated with new scanTime
- */
- private synchronized void addBlockInfo(BlockScanInfo info,
- boolean isNewBlock) {
- boolean added = false;
- if (isNewBlock) {
- // check whether the block already present
- boolean exists = blockInfoSet.contains(info);
- added = !exists && newBlockInfoSet.add(info);
- } else {
- added = blockInfoSet.add(info);
- }
- blockMap.put(info);
-
- if (added) {
- updateBytesToScan(info.getNumBytes(), info.lastScanTime);
- }
- }
-
- private synchronized void delBlockInfo(BlockScanInfo info) {
- boolean exists = blockInfoSet.remove(info);
- if (!exists){
- exists = newBlockInfoSet.remove(info);
- }
- blockMap.remove(info);
-
- if (exists) {
- updateBytesToScan(-info.getNumBytes(), info.lastScanTime);
- }
- }
-
- /** Update blockMap by the given LogEntry */
- private synchronized void updateBlockInfo(LogEntry e) {
- BlockScanInfo info = blockMap.get(new Block(e.blockId, 0, e.genStamp));
-
- if (info != null && e.verificationTime > 0 &&
- info.lastScanTime < e.verificationTime) {
- delBlockInfo(info);
- if (info.nextScanType != ScanType.IMMEDIATE_SCAN) {
- info.lastScanTime = e.verificationTime;
- }
- info.lastScanType = ScanType.VERIFICATION_SCAN;
- addBlockInfo(info, false);
- }
- }
-
- private synchronized long getNewBlockScanTime() {
- /* If there are a lot of blocks, this returns a random time with in
- * the scan period. Otherwise something sooner.
- */
- long period = Math.min(scanPeriod,
- Math.max(blockMap.size(),1) * 600 * 1000L);
- int periodInt = Math.abs((int)period);
- return Time.monotonicNow() - scanPeriod +
- DFSUtil.getRandom().nextInt(periodInt);
- }
-
- /** Adds block to list of blocks
- * @param scanNow - true if we want to make that particular block a high
- * priority one to scan immediately
- **/
- synchronized void addBlock(ExtendedBlock block, boolean scanNow) {
- BlockScanInfo info = blockMap.get(block.getLocalBlock());
- long lastScanTime = 0;
- if (info != null) {
- lastScanTime = info.lastScanTime;
- }
- // If the particular block is scanned in last 5 minutes, the no need to
- // verify that block again
- if (scanNow && Time.monotonicNow() - lastScanTime <
- lastScanTimeDifference) {
- return;
- }
-
- if ( info != null ) {
- LOG.warn("Adding an already existing block " + block);
- delBlockInfo(info);
- }
-
- info = new BlockScanInfo(block.getLocalBlock());
- info.lastScanTime = getNewBlockScanTime();
- if (scanNow) {
- // Create a new BlockScanInfo object and set the lastScanTime to 0
- // which will make it the high priority block
- LOG.info("Adding block for immediate verification " + block);
- info.nextScanType = ScanType.IMMEDIATE_SCAN;
- }
-
- addBlockInfo(info, true);
- adjustThrottler();
- }
-
- /** Deletes the block from internal structures */
- synchronized void deleteBlock(Block block) {
- BlockScanInfo info = blockMap.get(block);
- if (info != null) {
- delBlockInfo(info);
- }
- }
-
- @VisibleForTesting
- long getTotalScans() {
- return totalScans;
- }
-
- /** @return the last scan time for the block pool. */
- long getLastScanTime() {
- return lastScanTime.get();
- }
-
- /** @return the last scan time the given block. */
- synchronized long getLastScanTime(Block block) {
- BlockScanInfo info = blockMap.get(block);
- return info == null? 0: info.lastScanTime;
- }
-
- /** Deletes blocks from internal structures */
- void deleteBlocks(Block[] blocks) {
- for ( Block b : blocks ) {
- deleteBlock(b);
- }
- }
-
- private synchronized void updateScanStatus(BlockScanInfo info,
- ScanType type,
- boolean scanOk) {
- delBlockInfo(info);
-
- long now = Time.monotonicNow();
- info.lastScanType = type;
- info.lastScanTime = now;
- info.lastScanOk = scanOk;
- info.nextScanType = ScanType.VERIFICATION_SCAN;
- addBlockInfo(info, false);
-
- // Don't update meta data if the verification failed.
- if (!scanOk) {
- return;
- }
-
- if (verificationLog != null) {
- verificationLog.append(now, info.getGenerationStamp(),
- info.getBlockId());
- }
- }
-
- private void handleScanFailure(ExtendedBlock block) {
- LOG.info("Reporting bad " + block);
- try {
- datanode.reportBadBlocks(block);
- } catch (IOException ie) {
- // it is bad, but not bad enough to shutdown the scanner
- LOG.warn("Cannot report bad " + block.getBlockId());
- }
- }
-
- @VisibleForTesting
- synchronized void setLastScanTimeDifference(int lastScanTimeDifference) {
- this.lastScanTimeDifference = lastScanTimeDifference;
- }
-
- static private class LogEntry {
-
- long blockId = -1;
- long verificationTime = -1;
- long genStamp = GenerationStamp.GRANDFATHER_GENERATION_STAMP;
-
- /**
- * The format consists of single line with multiple entries. each
- * entry is in the form : name="value".
- * This simple text and easily extendable and easily parseable with a
- * regex.
- */
- private static final Pattern entryPattern =
- Pattern.compile("\\G\\s*([^=\\p{Space}]+)=\"(.*?)\"\\s*");
-
- static String toString(long verificationTime, long genStamp, long blockId,
- DateFormat dateFormat) {
- return "\ndate=\"" + dateFormat.format(new Date(verificationTime))
- + "\"\t time=\"" + verificationTime
- + "\"\t genstamp=\"" + genStamp
- + "\"\t id=\"" + blockId + "\"";
- }
-
- static LogEntry parseEntry(String line) {
- LogEntry entry = new LogEntry();
-
- Matcher matcher = entryPattern.matcher(line);
- while (matcher.find()) {
- String name = matcher.group(1);
- String value = matcher.group(2);
-
- try {
- if (name.equals("id")) {
- entry.blockId = Long.parseLong(value);
- } else if (name.equals("time")) {
- entry.verificationTime = Long.parseLong(value);
- } else if (name.equals("genstamp")) {
- entry.genStamp = Long.parseLong(value);
- }
- } catch(NumberFormatException nfe) {
- LOG.warn("Cannot parse line: " + line, nfe);
- return null;
- }
- }
-
- return entry;
- }
- }
-
- private synchronized void adjustThrottler() {
- long timeLeft = Math.max(1L,
- currentPeriodStart + scanPeriod - Time.monotonicNow());
- long bw = Math.max((bytesLeft * 1000) / timeLeft, MIN_SCAN_RATE);
- throttler.setBandwidth(Math.min(bw, MAX_SCAN_RATE));
- }
-
- @VisibleForTesting
- void verifyBlock(ExtendedBlock block) {
- BlockSender blockSender = null;
-
- /* In case of failure, attempt to read second time to reduce
- * transient errors. How do we flush block data from kernel
- * buffers before the second read?
- */
- for (int i=0; i<2; i++) {
- boolean second = (i > 0);
-
- try {
- adjustThrottler();
-
- blockSender = new BlockSender(block, 0, -1, false, true, true,
- datanode, null, CachingStrategy.newDropBehind());
-
- DataOutputStream out =
- new DataOutputStream(new IOUtils.NullOutputStream());
-
- blockSender.sendBlock(out, null, throttler);
-
- LOG.info((second ? "Second " : "") +
- "Verification succeeded for " + block);
-
- if ( second ) {
- totalTransientErrors++;
- }
-
- updateScanStatus((BlockScanInfo)block.getLocalBlock(),
- ScanType.VERIFICATION_SCAN, true);
-
- return;
- } catch (IOException e) {
- updateScanStatus((BlockScanInfo)block.getLocalBlock(),
- ScanType.VERIFICATION_SCAN, false);
-
- // If the block does not exists anymore, then its not an error
- if (!dataset.contains(block)) {
- LOG.info(block + " is no longer in the dataset");
- deleteBlock(block.getLocalBlock());
- return;
- }
-
- // If the block exists, the exception may due to a race with write:
- // The BlockSender got an old block path in rbw. BlockReceiver removed
- // the rbw block from rbw to finalized but BlockSender tried to open the
- // file before BlockReceiver updated the VolumeMap. The state of the
- // block can be changed again now, so ignore this error here. If there
- // is a block really deleted by mistake, DirectoryScan should catch it.
- if (e instanceof FileNotFoundException ) {
- LOG.info("Verification failed for " + block +
- " - may be due to race with write");
- deleteBlock(block.getLocalBlock());
- return;
- }
-
- LOG.warn((second ? "Second " : "First ") + "Verification failed for "
- + block, e);
-
- if (second) {
- totalScanErrors++;
- datanode.getMetrics().incrBlockVerificationFailures();
- handleScanFailure(block);
- return;
- }
- } finally {
- IOUtils.closeStream(blockSender);
- datanode.getMetrics().incrBlocksVerified();
- totalScans++;
- }
- }
- }
-
- private synchronized long getEarliestScanTime() {
- if (!blockInfoSet.isEmpty()) {
- return blockInfoSet.first().lastScanTime;
- }
- return Long.MAX_VALUE;
- }
-
- private synchronized boolean isFirstBlockProcessed() {
- if (!blockInfoSet.isEmpty()) {
- if (blockInfoSet.first().nextScanType == ScanType.IMMEDIATE_SCAN) {
- return false;
- }
- long blockId = blockInfoSet.first().getBlockId();
- if ((processedBlocks.get(blockId) != null)
- && (processedBlocks.get(blockId) == 1)) {
- return true;
- }
- }
- return false;
- }
-
- // Picks one block and verifies it
- private void verifyFirstBlock() {
- BlockScanInfo block = null;
- synchronized (this) {
- if (!blockInfoSet.isEmpty()) {
- block = blockInfoSet.first();
- }
- }
- if ( block != null ) {
- verifyBlock(new ExtendedBlock(blockPoolId, block));
- processedBlocks.put(block.getBlockId(), 1);
- }
- }
-
- // Used for tests only
- int getBlocksScannedInLastRun() {
- return totalBlocksScannedInLastRun.get();
- }
-
- /**
- * Reads the current and previous log files (if any) and marks the blocks
- * processed if they were processed within last scan period. Copies the log
- * records of recently scanned blocks from previous to current file.
- * Returns false if the process was interrupted because the thread is marked
- * to exit.
- */
- private boolean assignInitialVerificationTimes() {
- //First updates the last verification times from the log file.
- if (verificationLog != null) {
- long now = Time.monotonicNow();
- RollingLogs.LineIterator logIterator = null;
- try {
- logIterator = verificationLog.logs.iterator(false);
- // update verification times from the verificationLog.
- while (logIterator.hasNext()) {
- if (!datanode.shouldRun
- || datanode.blockScanner.blockScannerThread.isInterrupted()) {
- return false;
- }
- LogEntry entry = LogEntry.parseEntry(logIterator.next());
- if (entry != null) {
- updateBlockInfo(entry);
- if (now - entry.verificationTime < scanPeriod) {
- BlockScanInfo info = blockMap.get(new Block(entry.blockId, 0,
- entry.genStamp));
- if (info != null) {
- if (processedBlocks.get(entry.blockId) == null) {
- if (isNewPeriod) {
- updateBytesLeft(-info.getNumBytes());
- }
- processedBlocks.put(entry.blockId, 1);
- }
- if (logIterator.isLastReadFromPrevious()) {
- // write the log entry to current file
- // so that the entry is preserved for later runs.
- verificationLog.append(entry.verificationTime, entry.genStamp,
- entry.blockId);
- }
- }
- }
- }
- }
- } catch (IOException e) {
- LOG.warn("Failed to read previous verification times.", e);
- } finally {
- IOUtils.closeStream(logIterator);
- }
- isNewPeriod = false;
- }
-
-
- /* Before this loop, entries in blockInfoSet that are not
- * updated above have lastScanTime of <= 0 . Loop until first entry has
- * lastModificationTime > 0.
- */
- synchronized (this) {
- final int numBlocks = Math.max(blockMap.size(), 1);
- // Initially spread the block reads over half of scan period
- // so that we don't keep scanning the blocks too quickly when restarted.
- long verifyInterval = Math.min(scanPeriod/(2L * numBlocks), 10*60*1000L);
- long lastScanTime = Time.monotonicNow() - scanPeriod;
-
- if (!blockInfoSet.isEmpty()) {
- BlockScanInfo info;
- while ((info = blockInfoSet.first()).lastScanTime < 0) {
- delBlockInfo(info);
- info.lastScanTime = lastScanTime;
- lastScanTime += verifyInterval;
- addBlockInfo(info, false);
- }
- }
- }
-
- return true;
- }
-
- private synchronized void updateBytesLeft(long len) {
- bytesLeft += len;
- }
-
- private synchronized void startNewPeriod() {
- LOG.info("Starting a new period : work left in prev period : "
- + String.format("%.2f%%", totalBytesToScan == 0 ? 0
- : (bytesLeft * 100.0) / totalBytesToScan));
-
- // reset the byte counts :
- bytesLeft = totalBytesToScan;
- currentPeriodStart = Time.monotonicNow();
- isNewPeriod = true;
- }
-
- private synchronized boolean workRemainingInCurrentPeriod() {
- if (bytesLeft <= 0 && Time.monotonicNow() < currentPeriodStart + scanPeriod) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Skipping scan since bytesLeft=" + bytesLeft + ", Start=" +
- currentPeriodStart + ", period=" + scanPeriod + ", now=" +
- Time.monotonicNow() + " " + blockPoolId);
- }
- return false;
- } else {
- return true;
- }
- }
-
- void scanBlockPoolSlice() {
- if (!workRemainingInCurrentPeriod()) {
- return;
- }
-
- // Create a new processedBlocks structure
- processedBlocks = new HashMap<Long, Integer>();
- if (!assignInitialVerificationTimes()) {
- return;
- }
- // Start scanning
- try {
- scan();
- } finally {
- totalBlocksScannedInLastRun.set(processedBlocks.size());
- lastScanTime.set(Time.monotonicNow());
- }
- }
-
- /**
- * Shuts down this BlockPoolSliceScanner and releases any internal resources.
- */
- void shutdown() {
- if (verificationLog != null) {
- verificationLog.close();
- }
- }
-
- private void scan() {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Starting to scan blockpool: " + blockPoolId);
- }
- try {
- adjustThrottler();
-
- while (datanode.shouldRun
- && !datanode.blockScanner.blockScannerThread.isInterrupted()
- && datanode.isBPServiceAlive(blockPoolId)) {
- long now = Time.monotonicNow();
- synchronized (this) {
- if ( now >= (currentPeriodStart + scanPeriod)) {
- startNewPeriod();
- }
- }
- if (((now - getEarliestScanTime()) >= scanPeriod)
- || ((!blockInfoSet.isEmpty()) && !(this.isFirstBlockProcessed()))) {
- verifyFirstBlock();
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("All remaining blocks were processed recently, "
- + "so this run is complete");
- }
- break;
- }
- }
- } catch (RuntimeException e) {
- LOG.warn("RuntimeException during BlockPoolScanner.scan()", e);
- throw e;
- } finally {
- rollVerificationLogs();
- rollNewBlocksInfo();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Done scanning block pool: " + blockPoolId);
- }
- }
- }
-
- // add new blocks to scan in next iteration
- private synchronized void rollNewBlocksInfo() {
- for (BlockScanInfo newBlock : newBlockInfoSet) {
- blockInfoSet.add(newBlock);
- }
- newBlockInfoSet.clear();
- }
-
- private synchronized void rollVerificationLogs() {
- if (verificationLog != null) {
- try {
- verificationLog.logs.roll();
- } catch (IOException ex) {
- LOG.warn("Received exception: ", ex);
- verificationLog.close();
- }
- }
- }
-
-
- synchronized void printBlockReport(StringBuilder buffer,
- boolean summaryOnly) {
- long oneHour = 3600*1000;
- long oneDay = 24*oneHour;
- long oneWeek = 7*oneDay;
- long fourWeeks = 4*oneWeek;
-
- int inOneHour = 0;
- int inOneDay = 0;
- int inOneWeek = 0;
- int inFourWeeks = 0;
- int inScanPeriod = 0;
- int neverScanned = 0;
-
- DateFormat dateFormat = new SimpleDateFormat(DATA_FORMAT);
-
- int total = blockInfoSet.size();
-
- long now = Time.monotonicNow();
-
- Date date = new Date();
-
- for(Iterator<BlockScanInfo> it = blockInfoSet.iterator(); it.hasNext();) {
- BlockScanInfo info = it.next();
-
- long scanTime = info.getLastScanTime();
- long diff = now - scanTime;
-
- if (diff <= oneHour) inOneHour++;
- if (diff <= oneDay) inOneDay++;
- if (diff <= oneWeek) inOneWeek++;
- if (diff <= fourWeeks) inFourWeeks++;
- if (diff <= scanPeriod) inScanPeriod++;
- if (scanTime <= 0) neverScanned++;
-
- if (!summaryOnly) {
- date.setTime(scanTime);
- String scanType =
- (info.lastScanType == ScanType.VERIFICATION_SCAN) ? "local" : "none";
- buffer.append(String.format("%-26s : status : %-6s type : %-6s" +
- " scan time : " +
- "%-15d %s%n", info,
- (info.lastScanOk ? "ok" : "failed"),
- scanType, scanTime,
- (scanTime <= 0) ? "not yet verified" :
- dateFormat.format(date)));
- }
- }
-
- double pctPeriodLeft = (scanPeriod + currentPeriodStart - now)
- *100.0/scanPeriod;
- double pctProgress = (totalBytesToScan == 0) ? 100 :
- (totalBytesToScan-bytesLeft)*100.0/totalBytesToScan;
-
- buffer.append(String.format("%nTotal Blocks : %6d" +
- "%nVerified in last hour : %6d" +
- "%nVerified in last day : %6d" +
- "%nVerified in last week : %6d" +
- "%nVerified in last four weeks : %6d" +
- "%nVerified in SCAN_PERIOD : %6d" +
- "%nNot yet verified : %6d" +
- "%nVerified since restart : %6d" +
- "%nScans since restart : %6d" +
- "%nScan errors since restart : %6d" +
- "%nTransient scan errors : %6d" +
- "%nCurrent scan rate limit KBps : %6d" +
- "%nProgress this period : %6.0f%%" +
- "%nTime left in cur period : %6.2f%%" +
- "%n",
- total, inOneHour, inOneDay, inOneWeek,
- inFourWeeks, inScanPeriod, neverScanned,
- totalScans, totalScans,
- totalScanErrors, totalTransientErrors,
- Math.round(throttler.getBandwidth()/1024.0),
- pctProgress, pctPeriodLeft));
- }
-
- /**
- * This class takes care of log file used to store the last verification
- * times of the blocks.
- */
- private static class LogFileHandler {
- private final DateFormat dateFormat = new SimpleDateFormat(DATA_FORMAT);
-
- private final RollingLogs logs;
-
- private LogFileHandler(RollingLogs logs) {
- this.logs = logs;
- }
-
- void append(long verificationTime, long genStamp, long blockId) {
- final String m = LogEntry.toString(verificationTime, genStamp, blockId,
- dateFormat);
- try {
- logs.appender().append(m);
- } catch (IOException e) {
- LOG.warn("Failed to append to " + logs + ", m=" + m, e);
- }
- }
-
- void close() {
- try {
- logs.appender().close();
- } catch (IOException e) {
- LOG.warn("Failed to close the appender of " + logs, e);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8c6a96f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 3d6c66e..368d80d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -198,20 +198,12 @@ class BlockReceiver implements Closeable {
break;
case PIPELINE_SETUP_APPEND:
replicaHandler = datanode.data.append(block, newGs, minBytesRcvd);
- if (datanode.blockScanner != null) { // remove from block scanner
- datanode.blockScanner.deleteBlock(block.getBlockPoolId(),
- block.getLocalBlock());
- }
block.setGenerationStamp(newGs);
datanode.notifyNamenodeReceivingBlock(
block, replicaHandler.getReplica().getStorageUuid());
break;
case PIPELINE_SETUP_APPEND_RECOVERY:
replicaHandler = datanode.data.recoverAppend(block, newGs, minBytesRcvd);
- if (datanode.blockScanner != null) { // remove from block scanner
- datanode.blockScanner.deleteBlock(block.getBlockPoolId(),
- block.getLocalBlock());
- }
block.setGenerationStamp(newGs);
datanode.notifyNamenodeReceivingBlock(
block, replicaHandler.getReplica().getStorageUuid());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8c6a96f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java
new file mode 100644
index 0000000..7429fff
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT;
+
+import java.io.IOException;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdfs.server.datanode.VolumeScanner.ScanResultHandler;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+@InterfaceAudience.Private
+public class BlockScanner {
+ public static final Logger LOG =
+ LoggerFactory.getLogger(BlockScanner.class);
+
+ /**
+ * The DataNode that this scanner is associated with.
+ */
+ private final DataNode datanode;
+
+ /**
+ * Maps Storage IDs to VolumeScanner objects.
+ */
+ private final TreeMap<String, VolumeScanner> scanners =
+ new TreeMap<String, VolumeScanner>();
+
+ /**
+ * The scanner configuration.
+ */
+ private final Conf conf;
+
+ /**
+ * The cached scanner configuration.
+ */
+ static class Conf {
+ // These are a few internal configuration keys used for unit tests.
+ // They can't be set unless the static boolean allowUnitTestSettings has
+ // been set to true.
+
+ @VisibleForTesting
+ static final String INTERNAL_DFS_DATANODE_SCAN_PERIOD_MS =
+ "internal.dfs.datanode.scan.period.ms.key";
+
+ @VisibleForTesting
+ static final String INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER =
+ "internal.volume.scanner.scan.result.handler";
+
+ @VisibleForTesting
+ static final String INTERNAL_DFS_BLOCK_SCANNER_MAX_STALENESS_MS =
+ "internal.dfs.block.scanner.max_staleness.ms";
+
+ @VisibleForTesting
+ static final long INTERNAL_DFS_BLOCK_SCANNER_MAX_STALENESS_MS_DEFAULT =
+ TimeUnit.MILLISECONDS.convert(15, TimeUnit.MINUTES);
+
+ @VisibleForTesting
+ static final String INTERNAL_DFS_BLOCK_SCANNER_CURSOR_SAVE_INTERVAL_MS =
+ "dfs.block.scanner.cursor.save.interval.ms";
+
+ @VisibleForTesting
+ static final long
+ INTERNAL_DFS_BLOCK_SCANNER_CURSOR_SAVE_INTERVAL_MS_DEFAULT =
+ TimeUnit.MILLISECONDS.convert(10, TimeUnit.MINUTES);
+
+ static boolean allowUnitTestSettings = false;
+ final long targetBytesPerSec;
+ final long maxStalenessMs;
+ final long scanPeriodMs;
+ final long cursorSaveMs;
+ final Class<? extends ScanResultHandler> resultHandler;
+
+ private static long getUnitTestLong(Configuration conf, String key,
+ long defVal) {
+ if (allowUnitTestSettings) {
+ return conf.getLong(key, defVal);
+ } else {
+ return defVal;
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ Conf(Configuration conf) {
+ this.targetBytesPerSec = Math.max(0L, conf.getLong(
+ DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND,
+ DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND_DEFAULT));
+ this.maxStalenessMs = Math.max(0L, getUnitTestLong(conf,
+ INTERNAL_DFS_BLOCK_SCANNER_MAX_STALENESS_MS,
+ INTERNAL_DFS_BLOCK_SCANNER_MAX_STALENESS_MS_DEFAULT));
+ this.scanPeriodMs = Math.max(0L,
+ getUnitTestLong(conf, INTERNAL_DFS_DATANODE_SCAN_PERIOD_MS,
+ TimeUnit.MILLISECONDS.convert(conf.getLong(
+ DFS_DATANODE_SCAN_PERIOD_HOURS_KEY,
+ DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT), TimeUnit.HOURS)));
+ this.cursorSaveMs = Math.max(0L, getUnitTestLong(conf,
+ INTERNAL_DFS_BLOCK_SCANNER_CURSOR_SAVE_INTERVAL_MS,
+ INTERNAL_DFS_BLOCK_SCANNER_CURSOR_SAVE_INTERVAL_MS_DEFAULT));
+ if (allowUnitTestSettings) {
+ this.resultHandler = (Class<? extends ScanResultHandler>)
+ conf.getClass(INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER,
+ ScanResultHandler.class);
+ } else {
+ this.resultHandler = ScanResultHandler.class;
+ }
+ }
+ }
+
+ public BlockScanner(DataNode datanode, Configuration conf) {
+ this.datanode = datanode;
+ this.conf = new Conf(conf);
+ if (isEnabled()) {
+ LOG.info("Initialized block scanner with targetBytesPerSec {}",
+ this.conf.targetBytesPerSec);
+ } else {
+ LOG.info("Disabled block scanner.");
+ }
+ }
+
+ /**
+ * Returns true if the block scanner is enabled.<p/>
+ *
+ * If the block scanner is disabled, no volume scanners will be created, and
+ * no threads will start.
+ */
+ public boolean isEnabled() {
+ return (conf.scanPeriodMs) > 0 && (conf.targetBytesPerSec > 0);
+ }
+
+ /**
+ * Set up a scanner for the given block pool and volume.
+ *
+ * @param ref A reference to the volume.
+ */
+ public synchronized void addVolumeScanner(FsVolumeReference ref) {
+ boolean success = false;
+ try {
+ FsVolumeSpi volume = ref.getVolume();
+ if (!isEnabled()) {
+ LOG.debug("Not adding volume scanner for {}, because the block " +
+ "scanner is disabled.", volume.getBasePath());
+ return;
+ }
+ VolumeScanner scanner = scanners.get(volume.getStorageID());
+ if (scanner != null) {
+ LOG.error("Already have a scanner for volume {}.",
+ volume.getBasePath());
+ return;
+ }
+ LOG.debug("Adding scanner for volume {} (StorageID {})",
+ volume.getBasePath(), volume.getStorageID());
+ scanner = new VolumeScanner(conf, datanode, ref);
+ scanner.start();
+ scanners.put(volume.getStorageID(), scanner);
+ success = true;
+ } finally {
+ if (!success) {
+ // If we didn't create a new VolumeScanner object, we don't
+ // need this reference to the volume.
+ IOUtils.cleanup(null, ref);
+ }
+ }
+ }
+
+ /**
+ * Stops and removes a volume scanner.<p/>
+ *
+ * This function will block until the volume scanner has stopped.
+ *
+ * @param volume The volume to remove.
+ */
+ public synchronized void removeVolumeScanner(FsVolumeSpi volume) {
+ if (!isEnabled()) {
+ LOG.debug("Not removing volume scanner for {}, because the block " +
+ "scanner is disabled.", volume.getStorageID());
+ return;
+ }
+ VolumeScanner scanner = scanners.get(volume.getStorageID());
+ if (scanner == null) {
+ LOG.warn("No scanner found to remove for volumeId {}",
+ volume.getStorageID());
+ return;
+ }
+ LOG.info("Removing scanner for volume {} (StorageID {})",
+ volume.getBasePath(), volume.getStorageID());
+ scanner.shutdown();
+ scanners.remove(volume.getStorageID());
+ Uninterruptibles.joinUninterruptibly(scanner, 5, TimeUnit.MINUTES);
+ }
+
+ /**
+ * Stops and removes all volume scanners.<p/>
+ *
+ * This function will block until all the volume scanners have stopped.
+ */
+ public synchronized void removeAllVolumeScanners() {
+ for (Entry<String, VolumeScanner> entry : scanners.entrySet()) {
+ entry.getValue().shutdown();
+ }
+ for (Entry<String, VolumeScanner> entry : scanners.entrySet()) {
+ Uninterruptibles.joinUninterruptibly(entry.getValue(),
+ 5, TimeUnit.MINUTES);
+ }
+ scanners.clear();
+ }
+
+ /**
+ * Enable scanning a given block pool id.
+ *
+ * @param bpid The block pool id to enable scanning for.
+ */
+ synchronized void enableBlockPoolId(String bpid) {
+ Preconditions.checkNotNull(bpid);
+ for (VolumeScanner scanner : scanners.values()) {
+ scanner.enableBlockPoolId(bpid);
+ }
+ }
+
+ /**
+ * Disable scanning a given block pool id.
+ *
+ * @param bpid The block pool id to disable scanning for.
+ */
+ synchronized void disableBlockPoolId(String bpid) {
+ Preconditions.checkNotNull(bpid);
+ for (VolumeScanner scanner : scanners.values()) {
+ scanner.disableBlockPoolId(bpid);
+ }
+ }
+
+ @VisibleForTesting
+ synchronized VolumeScanner.Statistics getVolumeStats(String volumeId) {
+ VolumeScanner scanner = scanners.get(volumeId);
+ if (scanner == null) {
+ return null;
+ }
+ return scanner.getStatistics();
+ }
+
+ synchronized void printStats(StringBuilder p) {
+ // print out all bpids that we're scanning ?
+ for (Entry<String, VolumeScanner> entry : scanners.entrySet()) {
+ entry.getValue().printStats(p);
+ }
+ }
+
+ @InterfaceAudience.Private
+ public static class Servlet extends HttpServlet {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void doGet(HttpServletRequest request,
+ HttpServletResponse response) throws IOException {
+ response.setContentType("text/plain");
+
+ DataNode datanode = (DataNode)
+ getServletContext().getAttribute("datanode");
+ BlockScanner blockScanner = datanode.getBlockScanner();
+
+ StringBuilder buffer = new StringBuilder(8 * 1024);
+ if (!blockScanner.isEnabled()) {
+ LOG.warn("Periodic block scanner is not running");
+ buffer.append("Periodic block scanner is not running. " +
+ "Please check the datanode log if this is unexpected.");
+ } else {
+ buffer.append("Block Scanner Statistics\n\n");
+ blockScanner.printStats(buffer);
+ }
+ String resp = buffer.toString();
+ LOG.trace("Returned Servlet info {}", resp);
+ response.getWriter().write(resp);
+ }
+ }
+}
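
The Conf class above shows how the rewritten scanner is enabled or disabled purely through configuration: isEnabled() requires both a positive scan period and a positive per-volume scan bandwidth. The following standalone sketch mirrors that check so the behaviour is easy to try outside a DataNode; the class and method names are illustrative only, and the key strings are the values behind the DFS_DATANODE_SCAN_PERIOD_HOURS_KEY and DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND constants imported at the top of BlockScanner.java.

import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;

public class ScannerEnabledSketch {
  // The real defaults live in DFSConfigKeys; zero is used here only so that
  // unset keys read as "disabled" in this standalone sketch.
  static boolean wouldBeEnabled(Configuration conf) {
    long bytesPerSec = Math.max(0L,
        conf.getLong("dfs.block.scanner.volume.bytes.per.second", 0L));
    long scanPeriodMs = Math.max(0L, TimeUnit.MILLISECONDS.convert(
        conf.getLong("dfs.datanode.scan.period.hours", 0L), TimeUnit.HOURS));
    // Same condition as BlockScanner#isEnabled(): both values must be positive.
    return scanPeriodMs > 0 && bytesPerSec > 0;
  }

  public static void main(String[] args) {
    Configuration conf = new Configuration();
    conf.setLong("dfs.datanode.scan.period.hours", 504);          // three weeks
    conf.setLong("dfs.block.scanner.volume.bytes.per.second", 0); // turns scanning off
    System.out.println(wouldBeEnabled(conf));                     // prints false
  }
}
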
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8c6a96f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
index c941e82..c016e62 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
@@ -600,9 +600,6 @@ class BlockSender implements java.io.Closeable {
String ioem = e.getMessage();
if (!ioem.startsWith("Broken pipe") && !ioem.startsWith("Connection reset")) {
LOG.error("BlockSender.sendChunks() exception: ", e);
- //Something might be wrong with the block. Make this block the high
- //priority block for verification.
- datanode.blockScanner.addBlock(block, true);
}
}
throw ioeToSocketException(e);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8c6a96f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
deleted file mode 100644
index 450c2b1..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
+++ /dev/null
@@ -1,339 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs.server.datanode;
-
-import java.io.IOException;
-import java.util.TreeMap;
-
-import javax.servlet.http.HttpServlet;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * DataBlockScanner manages block scanning for all the block pools. For each
- * block pool a {@link BlockPoolSliceScanner} is created which runs in a separate
- * thread to scan the blocks for that block pool. When a {@link BPOfferService}
- * becomes alive or dies, blockPoolScannerMap in this class is updated.
- */
-@InterfaceAudience.Private
-public class DataBlockScanner implements Runnable {
- public static final Log LOG = LogFactory.getLog(DataBlockScanner.class);
- private final DataNode datanode;
- private final FsDatasetSpi<? extends FsVolumeSpi> dataset;
- private final Configuration conf;
-
- static final int SLEEP_PERIOD_MS = 5 * 1000;
-
- /**
- * Map to find the BlockPoolScanner for a given block pool id. This is updated
- * when a BPOfferService becomes alive or dies.
- */
- private final TreeMap<String, BlockPoolSliceScanner> blockPoolScannerMap =
- new TreeMap<String, BlockPoolSliceScanner>();
- Thread blockScannerThread = null;
-
- DataBlockScanner(DataNode datanode,
- FsDatasetSpi<? extends FsVolumeSpi> dataset,
- Configuration conf) {
- this.datanode = datanode;
- this.dataset = dataset;
- this.conf = conf;
- }
-
- @Override
- public void run() {
- String currentBpId = "";
- boolean firstRun = true;
- while (datanode.shouldRun && !Thread.interrupted()) {
- //Sleep everytime except in the first iteration.
- if (!firstRun) {
- try {
- Thread.sleep(SLEEP_PERIOD_MS);
- } catch (InterruptedException ex) {
- // Interrupt itself again to set the interrupt status
- blockScannerThread.interrupt();
- continue;
- }
- } else {
- firstRun = false;
- }
-
- BlockPoolSliceScanner bpScanner = getNextBPScanner(currentBpId);
- if (bpScanner == null) {
- // Possible if thread is interrupted
- continue;
- }
- currentBpId = bpScanner.getBlockPoolId();
- // If BPOfferService for this pool is not alive, don't process it
- if (!datanode.isBPServiceAlive(currentBpId)) {
- LOG.warn("Block Pool " + currentBpId + " is not alive");
- // Remove in case BP service died abruptly without proper shutdown
- removeBlockPool(currentBpId);
- continue;
- }
- bpScanner.scanBlockPoolSlice();
- }
-
- // Call shutdown for each allocated BlockPoolSliceScanner.
- for (BlockPoolSliceScanner bpss: blockPoolScannerMap.values()) {
- bpss.shutdown();
- }
- }
-
- // Wait for at least one block pool to be up
- private void waitForInit() {
- while ((getBlockPoolSetSize() < datanode.getAllBpOs().length)
- || (getBlockPoolSetSize() < 1)) {
- try {
- Thread.sleep(SLEEP_PERIOD_MS);
- } catch (InterruptedException e) {
- blockScannerThread.interrupt();
- return;
- }
- }
- }
-
- /**
- * Find next block pool id to scan. There should be only one current
- * verification log file. Find which block pool contains the current
- * verification log file and that is used as the starting block pool id. If no
- * current files are found start with first block-pool in the blockPoolSet.
- * However, if more than one current files are found, the one with latest
- * modification time is used to find the next block pool id.
- */
- private BlockPoolSliceScanner getNextBPScanner(String currentBpId) {
-
- String nextBpId = null;
- while (datanode.shouldRun && !blockScannerThread.isInterrupted()) {
- waitForInit();
- synchronized (this) {
- if (getBlockPoolSetSize() > 0) {
- // Find nextBpId by the minimum of the last scan time
- long lastScanTime = 0;
- for (String bpid : blockPoolScannerMap.keySet()) {
- final long t = getBPScanner(bpid).getLastScanTime();
- if (t != 0L) {
- if (bpid == null || t < lastScanTime) {
- lastScanTime = t;
- nextBpId = bpid;
- }
- }
- }
-
- // nextBpId can still be null if no current log is found,
- // find nextBpId sequentially.
- if (nextBpId == null) {
- nextBpId = blockPoolScannerMap.higherKey(currentBpId);
- if (nextBpId == null) {
- nextBpId = blockPoolScannerMap.firstKey();
- }
- }
- if (nextBpId != null) {
- return getBPScanner(nextBpId);
- }
- }
- }
- LOG.warn("No block pool is up, going to wait");
- try {
- Thread.sleep(5000);
- } catch (InterruptedException ex) {
- LOG.warn("Received exception: " + ex);
- blockScannerThread.interrupt();
- return null;
- }
- }
- return null;
- }
-
- private synchronized int getBlockPoolSetSize() {
- return blockPoolScannerMap.size();
- }
-
- @VisibleForTesting
- synchronized BlockPoolSliceScanner getBPScanner(String bpid) {
- return blockPoolScannerMap.get(bpid);
- }
-
- private synchronized String[] getBpIdList() {
- return blockPoolScannerMap.keySet().toArray(
- new String[blockPoolScannerMap.keySet().size()]);
- }
-
- public void addBlock(ExtendedBlock block, boolean scanNow) {
- BlockPoolSliceScanner bpScanner = getBPScanner(block.getBlockPoolId());
- if (bpScanner != null) {
- bpScanner.addBlock(block, scanNow);
- } else {
- LOG.warn("No block pool scanner found for block pool id: "
- + block.getBlockPoolId());
- }
- }
-
- boolean isInitialized(String bpid) {
- return getBPScanner(bpid) != null;
- }
-
- public synchronized void printBlockReport(StringBuilder buffer,
- boolean summary) {
- String[] bpIdList = getBpIdList();
- if (bpIdList == null || bpIdList.length == 0) {
- buffer.append("Periodic block scanner is not yet initialized. "
- + "Please check back again after some time.");
- return;
- }
- for (String bpid : bpIdList) {
- BlockPoolSliceScanner bpScanner = getBPScanner(bpid);
- buffer.append("\n\nBlock report for block pool: "+bpid + "\n");
- bpScanner.printBlockReport(buffer, summary);
- buffer.append("\n");
- }
- }
-
- public void deleteBlock(String poolId, Block toDelete) {
- BlockPoolSliceScanner bpScanner = getBPScanner(poolId);
- if (bpScanner != null) {
- bpScanner.deleteBlock(toDelete);
- } else {
- LOG.warn("No block pool scanner found for block pool id: "
- + poolId);
- }
- }
-
- public void deleteBlocks(String poolId, Block[] toDelete) {
- BlockPoolSliceScanner bpScanner = getBPScanner(poolId);
- if (bpScanner != null) {
- bpScanner.deleteBlocks(toDelete);
- } else {
- LOG.warn("No block pool scanner found for block pool id: "
- + poolId);
- }
- }
-
- public void shutdown() {
- synchronized (this) {
- if (blockScannerThread != null) {
- blockScannerThread.interrupt();
- }
- }
-
- // We cannot join within the synchronized block, because it would create a
- // deadlock situation. blockScannerThread calls other synchronized methods.
- if (blockScannerThread != null) {
- try {
- blockScannerThread.join();
- } catch (InterruptedException e) {
- // shutting down anyway
- }
- }
- }
-
- public synchronized void addBlockPool(String blockPoolId) {
- if (blockPoolScannerMap.get(blockPoolId) != null) {
- return;
- }
- BlockPoolSliceScanner bpScanner = new BlockPoolSliceScanner(blockPoolId,
- datanode, dataset, conf);
- blockPoolScannerMap.put(blockPoolId, bpScanner);
- LOG.info("Added bpid=" + blockPoolId + " to blockPoolScannerMap, new size="
- + blockPoolScannerMap.size());
- }
-
- public synchronized void removeBlockPool(String blockPoolId) {
- BlockPoolSliceScanner bpss = blockPoolScannerMap.remove(blockPoolId);
- if (bpss != null) {
- bpss.shutdown();
- }
- LOG.info("Removed bpid="+blockPoolId+" from blockPoolScannerMap");
- }
-
- @VisibleForTesting
- long getBlocksScannedInLastRun(String bpid) throws IOException {
- BlockPoolSliceScanner bpScanner = getBPScanner(bpid);
- if (bpScanner == null) {
- throw new IOException("Block Pool: "+bpid+" is not running");
- } else {
- return bpScanner.getBlocksScannedInLastRun();
- }
- }
-
- @VisibleForTesting
- long getTotalScans(String bpid) throws IOException {
- BlockPoolSliceScanner bpScanner = getBPScanner(bpid);
- if (bpScanner == null) {
- throw new IOException("Block Pool: "+bpid+" is not running");
- } else {
- return bpScanner.getTotalScans();
- }
- }
-
- @VisibleForTesting
- public void setLastScanTimeDifference(ExtendedBlock block, int lastScanTimeDifference) {
- BlockPoolSliceScanner bpScanner = getBPScanner(block.getBlockPoolId());
- if (bpScanner != null) {
- bpScanner.setLastScanTimeDifference(lastScanTimeDifference);
- } else {
- LOG.warn("No block pool scanner found for block pool id: "
- + block.getBlockPoolId());
- }
- }
-
- public void start() {
- blockScannerThread = new Thread(this);
- blockScannerThread.setDaemon(true);
- blockScannerThread.start();
- }
-
- @InterfaceAudience.Private
- public static class Servlet extends HttpServlet {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void doGet(HttpServletRequest request,
- HttpServletResponse response) throws IOException {
- response.setContentType("text/plain");
-
- DataNode datanode = (DataNode) getServletContext().getAttribute("datanode");
- DataBlockScanner blockScanner = datanode.blockScanner;
-
- boolean summary = (request.getParameter("listblocks") == null);
-
- StringBuilder buffer = new StringBuilder(8*1024);
- if (blockScanner == null) {
- LOG.warn("Periodic block scanner is not running");
- buffer.append("Periodic block scanner is not running. " +
- "Please check the datanode log if this is unexpected.");
- } else {
- blockScanner.printBlockReport(buffer, summary);
- }
- response.getWriter().write(buffer.toString()); // extra copy!
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8c6a96f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index fa0d74b..93e64d2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -320,7 +320,7 @@ public class DataNode extends ReconfigurableBase
BlockPoolTokenSecretManager blockPoolTokenSecretManager;
private boolean hasAnyBlockPoolRegistered = false;
- volatile DataBlockScanner blockScanner = null;
+ private final BlockScanner blockScanner;
private DirectoryScanner directoryScanner = null;
/** Activated plug-ins. */
@@ -364,6 +364,7 @@ public class DataNode extends ReconfigurableBase
@InterfaceAudience.LimitedPrivate("HDFS")
DataNode(final Configuration conf) {
super(conf);
+ this.blockScanner = new BlockScanner(this, conf);
this.fileDescriptorPassingDisabledReason = null;
this.maxNumberOfBlocksToLog = 0;
this.confVersion = null;
@@ -697,7 +698,8 @@ public class DataNode extends ReconfigurableBase
this.infoServer.setAttribute("datanode", this);
this.infoServer.setAttribute(JspHelper.CURRENT_CONF, conf);
this.infoServer.addServlet(null, "/blockScannerReport",
- DataBlockScanner.Servlet.class);
+ BlockScanner.Servlet.class);
+
this.infoServer.start();
InetSocketAddress jettyAddr = infoServer.getConnectorAddress(0);
@@ -798,56 +800,12 @@ public class DataNode extends ReconfigurableBase
// Not a superuser.
throw new AccessControlException();
}
-
-/**
- * Initialize the datanode's periodic scanners:
- * {@link DataBlockScanner}
- * {@link DirectoryScanner}
- * They report results on a per-blockpool basis but do their scanning
- * on a per-Volume basis to minimize competition for disk iops.
- *
- * @param conf - Configuration has the run intervals and other
- * parameters for these periodic scanners
- */
- private void initPeriodicScanners(Configuration conf) {
- initDataBlockScanner(conf);
- initDirectoryScanner(conf);
- }
-
+
private void shutdownPeriodicScanners() {
shutdownDirectoryScanner();
- shutdownDataBlockScanner();
- }
-
- /**
- * See {@link DataBlockScanner}
- */
- private synchronized void initDataBlockScanner(Configuration conf) {
- if (blockScanner != null) {
- return;
- }
- String reason = null;
- assert data != null;
- if (conf.getInt(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY,
- DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT) < 0) {
- reason = "verification is turned off by configuration";
- } else if ("SimulatedFSDataset".equals(data.getClass().getSimpleName())) {
- reason = "verifcation is not supported by SimulatedFSDataset";
- }
- if (reason == null) {
- blockScanner = new DataBlockScanner(this, data, conf);
- blockScanner.start();
- } else {
- LOG.info("Periodic Block Verification scan disabled because " + reason);
- }
+ blockScanner.removeAllVolumeScanners();
}
-
- private void shutdownDataBlockScanner() {
- if (blockScanner != null) {
- blockScanner.shutdown();
- }
- }
-
+
/**
* See {@link DirectoryScanner}
*/
@@ -1276,9 +1234,8 @@ public class DataNode extends ReconfigurableBase
// registering anywhere. If that's the case, we wouldn't have
// a block pool id
String bpId = bpos.getBlockPoolId();
- if (blockScanner != null) {
- blockScanner.removeBlockPool(bpId);
- }
+
+ blockScanner.disableBlockPoolId(bpId);
if (data != null) {
data.shutdownBlockPool(bpId);
@@ -1322,9 +1279,9 @@ public class DataNode extends ReconfigurableBase
// failures.
checkDiskError();
- initPeriodicScanners(conf);
-
+ initDirectoryScanner(conf);
data.addBlockPool(nsInfo.getBlockPoolID(), conf);
+ blockScanner.enableBlockPoolId(bpos.getBlockPoolId());
}
BPOfferService[] getAllBpOs() {
@@ -2194,10 +2151,6 @@ public class DataNode extends ReconfigurableBase
LOG.warn("Cannot find BPOfferService for reporting block received for bpid="
+ block.getBlockPoolId());
}
- FsVolumeSpi volume = getFSDataset().getVolume(block);
- if (blockScanner != null && !volume.isTransientStorage()) {
- blockScanner.addBlock(block, false);
- }
}
/** Start a single datanode daemon and wait for it to finish.
@@ -2471,8 +2424,9 @@ public class DataNode extends ReconfigurableBase
return data;
}
+ @VisibleForTesting
/** @return the block scanner. */
- public DataBlockScanner getBlockScanner() {
+ public BlockScanner getBlockScanner() {
return blockScanner;
}
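
The DataNode changes above keep the /blockScannerReport servlet path but back it with BlockScanner.Servlet, so the plain-text report remains reachable through the DataNode web UI. A minimal fetch sketch follows; the host and port are placeholders for a real DataNode HTTP address, and the class name is illustrative.

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;
import java.nio.charset.StandardCharsets;

public class BlockScannerReportFetch {
  public static void main(String[] args) throws Exception {
    // Placeholder address; substitute the dfs.datanode.http.address of a live DataNode.
    URL url = new URL("http://datanode.example.com:50075/blockScannerReport");
    try (BufferedReader in = new BufferedReader(
        new InputStreamReader(url.openStream(), StandardCharsets.UTF_8))) {
      String line;
      while ((line = in.readLine()) != null) {
        System.out.println(line); // plain-text statistics emitted by BlockScanner.Servlet
      }
    }
  }
}
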
[2/3] hadoop git commit: HDFS-7430. Refactor the BlockScanner to use
O(1) memory and use multiple threads (cmccabe)
Posted by cm...@apache.org.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8c6a96f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
new file mode 100644
index 0000000..781b4d3
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
@@ -0,0 +1,652 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.DataOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.datanode.BlockScanner.Conf;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.BlockIterator;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.util.DataTransferThrottler;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * VolumeScanner scans a single volume. Each VolumeScanner has its own thread.<p/>
+ * They are all managed by the DataNode's BlockScanner.
+ */
+public class VolumeScanner extends Thread {
+ public static final Logger LOG =
+ LoggerFactory.getLogger(VolumeScanner.class);
+
+ /**
+ * Number of seconds in a minute.
+ */
+ private final static int SECONDS_PER_MINUTE = 60;
+
+ /**
+ * Number of minutes in an hour.
+ */
+ private final static int MINUTES_PER_HOUR = 60;
+
+ /**
+ * Name of the block iterator used by this scanner.
+ */
+ private final static String BLOCK_ITERATOR_NAME = "scanner";
+
+ /**
+ * The configuration.
+ */
+ private final Conf conf;
+
+ /**
+ * The DataNode this VolumeScanner is associated with.
+ */
+ private final DataNode datanode;
+
+ /**
+ * A reference to the volume that we're scanning.
+ */
+ private final FsVolumeReference ref;
+
+ /**
+ * The volume that we're scanning.
+ */
+ final FsVolumeSpi volume;
+
+ /**
+ * The number of scanned bytes in each minute of the last hour.<p/>
+ *
+ * This array is managed as a circular buffer. We take the monotonic time and
+ * divide it up into one-minute periods. Each entry in the array represents
+ * how many bytes were scanned during that period.
+ */
+ private final long scannedBytes[] = new long[MINUTES_PER_HOUR];
+
+ /**
+ * The sum of all the values of scannedBytes.
+ */
+ private long scannedBytesSum = 0;
+
+ /**
+ * The throttler to use with BlockSender objects.
+ */
+ private final DataTransferThrottler throttler = new DataTransferThrottler(1);
+
+ /**
+ * The null output stream to use with BlockSender objects.
+ */
+ private final DataOutputStream nullStream =
+ new DataOutputStream(new IOUtils.NullOutputStream());
+
+ /**
+ * The block iterators associated with this VolumeScanner.<p/>
+ *
+ * Each block pool has its own BlockIterator.
+ */
+ private final List<BlockIterator> blockIters =
+ new LinkedList<BlockIterator>();
+
+ /**
+ * The current block iterator, or null if there is none.
+ */
+ private BlockIterator curBlockIter = null;
+
+ /**
+ * True if the thread is stopping.<p/>
+ * Protected by this object's lock.
+ */
+ private boolean stopping = false;
+
+ /**
+ * The current minute, in monotonic terms.
+ */
+ private long curMinute = 0;
+
+ /**
+ * Handles scan results.
+ */
+ private final ScanResultHandler resultHandler;
+
+ private final Statistics stats = new Statistics();
+
+ static class Statistics {
+ long bytesScannedInPastHour = 0;
+ long blocksScannedInCurrentPeriod = 0;
+ long blocksScannedSinceRestart = 0;
+ long scansSinceRestart = 0;
+ long scanErrorsSinceRestart = 0;
+ long nextBlockPoolScanStartMs = -1;
+ long blockPoolPeriodEndsMs = -1;
+ ExtendedBlock lastBlockScanned = null;
+ boolean eof = false;
+
+ Statistics() {
+ }
+
+ Statistics(Statistics other) {
+ this.bytesScannedInPastHour = other.bytesScannedInPastHour;
+ this.blocksScannedInCurrentPeriod = other.blocksScannedInCurrentPeriod;
+ this.blocksScannedSinceRestart = other.blocksScannedSinceRestart;
+ this.scansSinceRestart = other.scansSinceRestart;
+ this.scanErrorsSinceRestart = other.scanErrorsSinceRestart;
+ this.nextBlockPoolScanStartMs = other.nextBlockPoolScanStartMs;
+ this.blockPoolPeriodEndsMs = other.blockPoolPeriodEndsMs;
+ this.lastBlockScanned = other.lastBlockScanned;
+ this.eof = other.eof;
+ }
+
+ @Override
+ public String toString() {
+ return new StringBuilder().
+ append("Statistics{").
+ append("bytesScannedInPastHour=").append(bytesScannedInPastHour).
+ append(", blocksScannedInCurrentPeriod=").
+ append(blocksScannedInCurrentPeriod).
+ append(", blocksScannedSinceRestart=").
+ append(blocksScannedSinceRestart).
+ append(", scansSinceRestart=").append(scansSinceRestart).
+ append(", scanErrorsSinceRestart=").append(scanErrorsSinceRestart).
+ append(", nextBlockPoolScanStartMs=").append(nextBlockPoolScanStartMs).
+ append(", blockPoolPeriodEndsMs=").append(blockPoolPeriodEndsMs).
+ append(", lastBlockScanned=").append(lastBlockScanned).
+ append(", eof=").append(eof).
+ append("}").toString();
+ }
+ }
+
+ private static double positiveMsToHours(long ms) {
+ if (ms <= 0) {
+ return 0;
+ } else {
+ return TimeUnit.HOURS.convert(ms, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ public void printStats(StringBuilder p) {
+ p.append("Block scanner information for volume " +
+ volume.getStorageID() + " with base path " + volume.getBasePath() +
+ "%n");
+ synchronized (stats) {
+ p.append(String.format("Bytes verified in last hour : %57d%n",
+ stats.bytesScannedInPastHour));
+ p.append(String.format("Blocks scanned in current period : %57d%n",
+ stats.blocksScannedInCurrentPeriod));
+ p.append(String.format("Blocks scanned since restart : %57d%n",
+ stats.blocksScannedSinceRestart));
+ p.append(String.format("Block pool scans since restart : %57d%n",
+ stats.scansSinceRestart));
+ p.append(String.format("Block scan errors since restart : %57d%n",
+ stats.scanErrorsSinceRestart));
+ if (stats.nextBlockPoolScanStartMs > 0) {
+ p.append(String.format("Hours until next block pool scan : %57.3f%n",
+ positiveMsToHours(stats.nextBlockPoolScanStartMs -
+ Time.monotonicNow())));
+ }
+ if (stats.blockPoolPeriodEndsMs > 0) {
+ p.append(String.format("Hours until possible pool rescan : %57.3f%n",
+ positiveMsToHours(stats.blockPoolPeriodEndsMs -
+ Time.now())));
+ }
+ p.append(String.format("Last block scanned : %57s%n",
+ ((stats.lastBlockScanned == null) ? "none" :
+ stats.lastBlockScanned.toString())));
+ p.append(String.format("More blocks to scan in period : %57s%n",
+ !stats.eof));
+ p.append("%n");
+ }
+ }
+
+ static class ScanResultHandler {
+ private VolumeScanner scanner;
+
+ public void setup(VolumeScanner scanner) {
+ LOG.trace("Starting VolumeScanner {}",
+ scanner.volume.getBasePath());
+ this.scanner = scanner;
+ }
+
+ public void handle(ExtendedBlock block, IOException e) {
+ FsVolumeSpi volume = scanner.volume;
+ if (e == null) {
+ LOG.trace("Successfully scanned {} on {}", block, volume.getBasePath());
+ return;
+ }
+ // If the block does not exist anymore, then it's not an error.
+ if (!volume.getDataset().contains(block)) {
+ LOG.debug("Volume {}: block {} is no longer in the dataset.",
+ volume.getBasePath(), block);
+ return;
+ }
+ // If the block exists, the exception may be due to a race with a write:
+ // the BlockSender got an old block path in rbw. BlockReceiver moved the
+ // block from rbw to finalized, but BlockSender tried to open the
+ // file before BlockReceiver updated the VolumeMap. The state of the
+ // block can be changed again now, so ignore this error here. If there
+ // is a block really deleted by mistake, DirectoryScan should catch it.
+ if (e instanceof FileNotFoundException ) {
+ LOG.info("Volume {}: verification failed for {} because of " +
+ "FileNotFoundException. This may be due to a race with write.",
+ volume.getBasePath(), block);
+ return;
+ }
+ LOG.warn("Reporting bad {} on {}", block, volume.getBasePath());
+ try {
+ scanner.datanode.reportBadBlocks(block);
+ } catch (IOException ie) {
+ // This is bad, but not bad enough to shut down the scanner.
+ LOG.warn("Cannot report bad " + block.getBlockId(), e);
+ }
+ }
+ }
+
+ VolumeScanner(Conf conf, DataNode datanode, FsVolumeReference ref) {
+ this.conf = conf;
+ this.datanode = datanode;
+ this.ref = ref;
+ this.volume = ref.getVolume();
+ ScanResultHandler handler;
+ try {
+ handler = conf.resultHandler.newInstance();
+ } catch (Throwable e) {
+ LOG.error("unable to instantiate {}", conf.resultHandler, e);
+ handler = new ScanResultHandler();
+ }
+ this.resultHandler = handler;
+ setName("VolumeScannerThread(" + volume.getBasePath() + ")");
+ setDaemon(true);
+ }
+
+ private void saveBlockIterator(BlockIterator iter) {
+ try {
+ iter.save();
+ } catch (IOException e) {
+ LOG.warn("{}: error saving {}.", this, iter, e);
+ }
+ }
+
+ private void expireOldScannedBytesRecords(long monotonicMs) {
+ long newMinute =
+ TimeUnit.MINUTES.convert(monotonicMs, TimeUnit.MILLISECONDS);
+ newMinute = newMinute % MINUTES_PER_HOUR;
+ if (curMinute == newMinute) {
+ return;
+ }
+ // If a minute or more has gone past since we last updated the scannedBytes
+ // array, zero out the slots corresponding to those minutes.
+ for (long m = curMinute + 1; m <= newMinute; m++) {
+ LOG.trace("{}: updateScannedBytes is zeroing out slot {}. " +
+ "curMinute = {}; newMinute = {}", this, m % MINUTES_PER_HOUR,
+ curMinute, newMinute);
+ scannedBytesSum -= scannedBytes[(int)(m % MINUTES_PER_HOUR)];
+ scannedBytes[(int)(m % MINUTES_PER_HOUR)] = 0;
+ }
+ curMinute = newMinute;
+ }
+
+ /**
+ * Find a usable block iterator.<p/>
+ *
+ * We will consider available block iterators in order. This property is
+ * important so that we don't keep rescanning the same block pool id over
+ * and over, while other block pools stay unscanned.<p/>
+ *
+ * A block pool is always ready to scan if the iterator is not at EOF. If
+ * the iterator is at EOF, the block pool will be ready to scan when
+ * conf.scanPeriodMs milliseconds have elapsed since the iterator was last
+ * rewound.<p/>
+ *
+ * @return 0 if we found a usable block iterator;
+ * otherwise, the length of time in milliseconds to
+ * delay before checking again.
+ */
+ private synchronized long findNextUsableBlockIter() {
+ int numBlockIters = blockIters.size();
+ if (numBlockIters == 0) {
+ LOG.debug("{}: no block pools are registered.", this);
+ return Long.MAX_VALUE;
+ }
+ int curIdx;
+ if (curBlockIter == null) {
+ curIdx = 0;
+ } else {
+ curIdx = blockIters.indexOf(curBlockIter);
+ Preconditions.checkState(curIdx >= 0);
+ }
+ // Note that this has to be wall-clock time, not monotonic time. This is
+ // because the time saved in the cursor file is a wall-clock time. We do
+ // not want to save a monotonic time in the cursor file, because it resets
+ // every time the machine reboots (on most platforms).
+ long nowMs = Time.now();
+ long minTimeoutMs = Long.MAX_VALUE;
+ for (int i = 0; i < numBlockIters; i++) {
+ int idx = (curIdx + i + 1) % numBlockIters;
+ BlockIterator iter = blockIters.get(idx);
+ if (!iter.atEnd()) {
+ LOG.info("Now scanning bpid {} on volume {}",
+ iter.getBlockPoolId(), volume.getBasePath());
+ curBlockIter = iter;
+ return 0L;
+ }
+ long iterStartMs = iter.getIterStartMs();
+ long waitMs = (iterStartMs + conf.scanPeriodMs) - nowMs;
+ if (waitMs <= 0) {
+ iter.rewind();
+ LOG.info("Now rescanning bpid {} on volume {}, after more than " +
+ "{} hour(s)", iter.getBlockPoolId(), volume.getBasePath(),
+ TimeUnit.HOURS.convert(conf.scanPeriodMs, TimeUnit.MILLISECONDS));
+ curBlockIter = iter;
+ return 0L;
+ }
+ minTimeoutMs = Math.min(minTimeoutMs, waitMs);
+ }
+ LOG.info("{}: no suitable block pools found to scan. Waiting {} ms.",
+ this, minTimeoutMs);
+ return minTimeoutMs;
+ }
+
+ /**
+ * Scan a block.
+ *
+ * @param cblock The block to scan.
+ * @param bytesPerSec The bytes per second to scan at.
+ *
+ * @return The length of the block that was scanned, or
+ * -1 if the block could not be scanned.
+ */
+ private long scanBlock(ExtendedBlock cblock, long bytesPerSec) {
+ // 'cblock' has a valid blockId and block pool id, but we don't yet know the
+ // genstamp the block is supposed to have. Ask the FsDatasetImpl for this
+ // information.
+ ExtendedBlock block = null;
+ try {
+ Block b = volume.getDataset().getStoredBlock(
+ cblock.getBlockPoolId(), cblock.getBlockId());
+ if (b == null) {
+ LOG.info("FileNotFound while finding block {} on volume {}",
+ cblock, volume.getBasePath());
+ } else {
+ block = new ExtendedBlock(cblock.getBlockPoolId(), b);
+ }
+ } catch (FileNotFoundException e) {
+ LOG.info("FileNotFoundException while finding block {} on volume {}",
+ cblock, volume.getBasePath());
+ } catch (IOException e) {
+ LOG.warn("I/O error while finding block {} on volume {}",
+ cblock, volume.getBasePath());
+ }
+ if (block == null) {
+ return -1; // block not found.
+ }
+ BlockSender blockSender = null;
+ try {
+ blockSender = new BlockSender(block, 0, -1,
+ false, true, true, datanode, null,
+ CachingStrategy.newDropBehind());
+ throttler.setBandwidth(bytesPerSec);
+ long bytesRead = blockSender.sendBlock(nullStream, null, throttler);
+ resultHandler.handle(block, null);
+ return bytesRead;
+ } catch (IOException e) {
+ resultHandler.handle(block, e);
+ } finally {
+ IOUtils.cleanup(null, blockSender);
+ }
+ return -1;
+ }
+
+ @VisibleForTesting
+ static boolean calculateShouldScan(long targetBytesPerSec,
+ long scannedBytesSum) {
+ long effectiveBytesPerSec =
+ scannedBytesSum / (SECONDS_PER_MINUTE * MINUTES_PER_HOUR);
+ boolean shouldScan = effectiveBytesPerSec <= targetBytesPerSec;
+ LOG.trace("calculateShouldScan: effectiveBytesPerSec = {}, and " +
+ "targetBytesPerSec = {}. shouldScan = {}",
+ effectiveBytesPerSec, targetBytesPerSec, shouldScan);
+ return shouldScan;
+ }
+
+ /**
+ * Run an iteration of the VolumeScanner loop.
+ *
+ * @return The number of milliseconds to delay before running the loop
+ * again, or 0 to re-run the loop immediately.
+ */
+ private long runLoop() {
+ long bytesScanned = -1;
+ boolean scanError = false;
+ ExtendedBlock block = null;
+ try {
+ long monotonicMs = Time.monotonicNow();
+ expireOldScannedBytesRecords(monotonicMs);
+
+ if (!calculateShouldScan(conf.targetBytesPerSec, scannedBytesSum)) {
+ // We have been scanning faster than the configured target rate over
+ // the past hour; wait for some old scannedBytes records to expire.
+ return 30000L;
+ }
+
+ // Find a usable block pool to scan.
+ if ((curBlockIter == null) || curBlockIter.atEnd()) {
+ long timeout = findNextUsableBlockIter();
+ if (timeout > 0) {
+ LOG.trace("{}: no block pools are ready to scan yet. Waiting " +
+ "{} ms.", this, timeout);
+ synchronized (stats) {
+ stats.nextBlockPoolScanStartMs = Time.monotonicNow() + timeout;
+ }
+ return timeout;
+ }
+ synchronized (stats) {
+ stats.scansSinceRestart++;
+ stats.blocksScannedInCurrentPeriod = 0;
+ stats.nextBlockPoolScanStartMs = -1;
+ }
+ return 0L;
+ }
+
+ try {
+ block = curBlockIter.nextBlock();
+ } catch (IOException e) {
+ // There was an error listing the next block in the volume. This is a
+ // serious issue.
+ LOG.warn("{}: nextBlock error on {}", this, curBlockIter);
+ // On the next loop iteration, curBlockIter#eof will be set to true, and
+ // we will pick a different block iterator.
+ return 0L;
+ }
+ if (block == null) {
+ // The BlockIterator is at EOF.
+ LOG.info("{}: finished scanning block pool {}",
+ this, curBlockIter.getBlockPoolId());
+ saveBlockIterator(curBlockIter);
+ return 0;
+ }
+ long saveDelta = monotonicMs - curBlockIter.getLastSavedMs();
+ if (saveDelta >= conf.cursorSaveMs) {
+ LOG.debug("{}: saving block iterator {} after {} ms.",
+ this, curBlockIter, saveDelta);
+ saveBlockIterator(curBlockIter);
+ }
+ bytesScanned = scanBlock(block, conf.targetBytesPerSec);
+ if (bytesScanned >= 0) {
+ scannedBytesSum += bytesScanned;
+ scannedBytes[(int)(curMinute % MINUTES_PER_HOUR)] += bytesScanned;
+ } else {
+ scanError = true;
+ }
+ return 0L;
+ } finally {
+ synchronized (stats) {
+ stats.bytesScannedInPastHour = scannedBytesSum;
+ if (bytesScanned >= 0) {
+ stats.blocksScannedInCurrentPeriod++;
+ stats.blocksScannedSinceRestart++;
+ }
+ if (scanError) {
+ stats.scanErrorsSinceRestart++;
+ }
+ if (block != null) {
+ stats.lastBlockScanned = block;
+ }
+ if (curBlockIter == null) {
+ stats.eof = true;
+ stats.blockPoolPeriodEndsMs = -1;
+ } else {
+ stats.eof = curBlockIter.atEnd();
+ stats.blockPoolPeriodEndsMs =
+ curBlockIter.getIterStartMs() + conf.scanPeriodMs;
+ }
+ }
+ }
+ }
+
+ @Override
+ public void run() {
+ try {
+ LOG.trace("{}: thread starting.", this);
+ resultHandler.setup(this);
+ try {
+ long timeout = 0;
+ while (true) {
+ // Take the lock to check if we should stop.
+ synchronized (this) {
+ if (stopping) {
+ break;
+ }
+ if (timeout > 0) {
+ wait(timeout);
+ if (stopping) {
+ break;
+ }
+ }
+ }
+ timeout = runLoop();
+ }
+ } catch (InterruptedException e) {
+ // We are exiting because of an InterruptedException,
+ // probably sent by VolumeScanner#shutdown.
+ LOG.trace("{} exiting because of InterruptedException.", this);
+ } catch (Throwable e) {
+ LOG.error("{} exiting because of exception ", this, e);
+ }
+ LOG.info("{} exiting.", this);
+ // Save the current position of all block iterators and close them.
+ for (BlockIterator iter : blockIters) {
+ saveBlockIterator(iter);
+ IOUtils.cleanup(null, iter);
+ }
+ } finally {
+ // When the VolumeScanner exits, release the reference we were holding
+ // on the volume. This will allow the volume to be removed later.
+ IOUtils.cleanup(null, ref);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "VolumeScanner(" + volume.getBasePath() +
+ ", " + volume.getStorageID() + ")";
+ }
+
+ /**
+ * Shut down this scanner.
+ */
+ public synchronized void shutdown() {
+ stopping = true;
+ notify();
+ this.interrupt();
+ }
+
+ /**
+ * Allow the scanner to scan the given block pool.
+ *
+ * @param bpid The block pool id.
+ */
+ public synchronized void enableBlockPoolId(String bpid) {
+ for (BlockIterator iter : blockIters) {
+ if (iter.getBlockPoolId().equals(bpid)) {
+ LOG.warn("{}: already enabled scanning on block pool {}", this, bpid);
+ return;
+ }
+ }
+ BlockIterator iter = null;
+ try {
+ // Load a block iterator for the next block pool on the volume.
+ iter = volume.loadBlockIterator(bpid, BLOCK_ITERATOR_NAME);
+ LOG.trace("{}: loaded block iterator for {}.", this, bpid);
+ } catch (FileNotFoundException e) {
+ LOG.debug("{}: failed to load block iterator: " + e.getMessage(), this);
+ } catch (IOException e) {
+ LOG.warn("{}: failed to load block iterator.", this, e);
+ }
+ if (iter == null) {
+ iter = volume.newBlockIterator(bpid, BLOCK_ITERATOR_NAME);
+ LOG.trace("{}: created new block iterator for {}.", this, bpid);
+ }
+ iter.setMaxStalenessMs(conf.maxStalenessMs);
+ blockIters.add(iter);
+ notify();
+ }
+
+ /**
+ * Disallow the scanner from scanning the given block pool.
+ *
+ * @param bpid The block pool id.
+ */
+ public synchronized void disableBlockPoolId(String bpid) {
+ Iterator<BlockIterator> i = blockIters.iterator();
+ while (i.hasNext()) {
+ BlockIterator iter = i.next();
+ if (iter.getBlockPoolId().equals(bpid)) {
+ LOG.trace("{}: disabling scanning on block pool {}", this, bpid);
+ i.remove();
+ IOUtils.cleanup(null, iter);
+ if (curBlockIter == iter) {
+ curBlockIter = null;
+ }
+ notify();
+ return;
+ }
+ }
+ LOG.warn("{}: can't remove block pool {}, because it was never " +
+ "added.", this, bpid);
+ }
+
+ @VisibleForTesting
+ Statistics getStatistics() {
+ synchronized (stats) {
+ return new Statistics(stats);
+ }
+ }
+}
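
The scannedBytes javadoc above describes a one-hour circular buffer keyed by minute, and calculateShouldScan() compares the hourly average against the configured target rate. The following simplified standalone model captures that bookkeeping; the class and method names are hypothetical, and the minute arithmetic is reduced to the essentials rather than copied from the patch.

public class ScanRateWindow {
  private static final int MINUTES_PER_HOUR = 60;
  private static final int SECONDS_PER_MINUTE = 60;

  // One slot per minute of the past hour, plus a running sum of all slots.
  private final long[] scannedBytes = new long[MINUTES_PER_HOUR];
  private long scannedBytesSum = 0;
  private long curMinute = 0;

  /** Zero out the slots for any minutes that have elapsed since the last call. */
  public void advanceTo(long monotonicMs) {
    long newMinute = monotonicMs / 60000L; // absolute minute count
    if (newMinute <= curMinute) {
      return;
    }
    // At most one hour's worth of slots can have gone stale.
    long start = Math.max(curMinute + 1, newMinute - MINUTES_PER_HOUR + 1);
    for (long m = start; m <= newMinute; m++) {
      int slot = (int) (m % MINUTES_PER_HOUR);
      scannedBytesSum -= scannedBytes[slot];
      scannedBytes[slot] = 0;
    }
    curMinute = newMinute;
  }

  /** Record bytes scanned during the current minute. */
  public void record(long bytes) {
    scannedBytes[(int) (curMinute % MINUTES_PER_HOUR)] += bytes;
    scannedBytesSum += bytes;
  }

  /** Same test as calculateShouldScan(): average over the hour vs. the target. */
  public boolean shouldScan(long targetBytesPerSec) {
    long effectiveBytesPerSec =
        scannedBytesSum / (SECONDS_PER_MINUTE * MINUTES_PER_HOUR);
    return effectiveBytesPerSec <= targetBytesPerSec;
  }
}

A VolumeScanner-style loop would call advanceTo() at the top of each iteration, record() after every successful block scan, and back off (the patch above waits 30 seconds) whenever shouldScan() returns false.
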
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8c6a96f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index cc7aec5..c554bc39 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -90,24 +90,30 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
}
}
- /**
- * Create rolling logs.
- *
- * @param prefix the prefix of the log names.
- * @return rolling logs
- */
- public RollingLogs createRollingLogs(String bpid, String prefix
- ) throws IOException;
-
/** @return a list of volumes. */
public List<V> getVolumes();
- /** Add an array of StorageLocation to FsDataset. */
+ /**
+ * Add a new volume to the FsDataset.<p/>
+ *
+ * If the FSDataset supports block scanning, this function registers
+ * the new volume with the block scanner.
+ *
+ * @param location The storage location for the new volume.
+ * @param nsInfos Namespace information for the new volume.
+ */
public void addVolume(
final StorageLocation location,
final List<NamespaceInfo> nsInfos) throws IOException;
- /** Removes a collection of volumes from FsDataset. */
+ /**
+ * Removes a collection of volumes from FsDataset.
+ *
+ * If the FSDataset supports block scanning, this function removes
+ * the volumes from the block scanner.
+ *
+ * @param volumes The storage locations of the volumes to remove.
+ */
public void removeVolumes(Collection<StorageLocation> volumes);
/** @return a storage with the given storage ID */
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8c6a96f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
index d9c37cb..9b28e67 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
@@ -17,11 +17,13 @@
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset;
+import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
/**
* This is an interface for the underlying volume.
@@ -69,4 +71,112 @@ public interface FsVolumeSpi {
/** Returns true if the volume is NOT backed by persistent storage. */
public boolean isTransientStorage();
+
+ /**
+ * BlockIterator will return ExtendedBlock entries from a block pool in
+ * this volume. The entries will be returned in sorted order.<p/>
+ *
+ * BlockIterator objects themselves do not always have internal
+ * synchronization, so they can only safely be used by a single thread at a
+ * time.<p/>
+ *
+ * Closing the iterator does not save it. You must call save to save it.
+ */
+ public interface BlockIterator extends Closeable {
+ /**
+ * Get the next block.<p/>
+ *
+ * Note that this block may be removed in between the time we list it,
+ * and the time the caller tries to use it, or it may represent a stale
+ * entry. Callers should handle the case where the returned block no
+ * longer exists.
+ *
+ * @return The next block, or null if there are no
+ * more blocks. Null if there was an error
+ * determining the next block.
+ *
+ * @throws IOException If there was an error getting the next block in
+ * this volume. In this case, EOF will be set on
+ * the iterator.
+ */
+ public ExtendedBlock nextBlock() throws IOException;
+
+ /**
+ * Returns true if we got to the end of the block pool.
+ */
+ public boolean atEnd();
+
+ /**
+ * Repositions the iterator at the beginning of the block pool.
+ */
+ public void rewind();
+
+ /**
+ * Save this block iterator to the underlying volume.
+ * Any existing saved block iterator with this name will be overwritten.
+ * maxStalenessMs will not be saved.
+ *
+ * @throws IOException If there was an error when saving the block
+ * iterator.
+ */
+ public void save() throws IOException;
+
+ /**
+ * Set the maximum staleness of entries that we will return.<p/>
+ *
+ * A maximum staleness of 0 means we will never return stale entries; a
+ * larger value will allow us to reduce resource consumption in exchange
+ * for returning more potentially stale entries. Even with staleness set
+ * to 0, consumers of this API must handle race conditions where blocks
+ * disappear before they can be processed.
+ */
+ public void setMaxStalenessMs(long maxStalenessMs);
+
+ /**
+ * Get the wall-clock time, measured in milliseconds since the Epoch,
+ * when this iterator was created.
+ */
+ public long getIterStartMs();
+
+ /**
+ * Get the wall-clock time, measured in milliseconds since the Epoch,
+ * when this iterator was last saved. Returns iterStartMs if the
+ * iterator was never saved.
+ */
+ public long getLastSavedMs();
+
+ /**
+ * Get the id of the block pool which this iterator traverses.
+ */
+ public String getBlockPoolId();
+ }
+
+ /**
+ * Create a new block iterator. It will start at the beginning of the
+ * block set.
+ *
+ * @param bpid The block pool id to iterate over.
+ * @param name The name of the block iterator to create.
+ *
+ * @return The new block iterator.
+ */
+ public BlockIterator newBlockIterator(String bpid, String name);
+
+ /**
+ * Load a saved block iterator.
+ *
+ * @param bpid The block pool id to iterate over.
+ * @param name The name of the block iterator to load.
+ *
+ * @return The saved block iterator.
+ * @throws IOException If there was an IO error loading the saved
+ * block iterator.
+ */
+ public BlockIterator loadBlockIterator(String bpid, String name)
+ throws IOException;
+
+ /**
+ * Get the FsDatasetSpi which this volume is a part of.
+ */
+ public FsDatasetSpi getDataset();
}
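A rough usage sketch of the new interface (illustrative only; the method name, the
volume handle, and the scanning logic are placeholders, not part of this patch):

    // Illustrative sketch: walk one block pool on a volume, persisting the
    // cursor as we go (a real scanner throttles both reads and saves).
    void scanBlockPool(FsVolumeSpi volume, String bpid) throws IOException {
      FsVolumeSpi.BlockIterator iter = volume.newBlockIterator(bpid, "scanner");
      iter.setMaxStalenessMs(15 * 60 * 1000);   // accept 15 minutes of staleness
      try {
        while (!iter.atEnd()) {
          ExtendedBlock block = iter.nextBlock();
          if (block == null) {
            break;                              // end of the block pool
          }
          // verify the block here; it may have vanished since it was listed
          iter.save();                          // persists <name>.cursor on the volume
        }
      } finally {
        iter.close();
      }
    }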
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8c6a96f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RollingLogs.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RollingLogs.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RollingLogs.java
deleted file mode 100644
index 5d54770..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RollingLogs.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.datanode.fsdataset;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Iterator;
-
-/**
- * Rolling logs consist of a current log and a set of previous logs.
- *
- * The implementation should support a single appender and multiple readers.
- */
-public interface RollingLogs {
- /**
- * To iterate the lines of the logs.
- */
- public interface LineIterator extends Iterator<String>, Closeable {
- /** Is the iterator iterating the previous? */
- public boolean isPrevious();
-
- /**
- * Is the last read entry from previous? This should be called after
- * reading.
- */
- public boolean isLastReadFromPrevious();
- }
-
- /**
- * To append text to the logs.
- */
- public interface Appender extends Appendable, Closeable {
- }
-
- /**
- * Create an iterator to iterate the lines in the logs.
- *
- * @param skipPrevious Should it skip reading the previous log?
- * @return a new iterator.
- */
- public LineIterator iterator(boolean skipPrevious) throws IOException;
-
- /**
- * @return the only appender to append text to the logs.
- * The same object is returned if it is invoked multiple times.
- */
- public Appender appender();
-
- /**
- * Roll current to previous.
- *
- * @return true if the rolling succeeded.
- * When it returns false, it is not equivalent to an error.
- * It means that the rolling cannot be performed at the moment,
- * e.g. the logs are being read.
- */
- public boolean roll() throws IOException;
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8c6a96f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index cefb206..082fb9a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -70,7 +70,7 @@ import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
-import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner;
+import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
@@ -92,7 +92,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaTracker.RamDiskReplica;
@@ -294,7 +293,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
DFSConfigKeys.DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY,
RoundRobinVolumeChoosingPolicy.class,
VolumeChoosingPolicy.class), conf);
- volumes = new FsVolumeList(volsFailed, blockChooserImpl);
+ volumes = new FsVolumeList(volsFailed, datanode.getBlockScanner(),
+ blockChooserImpl);
asyncDiskService = new FsDatasetAsyncDiskService(datanode);
asyncLazyPersistService = new RamDiskAsyncLazyPersistService(datanode);
@@ -326,6 +326,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
// storageMap and asyncDiskService, consistent.
FsVolumeImpl fsVolume = new FsVolumeImpl(
this, sd.getStorageUuid(), dir, this.conf, storageType);
+ FsVolumeReference ref = fsVolume.obtainReference();
ReplicaMap tempVolumeMap = new ReplicaMap(this);
fsVolume.getVolumeMap(tempVolumeMap, ramDiskReplicaTracker);
@@ -336,7 +337,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
DatanodeStorage.State.NORMAL,
storageType));
asyncDiskService.addVolume(sd.getCurrentDir());
- volumes.addVolume(fsVolume);
+ volumes.addVolume(ref);
}
LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
@@ -375,6 +376,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
throw MultipleIOException.createIOException(exceptions);
}
+ final FsVolumeReference ref = fsVolume.obtainReference();
setupAsyncLazyPersistThread(fsVolume);
builder.build();
@@ -385,7 +387,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
DatanodeStorage.State.NORMAL,
storageType));
asyncDiskService.addVolume(sd.getCurrentDir());
- volumes.addVolume(fsVolume);
+ volumes.addVolume(ref);
}
LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
}
@@ -429,9 +431,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
it.remove();
}
}
- // Delete blocks from the block scanner in batch.
- datanode.getBlockScanner().deleteBlocks(bpid,
- blocks.toArray(new Block[blocks.size()]));
}
storageMap.remove(sd.getStorageUuid());
@@ -790,7 +789,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
// Replace the old block if any to reschedule the scanning.
- datanode.getBlockScanner().addBlock(block, false);
return replicaInfo;
}
@@ -2025,10 +2023,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
// Block is in memory and not on the disk
// Remove the block from volumeMap
volumeMap.remove(bpid, blockId);
- final DataBlockScanner blockScanner = datanode.getBlockScanner();
- if (blockScanner != null) {
- blockScanner.deleteBlock(bpid, new Block(blockId));
- }
if (vol.isTransientStorage()) {
ramDiskReplicaTracker.discardReplica(bpid, blockId, true);
}
@@ -2051,12 +2045,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
ReplicaInfo diskBlockInfo = new FinalizedReplica(blockId,
diskFile.length(), diskGS, vol, diskFile.getParentFile());
volumeMap.add(bpid, diskBlockInfo);
- final DataBlockScanner blockScanner = datanode.getBlockScanner();
- if (!vol.isTransientStorage()) {
- if (blockScanner != null) {
- blockScanner.addBlock(new ExtendedBlock(bpid, diskBlockInfo), false);
- }
- } else {
+ if (vol.isTransientStorage()) {
ramDiskReplicaTracker.addReplica(bpid, blockId, (FsVolumeImpl) vol);
}
LOG.warn("Added missing block to memory " + diskBlockInfo);
@@ -2557,23 +2546,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
dataStorage.clearRollingUpgradeMarker(bpid);
}
- @Override
- public RollingLogs createRollingLogs(String bpid, String prefix
- ) throws IOException {
- String dir = null;
- final List<FsVolumeImpl> volumes = getVolumes();
- for (FsVolumeImpl vol : volumes) {
- String bpDir = vol.getPath(bpid);
- if (RollingLogsImpl.isFilePresent(bpDir, prefix)) {
- dir = bpDir;
- break;
- }
- }
- if (dir == null) {
- dir = volumes.get(0).getPath(bpid);
- }
- return new RollingLogsImpl(dir, prefix);
- }
@Override
public void onCompleteLazyPersist(String bpId, long blockId,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8c6a96f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index a2d4f2e..e4877be 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -17,9 +17,18 @@
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+import java.io.BufferedWriter;
import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FilenameFilter;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
+import java.io.OutputStreamWriter;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
@@ -41,15 +50,24 @@ import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.util.CloseableReferenceCount;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.util.Time;
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.ObjectReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* The underlying volume used to store replica.
@@ -59,6 +77,9 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
@InterfaceAudience.Private
@VisibleForTesting
public class FsVolumeImpl implements FsVolumeSpi {
+ public static final Logger LOG =
+ LoggerFactory.getLogger(FsVolumeImpl.class);
+
private final FsDatasetImpl dataset;
private final String storageID;
private final StorageType storageType;
@@ -395,6 +416,332 @@ public class FsVolumeImpl implements FsVolumeSpi {
}
}
+ private enum SubdirFilter implements FilenameFilter {
+ INSTANCE;
+
+ @Override
+ public boolean accept(File dir, String name) {
+ return name.startsWith("subdir");
+ }
+ }
+
+ private enum BlockFileFilter implements FilenameFilter {
+ INSTANCE;
+
+ @Override
+ public boolean accept(File dir, String name) {
+ return !name.endsWith(".meta") && name.startsWith("blk_");
+ }
+ }
+
+ @VisibleForTesting
+ public static String nextSorted(List<String> arr, String prev) {
+ int res = 0;
+ if (prev != null) {
+ res = Collections.binarySearch(arr, prev);
+ if (res < 0) {
+ res = -1 - res;
+ } else {
+ res++;
+ }
+ }
+ if (res >= arr.size()) {
+ return null;
+ }
+ return arr.get(res);
+ }
+
+ private static class BlockIteratorState {
+ BlockIteratorState() {
+ lastSavedMs = iterStartMs = Time.now();
+ curFinalizedDir = null;
+ curFinalizedSubDir = null;
+ curEntry = null;
+ atEnd = false;
+ }
+
+ // The wall-clock ms since the epoch at which this iterator was last saved.
+ @JsonProperty
+ private long lastSavedMs;
+
+ // The wall-clock ms since the epoch at which this iterator was created.
+ @JsonProperty
+ private long iterStartMs;
+
+ @JsonProperty
+ private String curFinalizedDir;
+
+ @JsonProperty
+ private String curFinalizedSubDir;
+
+ @JsonProperty
+ private String curEntry;
+
+ @JsonProperty
+ private boolean atEnd;
+ }
+
+ /**
+ * A BlockIterator implementation for FsVolumeImpl.
+ */
+ private class BlockIteratorImpl implements FsVolumeSpi.BlockIterator {
+ private final File bpidDir;
+ private final String name;
+ private final String bpid;
+ private long maxStalenessMs = 0;
+
+ private List<String> cache;
+ private long cacheMs;
+
+ private BlockIteratorState state;
+
+ BlockIteratorImpl(String bpid, String name) {
+ this.bpidDir = new File(currentDir, bpid);
+ this.name = name;
+ this.bpid = bpid;
+ rewind();
+ }
+
+ /**
+ * Get the next subdirectory within the block pool slice.
+ *
+ * @return The next subdirectory within the block pool slice, or
+ * null if there are no more.
+ */
+ private String getNextSubDir(String prev, File dir)
+ throws IOException {
+ List<String> children =
+ IOUtils.listDirectory(dir, SubdirFilter.INSTANCE);
+ cache = null;
+ cacheMs = 0;
+ if (children.size() == 0) {
+ LOG.trace("getNextSubDir({}, {}): no subdirectories found in {}",
+ storageID, bpid, dir.getAbsolutePath());
+ return null;
+ }
+ Collections.sort(children);
+ String nextSubDir = nextSorted(children, prev);
+ if (nextSubDir == null) {
+ LOG.trace("getNextSubDir({}, {}): no more subdirectories found in {}",
+ storageID, bpid, dir.getAbsolutePath());
+ } else {
+ LOG.trace("getNextSubDir({}, {}): picking next subdirectory {} " +
+ "within {}", storageID, bpid, nextSubDir, dir.getAbsolutePath());
+ }
+ return nextSubDir;
+ }
+
+ private String getNextFinalizedDir() throws IOException {
+ File dir = Paths.get(
+ bpidDir.getAbsolutePath(), "current", "finalized").toFile();
+ return getNextSubDir(state.curFinalizedDir, dir);
+ }
+
+ private String getNextFinalizedSubDir() throws IOException {
+ if (state.curFinalizedDir == null) {
+ return null;
+ }
+ File dir = Paths.get(
+ bpidDir.getAbsolutePath(), "current", "finalized",
+ state.curFinalizedDir).toFile();
+ return getNextSubDir(state.curFinalizedSubDir, dir);
+ }
+
+ private List<String> getSubdirEntries() throws IOException {
+ if (state.curFinalizedSubDir == null) {
+ return null; // There are no entries in the null subdir.
+ }
+ long now = Time.monotonicNow();
+ if (cache != null) {
+ long delta = now - cacheMs;
+ if (delta < maxStalenessMs) {
+ return cache;
+ } else {
+ LOG.trace("getSubdirEntries({}, {}): purging entries cache for {} " +
+ "after {} ms.", storageID, bpid, state.curFinalizedSubDir, delta);
+ cache = null;
+ }
+ }
+ File dir = Paths.get(bpidDir.getAbsolutePath(), "current", "finalized",
+ state.curFinalizedDir, state.curFinalizedSubDir).toFile();
+ List<String> entries =
+ IOUtils.listDirectory(dir, BlockFileFilter.INSTANCE);
+ if (entries.size() == 0) {
+ entries = null;
+ } else {
+ Collections.sort(entries);
+ }
+ if (entries == null) {
+ LOG.trace("getSubdirEntries({}, {}): no entries found in {}",
+ storageID, bpid, dir.getAbsolutePath());
+ } else {
+ LOG.trace("getSubdirEntries({}, {}): listed {} entries in {}",
+ storageID, bpid, entries.size(), dir.getAbsolutePath());
+ }
+ cache = entries;
+ cacheMs = now;
+ return cache;
+ }
+
+ /**
+ * Get the next block.<p/>
+ *
+ * Each volume has a hierarchical structure.<p/>
+ *
+ * <code>
+ * BPID B0
+ * finalized/
+ * subdir0
+ * subdir0
+ * blk_000
+ * blk_001
+ * ...
+ * subdir1
+ * subdir0
+ * ...
+ * rbw/
+ * </code>
+ *
+ * When we run out of entries at one level of the structure, we search
+ * progressively higher levels. For example, when we run out of blk_
+ * entries in a subdirectory, we search for the next subdirectory.
+ * And so on.
+ */
+ @Override
+ public ExtendedBlock nextBlock() throws IOException {
+ if (state.atEnd) {
+ return null;
+ }
+ try {
+ while (true) {
+ List<String> entries = getSubdirEntries();
+ if (entries != null) {
+ state.curEntry = nextSorted(entries, state.curEntry);
+ if (state.curEntry == null) {
+ LOG.trace("nextBlock({}, {}): advancing from {} to next " +
+ "subdirectory.", storageID, bpid, state.curFinalizedSubDir);
+ } else {
+ ExtendedBlock block =
+ new ExtendedBlock(bpid, Block.filename2id(state.curEntry));
+ LOG.trace("nextBlock({}, {}): advancing to {}",
+ storageID, bpid, block);
+ return block;
+ }
+ }
+ state.curFinalizedSubDir = getNextFinalizedSubDir();
+ if (state.curFinalizedSubDir == null) {
+ state.curFinalizedDir = getNextFinalizedDir();
+ if (state.curFinalizedDir == null) {
+ state.atEnd = true;
+ return null;
+ }
+ }
+ }
+ } catch (IOException e) {
+ state.atEnd = true;
+ LOG.error("nextBlock({}, {}): I/O error", storageID, bpid, e);
+ throw e;
+ }
+ }
+
+ @Override
+ public boolean atEnd() {
+ return state.atEnd;
+ }
+
+ @Override
+ public void rewind() {
+ cache = null;
+ cacheMs = 0;
+ state = new BlockIteratorState();
+ }
+
+ @Override
+ public void save() throws IOException {
+ state.lastSavedMs = Time.now();
+ boolean success = false;
+ ObjectMapper mapper = new ObjectMapper();
+ try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(
+ new FileOutputStream(getTempSaveFile(), false), "UTF-8"))) {
+ mapper.writerWithDefaultPrettyPrinter().writeValue(writer, state);
+ success = true;
+ } finally {
+ if (!success) {
+ if (!getTempSaveFile().delete()) {
+ LOG.debug("save({}, {}): error deleting temporary file.",
+ storageID, bpid);
+ }
+ }
+ }
+ Files.move(getTempSaveFile().toPath(), getSaveFile().toPath(),
+ StandardCopyOption.ATOMIC_MOVE);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("save({}, {}): saved {}", storageID, bpid,
+ mapper.writerWithDefaultPrettyPrinter().writeValueAsString(state));
+ }
+ }
+
+ public void load() throws IOException {
+ ObjectMapper mapper = new ObjectMapper();
+ File file = getSaveFile();
+ this.state = mapper.reader(BlockIteratorState.class).readValue(file);
+ LOG.trace("load({}, {}): loaded iterator {} from {}: {}", storageID,
+ bpid, name, file.getAbsoluteFile(),
+ mapper.writerWithDefaultPrettyPrinter().writeValueAsString(state));
+ }
+
+ File getSaveFile() {
+ return new File(bpidDir, name + ".cursor");
+ }
+
+ File getTempSaveFile() {
+ return new File(bpidDir, name + ".cursor.tmp");
+ }
+
+ @Override
+ public void setMaxStalenessMs(long maxStalenessMs) {
+ this.maxStalenessMs = maxStalenessMs;
+ }
+
+ @Override
+ public void close() throws IOException {
+ // No action needed for this volume implementation.
+ }
+
+ @Override
+ public long getIterStartMs() {
+ return state.iterStartMs;
+ }
+
+ @Override
+ public long getLastSavedMs() {
+ return state.lastSavedMs;
+ }
+
+ @Override
+ public String getBlockPoolId() {
+ return bpid;
+ }
+ }
+
+ @Override
+ public BlockIterator newBlockIterator(String bpid, String name) {
+ return new BlockIteratorImpl(bpid, name);
+ }
+
+ @Override
+ public BlockIterator loadBlockIterator(String bpid, String name)
+ throws IOException {
+ BlockIteratorImpl iter = new BlockIteratorImpl(bpid, name);
+ iter.load();
+ return iter;
+ }
+
+ @Override
+ public FsDatasetSpi getDataset() {
+ return dataset;
+ }
+
/**
* RBW files. They get moved to the finalized block directory when
* the block is finalized.
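As an aside, the @VisibleForTesting nextSorted helper above is what lets the iterator
resume cleanly after a directory listing changes: it returns the first entry strictly
after prev in a sorted list, whether or not prev is still present. A quick illustration
with made-up block file names:

    List<String> entries = Arrays.asList("blk_1001", "blk_1003", "blk_1007");
    FsVolumeImpl.nextSorted(entries, null);        // "blk_1001" (start of the list)
    FsVolumeImpl.nextSorted(entries, "blk_1003");  // "blk_1007" (next entry after prev)
    FsVolumeImpl.nextSorted(entries, "blk_1002");  // "blk_1003" (prev no longer listed)
    FsVolumeImpl.nextSorted(entries, "blk_1007");  // null (iterator is at the end)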
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8c6a96f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
index c837593..ae2f5b4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
+import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.Time;
@@ -42,11 +43,13 @@ class FsVolumeList {
private Object checkDirsMutex = new Object();
private final VolumeChoosingPolicy<FsVolumeImpl> blockChooser;
+ private final BlockScanner blockScanner;
private volatile int numFailedVolumes;
- FsVolumeList(int failedVols,
+ FsVolumeList(int failedVols, BlockScanner blockScanner,
VolumeChoosingPolicy<FsVolumeImpl> blockChooser) {
this.blockChooser = blockChooser;
+ this.blockScanner = blockScanner;
this.numFailedVolumes = failedVols;
}
@@ -260,13 +263,14 @@ class FsVolumeList {
/**
* Dynamically add new volumes to the existing volumes that this DN manages.
- * @param newVolume the instance of new FsVolumeImpl.
+ *
+ * @param ref a reference to the new FsVolumeImpl instance.
*/
- void addVolume(FsVolumeImpl newVolume) {
+ void addVolume(FsVolumeReference ref) {
while (true) {
final FsVolumeImpl[] curVolumes = volumes.get();
final List<FsVolumeImpl> volumeList = Lists.newArrayList(curVolumes);
- volumeList.add(newVolume);
+ volumeList.add((FsVolumeImpl)ref.getVolume());
if (volumes.compareAndSet(curVolumes,
volumeList.toArray(new FsVolumeImpl[volumeList.size()]))) {
break;
@@ -274,12 +278,15 @@ class FsVolumeList {
if (FsDatasetImpl.LOG.isDebugEnabled()) {
FsDatasetImpl.LOG.debug(
"The volume list has been changed concurrently, " +
- "retry to remove volume: " + newVolume);
+ "retry to remove volume: " + ref.getVolume().getStorageID());
}
}
}
-
- FsDatasetImpl.LOG.info("Added new volume: " + newVolume.toString());
+ if (blockScanner != null) {
+ blockScanner.addVolumeScanner(ref);
+ }
+ FsDatasetImpl.LOG.info("Added new volume: " +
+ ref.getVolume().getStorageID());
}
/**
@@ -293,6 +300,9 @@ class FsVolumeList {
if (volumeList.remove(target)) {
if (volumes.compareAndSet(curVolumes,
volumeList.toArray(new FsVolumeImpl[volumeList.size()]))) {
+ if (blockScanner != null) {
+ blockScanner.removeVolumeScanner(target);
+ }
try {
target.closeAndWait();
} catch (IOException e) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8c6a96f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RollingLogsImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RollingLogsImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RollingLogsImpl.java
deleted file mode 100644
index 121127d..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RollingLogsImpl.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.io.PrintWriter;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;
-
-import com.google.common.base.Charsets;
-
-class RollingLogsImpl implements RollingLogs {
- private static final String CURR_SUFFIX = ".curr";
- private static final String PREV_SUFFIX = ".prev";
-
- static boolean isFilePresent(String dir, String filePrefix) {
- return new File(dir, filePrefix + CURR_SUFFIX).exists() ||
- new File(dir, filePrefix + PREV_SUFFIX).exists();
- }
-
- private final File curr;
- private final File prev;
- private PrintWriter out; //require synchronized access
-
- private final Appender appender = new Appender() {
- @Override
- public Appendable append(CharSequence csq) {
- synchronized(RollingLogsImpl.this) {
- if (out == null) {
- throw new IllegalStateException(RollingLogsImpl.this
- + " is not yet opened.");
- }
- out.print(csq);
- out.flush();
- }
- return this;
- }
-
- @Override
- public Appendable append(char c) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Appendable append(CharSequence csq, int start, int end) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void close() {
- synchronized(RollingLogsImpl.this) {
- if (out != null) {
- out.close();
- out = null;
- }
- }
- }
- };
-
-
- private final AtomicInteger numReaders = new AtomicInteger();
-
- RollingLogsImpl(String dir, String filePrefix) throws FileNotFoundException{
- curr = new File(dir, filePrefix + CURR_SUFFIX);
- prev = new File(dir, filePrefix + PREV_SUFFIX);
- out = new PrintWriter(new OutputStreamWriter(new FileOutputStream(
- curr, true), Charsets.UTF_8));
- }
-
- @Override
- public Reader iterator(boolean skipPrevFile) throws IOException {
- numReaders.incrementAndGet();
- return new Reader(skipPrevFile);
- }
-
- @Override
- public Appender appender() {
- return appender;
- }
-
- @Override
- public boolean roll() throws IOException {
- if (numReaders.get() > 0) {
- return false;
- }
- if (!prev.delete() && prev.exists()) {
- throw new IOException("Failed to delete " + prev);
- }
-
- synchronized(this) {
- appender.close();
- final boolean renamed = curr.renameTo(prev);
- out = new PrintWriter(new OutputStreamWriter(new FileOutputStream(
- curr, true), Charsets.UTF_8));
- if (!renamed) {
- throw new IOException("Failed to rename " + curr + " to " + prev);
- }
- }
- return true;
- }
-
- @Override
- public String toString() {
- return curr.toString();
- }
-
- /**
- * This is used to read the lines in order.
- * If the data is not read completely (i.e, untill hasNext() returns
- * false), it needs to be explicitly
- */
- private class Reader implements RollingLogs.LineIterator {
- private File file;
- private File lastReadFile;
- private BufferedReader reader;
- private String line;
- private boolean closed = false;
-
- private Reader(boolean skipPrevFile) throws IOException {
- reader = null;
- file = skipPrevFile? curr : prev;
- readNext();
- }
-
- @Override
- public boolean isPrevious() {
- return file == prev;
- }
-
- @Override
- public boolean isLastReadFromPrevious() {
- return lastReadFile == prev;
- }
-
- private boolean openFile() throws IOException {
-
- for(int i=0; i<2; i++) {
- if (reader != null || i > 0) {
- // move to next file
- file = isPrevious()? curr : null;
- }
- if (file == null) {
- return false;
- }
- if (file.exists()) {
- break;
- }
- }
-
- if (reader != null ) {
- reader.close();
- reader = null;
- }
-
- reader = new BufferedReader(new InputStreamReader(new FileInputStream(
- file), Charsets.UTF_8));
- return true;
- }
-
- // read next line if possible.
- private void readNext() throws IOException {
- line = null;
- try {
- if (reader != null && (line = reader.readLine()) != null) {
- return;
- }
- // move to the next file.
- if (openFile()) {
- readNext();
- }
- } finally {
- if (!hasNext()) {
- close();
- }
- }
- }
-
- @Override
- public boolean hasNext() {
- return line != null;
- }
-
- @Override
- public String next() {
- String curLine = line;
- try {
- lastReadFile = file;
- readNext();
- } catch (IOException e) {
- DataBlockScanner.LOG.warn("Failed to read next line.", e);
- }
- return curLine;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void close() throws IOException {
- if (!closed) {
- try {
- if (reader != null) {
- reader.close();
- }
- } finally {
- file = null;
- reader = null;
- closed = true;
- final int n = numReaders.decrementAndGet();
- assert(n >= 0);
- }
- }
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8c6a96f/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index bfaa33b..bb28f01 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -987,6 +987,26 @@
</property>
<property>
+ <name>dfs.datanode.scan.period.hours</name>
+ <value>0</value>
+ <description>
+ If this is 0 or negative, the DataNode's block scanner will be
+ disabled. If this is positive, the DataNode will not scan any
+ individual block more than once in the specified scan period.
+ </description>
+</property>
+
+<property>
+ <name>dfs.block.scanner.volume.bytes.per.second</name>
+ <value>1048576</value>
+ <description>
+ If this is 0, the DataNode's block scanner will be disabled. If this
+ is positive, it is the number of bytes per second that the DataNode's
+ block scanner will try to scan from each volume.
+ </description>
+</property>
+
+<property>
<name>dfs.datanode.readahead.bytes</name>
<value>4193404</value>
<description>
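For reference, an illustrative hdfs-site.xml override (values are examples, not
recommendations) that keeps the scanner enabled, rescans each block at most once
every three weeks, and throttles each volume scanner to 2 MiB/s:

    <property>
      <name>dfs.datanode.scan.period.hours</name>
      <value>504</value>   <!-- 3 weeks; 0 or negative disables the scanner -->
    </property>
    <property>
      <name>dfs.block.scanner.volume.bytes.per.second</name>
      <value>2097152</value>   <!-- 2 MiB/s read budget per volume -->
    </property>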
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8c6a96f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index a14c84c..086f9ff 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -1697,4 +1697,20 @@ public class DFSTestUtil {
GenericTestUtils.setLogLevel(NameNode.stateChangeLog, level);
GenericTestUtils.setLogLevel(NameNode.blockStateChangeLog, level);
}
+
+ /**
+ * Change the length of the replica of blk at datanode dnIndex by lenDelta.
+ * Returns true if the replica file was found and resized, false otherwise.
+ */
+ public static boolean changeReplicaLength(MiniDFSCluster cluster,
+ ExtendedBlock blk, int dnIndex, int lenDelta) throws IOException {
+ File blockFile = cluster.getBlockFile(dnIndex, blk);
+ if (blockFile != null && blockFile.exists()) {
+ RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
+ raFile.setLength(raFile.length()+lenDelta);
+ raFile.close();
+ return true;
+ }
+ LOG.info("failed to change length of block " + blk);
+ return false;
+ }
}
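A sketch of how a test might use this helper (cluster and FileSystem setup elided;
the path and values are illustrative):

    // Hypothetical fragment: shrink the first datanode's replica by one byte
    // so later verification sees a length mismatch for this block.
    Path file = new Path("/test/changeReplicaLength.dat");
    DFSTestUtil.createFile(fs, file, 1024L, (short) 3, 0L);
    ExtendedBlock blk = DFSTestUtil.getFirstBlock(fs, file);
    assertTrue(DFSTestUtil.changeReplicaLength(cluster, blk, 0, -1));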