You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text copy).
Posted to common-commits@hadoop.apache.org by zh...@apache.org on 2015/01/26 18:44:37 UTC
[22/50] [abbrv] hadoop git commit: HDFS-7430. Refactor the BlockScanner to use O(1) memory and use multiple threads (cmccabe)
HDFS-7430. Refactor the BlockScanner to use O(1) memory and use multiple threads (cmccabe)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/df4edd9a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/df4edd9a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/df4edd9a
Branch: refs/heads/HDFS-EC
Commit: df4edd9aea0dc9b4dff82347b2776f7069018243
Parents: a691658
Author: Colin Patrick Mccabe <cm...@cloudera.com>
Authored: Wed Dec 17 11:27:48 2014 -0800
Committer: Zhe Zhang <zh...@apache.org>
Committed: Mon Jan 26 09:43:27 2015 -0800
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 2 +
.../hdfs/server/datanode/BPOfferService.java | 3 -
.../hdfs/server/datanode/BPServiceActor.java | 6 -
.../server/datanode/BlockPoolSliceScanner.java | 872 -------------------
.../hdfs/server/datanode/BlockReceiver.java | 8 -
.../hdfs/server/datanode/BlockScanner.java | 308 +++++++
.../hdfs/server/datanode/BlockSender.java | 3 -
.../hdfs/server/datanode/DataBlockScanner.java | 339 -------
.../hadoop/hdfs/server/datanode/DataNode.java | 73 +-
.../hdfs/server/datanode/VolumeScanner.java | 652 ++++++++++++++
.../server/datanode/fsdataset/FsDatasetSpi.java | 32 +-
.../server/datanode/fsdataset/FsVolumeSpi.java | 110 +++
.../server/datanode/fsdataset/RollingLogs.java | 73 --
.../datanode/fsdataset/impl/FsDatasetImpl.java | 44 +-
.../datanode/fsdataset/impl/FsVolumeImpl.java | 347 ++++++++
.../datanode/fsdataset/impl/FsVolumeList.java | 24 +-
.../fsdataset/impl/RollingLogsImpl.java | 241 -----
.../src/main/resources/hdfs-default.xml | 20 +
.../org/apache/hadoop/hdfs/DFSTestUtil.java | 16 +
.../hadoop/hdfs/TestDatanodeBlockScanner.java | 551 ------------
.../org/apache/hadoop/hdfs/TestReplication.java | 3 +-
.../TestOverReplicatedBlocks.java | 13 +-
.../server/datanode/BlockReportTestBase.java | 7 +-
.../hdfs/server/datanode/DataNodeTestUtils.java | 24 -
.../server/datanode/SimulatedFSDataset.java | 22 +-
.../hdfs/server/datanode/TestBlockScanner.java | 680 +++++++++++++++
.../server/datanode/TestDirectoryScanner.java | 16 +
.../TestMultipleNNDataBlockScanner.java | 245 ------
.../extdataset/ExternalDatasetImpl.java | 7 -
.../extdataset/ExternalRollingLogs.java | 92 --
.../datanode/extdataset/ExternalVolumeImpl.java | 17 +
.../extdataset/TestExternalDataset.java | 9 -
.../fsdataset/impl/FsVolumeListTest.java | 17 +-
.../fsdataset/impl/TestFsDatasetImpl.java | 30 +-
.../impl/TestInterDatanodeProtocol.java | 4 +-
.../namenode/snapshot/SnapshotTestHelper.java | 4 +-
37 files changed, 2288 insertions(+), 2629 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df4edd9a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 25ad33b..866b765 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -140,6 +140,9 @@ Trunk (Unreleased)
class and constructor to public; and fix FsDatasetSpi to use generic type
instead of FsVolumeImpl. (David Powell and Joe Pallas via szetszwo)
+ HDFS-7430. Rewrite the BlockScanner to use O(1) memory and use multiple
+ threads (cmccabe)
+
OPTIMIZATIONS
BUG FIXES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df4edd9a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index fb958f1..60581b8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -441,6 +441,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final int DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT = 4096;
public static final String DFS_DATANODE_SCAN_PERIOD_HOURS_KEY = "dfs.datanode.scan.period.hours";
public static final int DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT = 0;
+ public static final String DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND = "dfs.block.scanner.volume.bytes.per.second";
+ public static final long DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND_DEFAULT = 1048576L;
public static final String DFS_DATANODE_TRANSFERTO_ALLOWED_KEY = "dfs.datanode.transferTo.allowed";
public static final boolean DFS_DATANODE_TRANSFERTO_ALLOWED_DEFAULT = true;
public static final String DFS_HEARTBEAT_INTERVAL_KEY = "dfs.heartbeat.interval";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df4edd9a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
index 4a54bed..dfeacde 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
@@ -656,9 +656,6 @@ class BPOfferService {
//
Block toDelete[] = bcmd.getBlocks();
try {
- if (dn.blockScanner != null) {
- dn.blockScanner.deleteBlocks(bcmd.getBlockPoolId(), toDelete);
- }
// using global fsdataset
dn.getFSDataset().invalidate(bcmd.getBlockPoolId(), toDelete);
} catch(IOException e) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df4edd9a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index e6409ab..e396727 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -736,12 +736,6 @@ class BPServiceActor implements Runnable {
DatanodeCommand cmd = cacheReport();
processCommand(new DatanodeCommand[]{ cmd });
- // Now safe to start scanning the block pool.
- // If it has already been started, this is a no-op.
- if (dn.blockScanner != null) {
- dn.blockScanner.addBlockPool(bpos.getBlockPoolId());
- }
-
//
// There is no work to do; sleep until hearbeat timer elapses,
// or work arrives, and then iterate again.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df4edd9a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
deleted file mode 100644
index f36fea1..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java
+++ /dev/null
@@ -1,872 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs.server.datanode;
-
-import java.io.DataOutputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.text.DateFormat;
-import java.text.SimpleDateFormat;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.server.common.GenerationStamp;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;
-import org.apache.hadoop.hdfs.util.DataTransferThrottler;
-import org.apache.hadoop.util.GSet;
-import org.apache.hadoop.util.LightWeightGSet;
-import org.apache.hadoop.util.LightWeightGSet.LinkedElement;
-import org.apache.hadoop.io.IOUtils;
-import org.apache.hadoop.util.Time;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * Scans the block files under a block pool and verifies that the
- * files are not corrupt.
- * This keeps track of blocks and their last verification times.
- * Currently it does not modify the metadata for block.
- */
-
-class BlockPoolSliceScanner {
-
- public static final Log LOG = LogFactory.getLog(BlockPoolSliceScanner.class);
-
- private static final String DATA_FORMAT = "yyyy-MM-dd HH:mm:ss,SSS";
-
- private static final int MAX_SCAN_RATE = 8 * 1024 * 1024; // 8MB per sec
- private static final int MIN_SCAN_RATE = 1 * 1024 * 1024; // 1MB per sec
- private static final long DEFAULT_SCAN_PERIOD_HOURS = 21*24L; // three weeks
-
- private static final String VERIFICATION_PREFIX = "dncp_block_verification.log";
-
- private final String blockPoolId;
- private final long scanPeriod;
- private final AtomicLong lastScanTime = new AtomicLong();
-
- private final DataNode datanode;
- private final FsDatasetSpi<? extends FsVolumeSpi> dataset;
-
- private final SortedSet<BlockScanInfo> blockInfoSet
- = new TreeSet<BlockScanInfo>(BlockScanInfo.LAST_SCAN_TIME_COMPARATOR);
-
- private final SortedSet<BlockScanInfo> newBlockInfoSet =
- new TreeSet<BlockScanInfo>(BlockScanInfo.LAST_SCAN_TIME_COMPARATOR);
-
- private final GSet<Block, BlockScanInfo> blockMap
- = new LightWeightGSet<Block, BlockScanInfo>(
- LightWeightGSet.computeCapacity(0.5, "BlockMap"));
-
- // processedBlocks keeps track of which blocks are scanned
- // since the last run.
- private volatile HashMap<Long, Integer> processedBlocks;
-
- private long totalScans = 0;
- private long totalScanErrors = 0;
- private long totalTransientErrors = 0;
- private final AtomicInteger totalBlocksScannedInLastRun = new AtomicInteger(); // Used for test only
-
- private long currentPeriodStart = Time.monotonicNow();
- private long bytesLeft = 0; // Bytes to scan in this period
- private long totalBytesToScan = 0;
- private boolean isNewPeriod = true;
- private int lastScanTimeDifference = 5*60*1000;
-
- private final LogFileHandler verificationLog;
-
- private final DataTransferThrottler throttler = new DataTransferThrottler(
- 200, MAX_SCAN_RATE);
-
- private static enum ScanType {
- IMMEDIATE_SCAN,
- VERIFICATION_SCAN, // scanned as part of periodic verfication
- NONE,
- }
-
- // Extend Block because in the DN process there's a 1-to-1 correspondence of
- // BlockScanInfo to Block instances, so by extending rather than containing
- // Block, we can save a bit of Object overhead (about 24 bytes per block
- // replica.)
- static class BlockScanInfo extends Block
- implements LightWeightGSet.LinkedElement {
-
- /** Compare the info by the last scan time. */
- static final Comparator<BlockScanInfo> LAST_SCAN_TIME_COMPARATOR
- = new Comparator<BlockPoolSliceScanner.BlockScanInfo>() {
-
- @Override
- public int compare(BlockScanInfo left, BlockScanInfo right) {
- final ScanType leftNextScanType = left.nextScanType;
- final ScanType rightNextScanType = right.nextScanType;
- final long l = left.lastScanTime;
- final long r = right.lastScanTime;
- // Compare by nextScanType if they are same then compare by
- // lastScanTimes
- // compare blocks itself if scantimes are same to avoid.
- // because TreeMap uses comparator if available to check existence of
- // the object.
- int compareByNextScanType = leftNextScanType.compareTo(rightNextScanType);
- return compareByNextScanType < 0? -1: compareByNextScanType > 0? 1: l < r? -1: l > r? 1: left.compareTo(right);
- }
- };
-
- long lastScanTime = 0;
- ScanType lastScanType = ScanType.NONE;
- boolean lastScanOk = true;
- private LinkedElement next;
- ScanType nextScanType = ScanType.VERIFICATION_SCAN;
-
- BlockScanInfo(Block block) {
- super(block);
- }
-
- @Override
- public int hashCode() {
- return super.hashCode();
- }
-
- @Override
- public boolean equals(Object that) {
- if (this == that) {
- return true;
- }
- return super.equals(that);
- }
-
- long getLastScanTime() {
- return (lastScanType == ScanType.NONE) ? 0 : lastScanTime;
- }
-
- @Override
- public void setNext(LinkedElement next) {
- this.next = next;
- }
-
- @Override
- public LinkedElement getNext() {
- return next;
- }
- }
-
- BlockPoolSliceScanner(String bpid, DataNode datanode,
- FsDatasetSpi<? extends FsVolumeSpi> dataset, Configuration conf) {
- this.datanode = datanode;
- this.dataset = dataset;
- this.blockPoolId = bpid;
-
- long hours = conf.getInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY,
- DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT);
- if (hours <= 0) {
- hours = DEFAULT_SCAN_PERIOD_HOURS;
- }
- this.scanPeriod = hours * 3600 * 1000;
- LOG.info("Periodic Block Verification Scanner initialized with interval "
- + hours + " hours for block pool " + bpid);
-
- // get the list of blocks and arrange them in random order
- List<FinalizedReplica> arr = dataset.getFinalizedBlocksOnPersistentStorage(blockPoolId);
- Collections.shuffle(arr);
-
- long scanTime = -1;
- for (Block block : arr) {
- BlockScanInfo info = new BlockScanInfo( block );
- info.lastScanTime = scanTime--;
- //still keep 'info.lastScanType' to NONE.
- addBlockInfo(info, false);
- }
-
- RollingLogs rollingLogs = null;
- try {
- rollingLogs = dataset.createRollingLogs(blockPoolId, VERIFICATION_PREFIX);
- } catch (IOException e) {
- LOG.warn("Could not open verfication log. " +
- "Verification times are not stored.");
- }
- verificationLog = rollingLogs == null? null: new LogFileHandler(rollingLogs);
- }
-
- String getBlockPoolId() {
- return blockPoolId;
- }
-
- private void updateBytesToScan(long len, long lastScanTime) {
- // len could be negative when a block is deleted.
- totalBytesToScan += len;
- if ( lastScanTime < currentPeriodStart ) {
- bytesLeft += len;
- }
- // Should we change throttler bandwidth every time bytesLeft changes?
- // not really required.
- }
-
- /**
- * Add the BlockScanInfo to sorted set of blockScanInfo
- * @param info BlockScanInfo to be added
- * @param isNewBlock true if the block is the new Block, false if
- * BlockScanInfo is being updated with new scanTime
- */
- private synchronized void addBlockInfo(BlockScanInfo info,
- boolean isNewBlock) {
- boolean added = false;
- if (isNewBlock) {
- // check whether the block already present
- boolean exists = blockInfoSet.contains(info);
- added = !exists && newBlockInfoSet.add(info);
- } else {
- added = blockInfoSet.add(info);
- }
- blockMap.put(info);
-
- if (added) {
- updateBytesToScan(info.getNumBytes(), info.lastScanTime);
- }
- }
-
- private synchronized void delBlockInfo(BlockScanInfo info) {
- boolean exists = blockInfoSet.remove(info);
- if (!exists){
- exists = newBlockInfoSet.remove(info);
- }
- blockMap.remove(info);
-
- if (exists) {
- updateBytesToScan(-info.getNumBytes(), info.lastScanTime);
- }
- }
-
- /** Update blockMap by the given LogEntry */
- private synchronized void updateBlockInfo(LogEntry e) {
- BlockScanInfo info = blockMap.get(new Block(e.blockId, 0, e.genStamp));
-
- if (info != null && e.verificationTime > 0 &&
- info.lastScanTime < e.verificationTime) {
- delBlockInfo(info);
- if (info.nextScanType != ScanType.IMMEDIATE_SCAN) {
- info.lastScanTime = e.verificationTime;
- }
- info.lastScanType = ScanType.VERIFICATION_SCAN;
- addBlockInfo(info, false);
- }
- }
-
- private synchronized long getNewBlockScanTime() {
- /* If there are a lot of blocks, this returns a random time with in
- * the scan period. Otherwise something sooner.
- */
- long period = Math.min(scanPeriod,
- Math.max(blockMap.size(),1) * 600 * 1000L);
- int periodInt = Math.abs((int)period);
- return Time.monotonicNow() - scanPeriod +
- DFSUtil.getRandom().nextInt(periodInt);
- }
-
- /** Adds block to list of blocks
- * @param scanNow - true if we want to make that particular block a high
- * priority one to scan immediately
- **/
- synchronized void addBlock(ExtendedBlock block, boolean scanNow) {
- BlockScanInfo info = blockMap.get(block.getLocalBlock());
- long lastScanTime = 0;
- if (info != null) {
- lastScanTime = info.lastScanTime;
- }
- // If the particular block is scanned in last 5 minutes, the no need to
- // verify that block again
- if (scanNow && Time.monotonicNow() - lastScanTime <
- lastScanTimeDifference) {
- return;
- }
-
- if ( info != null ) {
- LOG.warn("Adding an already existing block " + block);
- delBlockInfo(info);
- }
-
- info = new BlockScanInfo(block.getLocalBlock());
- info.lastScanTime = getNewBlockScanTime();
- if (scanNow) {
- // Create a new BlockScanInfo object and set the lastScanTime to 0
- // which will make it the high priority block
- LOG.info("Adding block for immediate verification " + block);
- info.nextScanType = ScanType.IMMEDIATE_SCAN;
- }
-
- addBlockInfo(info, true);
- adjustThrottler();
- }
-
- /** Deletes the block from internal structures */
- synchronized void deleteBlock(Block block) {
- BlockScanInfo info = blockMap.get(block);
- if (info != null) {
- delBlockInfo(info);
- }
- }
-
- @VisibleForTesting
- long getTotalScans() {
- return totalScans;
- }
-
- /** @return the last scan time for the block pool. */
- long getLastScanTime() {
- return lastScanTime.get();
- }
-
- /** @return the last scan time the given block. */
- synchronized long getLastScanTime(Block block) {
- BlockScanInfo info = blockMap.get(block);
- return info == null? 0: info.lastScanTime;
- }
-
- /** Deletes blocks from internal structures */
- void deleteBlocks(Block[] blocks) {
- for ( Block b : blocks ) {
- deleteBlock(b);
- }
- }
-
- private synchronized void updateScanStatus(BlockScanInfo info,
- ScanType type,
- boolean scanOk) {
- delBlockInfo(info);
-
- long now = Time.monotonicNow();
- info.lastScanType = type;
- info.lastScanTime = now;
- info.lastScanOk = scanOk;
- info.nextScanType = ScanType.VERIFICATION_SCAN;
- addBlockInfo(info, false);
-
- // Don't update meta data if the verification failed.
- if (!scanOk) {
- return;
- }
-
- if (verificationLog != null) {
- verificationLog.append(now, info.getGenerationStamp(),
- info.getBlockId());
- }
- }
-
- private void handleScanFailure(ExtendedBlock block) {
- LOG.info("Reporting bad " + block);
- try {
- datanode.reportBadBlocks(block);
- } catch (IOException ie) {
- // it is bad, but not bad enough to shutdown the scanner
- LOG.warn("Cannot report bad " + block.getBlockId());
- }
- }
-
- @VisibleForTesting
- synchronized void setLastScanTimeDifference(int lastScanTimeDifference) {
- this.lastScanTimeDifference = lastScanTimeDifference;
- }
-
- static private class LogEntry {
-
- long blockId = -1;
- long verificationTime = -1;
- long genStamp = GenerationStamp.GRANDFATHER_GENERATION_STAMP;
-
- /**
- * The format consists of single line with multiple entries. each
- * entry is in the form : name="value".
- * This simple text and easily extendable and easily parseable with a
- * regex.
- */
- private static final Pattern entryPattern =
- Pattern.compile("\\G\\s*([^=\\p{Space}]+)=\"(.*?)\"\\s*");
-
- static String toString(long verificationTime, long genStamp, long blockId,
- DateFormat dateFormat) {
- return "\ndate=\"" + dateFormat.format(new Date(verificationTime))
- + "\"\t time=\"" + verificationTime
- + "\"\t genstamp=\"" + genStamp
- + "\"\t id=\"" + blockId + "\"";
- }
-
- static LogEntry parseEntry(String line) {
- LogEntry entry = new LogEntry();
-
- Matcher matcher = entryPattern.matcher(line);
- while (matcher.find()) {
- String name = matcher.group(1);
- String value = matcher.group(2);
-
- try {
- if (name.equals("id")) {
- entry.blockId = Long.parseLong(value);
- } else if (name.equals("time")) {
- entry.verificationTime = Long.parseLong(value);
- } else if (name.equals("genstamp")) {
- entry.genStamp = Long.parseLong(value);
- }
- } catch(NumberFormatException nfe) {
- LOG.warn("Cannot parse line: " + line, nfe);
- return null;
- }
- }
-
- return entry;
- }
- }
-
- private synchronized void adjustThrottler() {
- long timeLeft = Math.max(1L,
- currentPeriodStart + scanPeriod - Time.monotonicNow());
- long bw = Math.max((bytesLeft * 1000) / timeLeft, MIN_SCAN_RATE);
- throttler.setBandwidth(Math.min(bw, MAX_SCAN_RATE));
- }
-
- @VisibleForTesting
- void verifyBlock(ExtendedBlock block) {
- BlockSender blockSender = null;
-
- /* In case of failure, attempt to read second time to reduce
- * transient errors. How do we flush block data from kernel
- * buffers before the second read?
- */
- for (int i=0; i<2; i++) {
- boolean second = (i > 0);
-
- try {
- adjustThrottler();
-
- blockSender = new BlockSender(block, 0, -1, false, true, true,
- datanode, null, CachingStrategy.newDropBehind());
-
- DataOutputStream out =
- new DataOutputStream(new IOUtils.NullOutputStream());
-
- blockSender.sendBlock(out, null, throttler);
-
- LOG.info((second ? "Second " : "") +
- "Verification succeeded for " + block);
-
- if ( second ) {
- totalTransientErrors++;
- }
-
- updateScanStatus((BlockScanInfo)block.getLocalBlock(),
- ScanType.VERIFICATION_SCAN, true);
-
- return;
- } catch (IOException e) {
- updateScanStatus((BlockScanInfo)block.getLocalBlock(),
- ScanType.VERIFICATION_SCAN, false);
-
- // If the block does not exists anymore, then its not an error
- if (!dataset.contains(block)) {
- LOG.info(block + " is no longer in the dataset");
- deleteBlock(block.getLocalBlock());
- return;
- }
-
- // If the block exists, the exception may due to a race with write:
- // The BlockSender got an old block path in rbw. BlockReceiver removed
- // the rbw block from rbw to finalized but BlockSender tried to open the
- // file before BlockReceiver updated the VolumeMap. The state of the
- // block can be changed again now, so ignore this error here. If there
- // is a block really deleted by mistake, DirectoryScan should catch it.
- if (e instanceof FileNotFoundException ) {
- LOG.info("Verification failed for " + block +
- " - may be due to race with write");
- deleteBlock(block.getLocalBlock());
- return;
- }
-
- LOG.warn((second ? "Second " : "First ") + "Verification failed for "
- + block, e);
-
- if (second) {
- totalScanErrors++;
- datanode.getMetrics().incrBlockVerificationFailures();
- handleScanFailure(block);
- return;
- }
- } finally {
- IOUtils.closeStream(blockSender);
- datanode.getMetrics().incrBlocksVerified();
- totalScans++;
- }
- }
- }
-
- private synchronized long getEarliestScanTime() {
- if (!blockInfoSet.isEmpty()) {
- return blockInfoSet.first().lastScanTime;
- }
- return Long.MAX_VALUE;
- }
-
- private synchronized boolean isFirstBlockProcessed() {
- if (!blockInfoSet.isEmpty()) {
- if (blockInfoSet.first().nextScanType == ScanType.IMMEDIATE_SCAN) {
- return false;
- }
- long blockId = blockInfoSet.first().getBlockId();
- if ((processedBlocks.get(blockId) != null)
- && (processedBlocks.get(blockId) == 1)) {
- return true;
- }
- }
- return false;
- }
-
- // Picks one block and verifies it
- private void verifyFirstBlock() {
- BlockScanInfo block = null;
- synchronized (this) {
- if (!blockInfoSet.isEmpty()) {
- block = blockInfoSet.first();
- }
- }
- if ( block != null ) {
- verifyBlock(new ExtendedBlock(blockPoolId, block));
- processedBlocks.put(block.getBlockId(), 1);
- }
- }
-
- // Used for tests only
- int getBlocksScannedInLastRun() {
- return totalBlocksScannedInLastRun.get();
- }
-
- /**
- * Reads the current and previous log files (if any) and marks the blocks
- * processed if they were processed within last scan period. Copies the log
- * records of recently scanned blocks from previous to current file.
- * Returns false if the process was interrupted because the thread is marked
- * to exit.
- */
- private boolean assignInitialVerificationTimes() {
- //First updates the last verification times from the log file.
- if (verificationLog != null) {
- long now = Time.monotonicNow();
- RollingLogs.LineIterator logIterator = null;
- try {
- logIterator = verificationLog.logs.iterator(false);
- // update verification times from the verificationLog.
- while (logIterator.hasNext()) {
- if (!datanode.shouldRun
- || datanode.blockScanner.blockScannerThread.isInterrupted()) {
- return false;
- }
- LogEntry entry = LogEntry.parseEntry(logIterator.next());
- if (entry != null) {
- updateBlockInfo(entry);
- if (now - entry.verificationTime < scanPeriod) {
- BlockScanInfo info = blockMap.get(new Block(entry.blockId, 0,
- entry.genStamp));
- if (info != null) {
- if (processedBlocks.get(entry.blockId) == null) {
- if (isNewPeriod) {
- updateBytesLeft(-info.getNumBytes());
- }
- processedBlocks.put(entry.blockId, 1);
- }
- if (logIterator.isLastReadFromPrevious()) {
- // write the log entry to current file
- // so that the entry is preserved for later runs.
- verificationLog.append(entry.verificationTime, entry.genStamp,
- entry.blockId);
- }
- }
- }
- }
- }
- } catch (IOException e) {
- LOG.warn("Failed to read previous verification times.", e);
- } finally {
- IOUtils.closeStream(logIterator);
- }
- isNewPeriod = false;
- }
-
-
- /* Before this loop, entries in blockInfoSet that are not
- * updated above have lastScanTime of <= 0 . Loop until first entry has
- * lastModificationTime > 0.
- */
- synchronized (this) {
- final int numBlocks = Math.max(blockMap.size(), 1);
- // Initially spread the block reads over half of scan period
- // so that we don't keep scanning the blocks too quickly when restarted.
- long verifyInterval = Math.min(scanPeriod/(2L * numBlocks), 10*60*1000L);
- long lastScanTime = Time.monotonicNow() - scanPeriod;
-
- if (!blockInfoSet.isEmpty()) {
- BlockScanInfo info;
- while ((info = blockInfoSet.first()).lastScanTime < 0) {
- delBlockInfo(info);
- info.lastScanTime = lastScanTime;
- lastScanTime += verifyInterval;
- addBlockInfo(info, false);
- }
- }
- }
-
- return true;
- }
-
- private synchronized void updateBytesLeft(long len) {
- bytesLeft += len;
- }
-
- private synchronized void startNewPeriod() {
- LOG.info("Starting a new period : work left in prev period : "
- + String.format("%.2f%%", totalBytesToScan == 0 ? 0
- : (bytesLeft * 100.0) / totalBytesToScan));
-
- // reset the byte counts :
- bytesLeft = totalBytesToScan;
- currentPeriodStart = Time.monotonicNow();
- isNewPeriod = true;
- }
-
- private synchronized boolean workRemainingInCurrentPeriod() {
- if (bytesLeft <= 0 && Time.monotonicNow() < currentPeriodStart + scanPeriod) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Skipping scan since bytesLeft=" + bytesLeft + ", Start=" +
- currentPeriodStart + ", period=" + scanPeriod + ", now=" +
- Time.monotonicNow() + " " + blockPoolId);
- }
- return false;
- } else {
- return true;
- }
- }
-
- void scanBlockPoolSlice() {
- if (!workRemainingInCurrentPeriod()) {
- return;
- }
-
- // Create a new processedBlocks structure
- processedBlocks = new HashMap<Long, Integer>();
- if (!assignInitialVerificationTimes()) {
- return;
- }
- // Start scanning
- try {
- scan();
- } finally {
- totalBlocksScannedInLastRun.set(processedBlocks.size());
- lastScanTime.set(Time.monotonicNow());
- }
- }
-
- /**
- * Shuts down this BlockPoolSliceScanner and releases any internal resources.
- */
- void shutdown() {
- if (verificationLog != null) {
- verificationLog.close();
- }
- }
-
- private void scan() {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Starting to scan blockpool: " + blockPoolId);
- }
- try {
- adjustThrottler();
-
- while (datanode.shouldRun
- && !datanode.blockScanner.blockScannerThread.isInterrupted()
- && datanode.isBPServiceAlive(blockPoolId)) {
- long now = Time.monotonicNow();
- synchronized (this) {
- if ( now >= (currentPeriodStart + scanPeriod)) {
- startNewPeriod();
- }
- }
- if (((now - getEarliestScanTime()) >= scanPeriod)
- || ((!blockInfoSet.isEmpty()) && !(this.isFirstBlockProcessed()))) {
- verifyFirstBlock();
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("All remaining blocks were processed recently, "
- + "so this run is complete");
- }
- break;
- }
- }
- } catch (RuntimeException e) {
- LOG.warn("RuntimeException during BlockPoolScanner.scan()", e);
- throw e;
- } finally {
- rollVerificationLogs();
- rollNewBlocksInfo();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Done scanning block pool: " + blockPoolId);
- }
- }
- }
-
- // add new blocks to scan in next iteration
- private synchronized void rollNewBlocksInfo() {
- for (BlockScanInfo newBlock : newBlockInfoSet) {
- blockInfoSet.add(newBlock);
- }
- newBlockInfoSet.clear();
- }
-
- private synchronized void rollVerificationLogs() {
- if (verificationLog != null) {
- try {
- verificationLog.logs.roll();
- } catch (IOException ex) {
- LOG.warn("Received exception: ", ex);
- verificationLog.close();
- }
- }
- }
-
-
- synchronized void printBlockReport(StringBuilder buffer,
- boolean summaryOnly) {
- long oneHour = 3600*1000;
- long oneDay = 24*oneHour;
- long oneWeek = 7*oneDay;
- long fourWeeks = 4*oneWeek;
-
- int inOneHour = 0;
- int inOneDay = 0;
- int inOneWeek = 0;
- int inFourWeeks = 0;
- int inScanPeriod = 0;
- int neverScanned = 0;
-
- DateFormat dateFormat = new SimpleDateFormat(DATA_FORMAT);
-
- int total = blockInfoSet.size();
-
- long now = Time.monotonicNow();
-
- Date date = new Date();
-
- for(Iterator<BlockScanInfo> it = blockInfoSet.iterator(); it.hasNext();) {
- BlockScanInfo info = it.next();
-
- long scanTime = info.getLastScanTime();
- long diff = now - scanTime;
-
- if (diff <= oneHour) inOneHour++;
- if (diff <= oneDay) inOneDay++;
- if (diff <= oneWeek) inOneWeek++;
- if (diff <= fourWeeks) inFourWeeks++;
- if (diff <= scanPeriod) inScanPeriod++;
- if (scanTime <= 0) neverScanned++;
-
- if (!summaryOnly) {
- date.setTime(scanTime);
- String scanType =
- (info.lastScanType == ScanType.VERIFICATION_SCAN) ? "local" : "none";
- buffer.append(String.format("%-26s : status : %-6s type : %-6s" +
- " scan time : " +
- "%-15d %s%n", info,
- (info.lastScanOk ? "ok" : "failed"),
- scanType, scanTime,
- (scanTime <= 0) ? "not yet verified" :
- dateFormat.format(date)));
- }
- }
-
- double pctPeriodLeft = (scanPeriod + currentPeriodStart - now)
- *100.0/scanPeriod;
- double pctProgress = (totalBytesToScan == 0) ? 100 :
- (totalBytesToScan-bytesLeft)*100.0/totalBytesToScan;
-
- buffer.append(String.format("%nTotal Blocks : %6d" +
- "%nVerified in last hour : %6d" +
- "%nVerified in last day : %6d" +
- "%nVerified in last week : %6d" +
- "%nVerified in last four weeks : %6d" +
- "%nVerified in SCAN_PERIOD : %6d" +
- "%nNot yet verified : %6d" +
- "%nVerified since restart : %6d" +
- "%nScans since restart : %6d" +
- "%nScan errors since restart : %6d" +
- "%nTransient scan errors : %6d" +
- "%nCurrent scan rate limit KBps : %6d" +
- "%nProgress this period : %6.0f%%" +
- "%nTime left in cur period : %6.2f%%" +
- "%n",
- total, inOneHour, inOneDay, inOneWeek,
- inFourWeeks, inScanPeriod, neverScanned,
- totalScans, totalScans,
- totalScanErrors, totalTransientErrors,
- Math.round(throttler.getBandwidth()/1024.0),
- pctProgress, pctPeriodLeft));
- }
-
- /**
- * This class takes care of log file used to store the last verification
- * times of the blocks.
- */
- private static class LogFileHandler {
- private final DateFormat dateFormat = new SimpleDateFormat(DATA_FORMAT);
-
- private final RollingLogs logs;
-
- private LogFileHandler(RollingLogs logs) {
- this.logs = logs;
- }
-
- void append(long verificationTime, long genStamp, long blockId) {
- final String m = LogEntry.toString(verificationTime, genStamp, blockId,
- dateFormat);
- try {
- logs.appender().append(m);
- } catch (IOException e) {
- LOG.warn("Failed to append to " + logs + ", m=" + m, e);
- }
- }
-
- void close() {
- try {
- logs.appender().close();
- } catch (IOException e) {
- LOG.warn("Failed to close the appender of " + logs, e);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df4edd9a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index df8dd5c..12041a6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -193,20 +193,12 @@ class BlockReceiver implements Closeable {
break;
case PIPELINE_SETUP_APPEND:
replicaHandler = datanode.data.append(block, newGs, minBytesRcvd);
- if (datanode.blockScanner != null) { // remove from block scanner
- datanode.blockScanner.deleteBlock(block.getBlockPoolId(),
- block.getLocalBlock());
- }
block.setGenerationStamp(newGs);
datanode.notifyNamenodeReceivingBlock(
block, replicaHandler.getReplica().getStorageUuid());
break;
case PIPELINE_SETUP_APPEND_RECOVERY:
replicaHandler = datanode.data.recoverAppend(block, newGs, minBytesRcvd);
- if (datanode.blockScanner != null) { // remove from block scanner
- datanode.blockScanner.deleteBlock(block.getBlockPoolId(),
- block.getLocalBlock());
- }
block.setGenerationStamp(newGs);
datanode.notifyNamenodeReceivingBlock(
block, replicaHandler.getReplica().getStorageUuid());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df4edd9a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java
new file mode 100644
index 0000000..7429fff
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockScanner.java
@@ -0,0 +1,308 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT;
+
+import java.io.IOException;
+import java.util.Map.Entry;
+import java.util.TreeMap;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hdfs.server.datanode.VolumeScanner.ScanResultHandler;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Uninterruptibles;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.io.IOUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+/** Manages the per-volume {@link VolumeScanner} threads of a {@link DataNode}, keyed by storage ID. */
+@InterfaceAudience.Private
+public class BlockScanner {
+ public static final Logger LOG =
+ LoggerFactory.getLogger(BlockScanner.class);
+
+ /**
+ * The DataNode that this scanner is associated with.
+ */
+ private final DataNode datanode;
+
+ /**
+ * Maps storage IDs to VolumeScanner objects. Only accessed while holding this object's lock.
+ */
+ private final TreeMap<String, VolumeScanner> scanners =
+ new TreeMap<String, VolumeScanner>();
+
+ /**
+ * The scanner configuration.
+ */
+ private final Conf conf;
+
+ /**
+ * The cached scanner configuration, read once from a Configuration; every numeric value is clamped to be non-negative.
+ */
+ static class Conf {
+ // Internal configuration keys intended for unit tests only. Conf ignores
+ // them and uses the built-in defaults unless the static boolean
+ // allowUnitTestSettings has been set to true.
+
+ @VisibleForTesting
+ static final String INTERNAL_DFS_DATANODE_SCAN_PERIOD_MS =
+ "internal.dfs.datanode.scan.period.ms.key";
+
+ @VisibleForTesting
+ static final String INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER =
+ "internal.volume.scanner.scan.result.handler";
+
+ @VisibleForTesting
+ static final String INTERNAL_DFS_BLOCK_SCANNER_MAX_STALENESS_MS =
+ "internal.dfs.block.scanner.max_staleness.ms";
+
+ @VisibleForTesting
+ static final long INTERNAL_DFS_BLOCK_SCANNER_MAX_STALENESS_MS_DEFAULT =
+ TimeUnit.MILLISECONDS.convert(15, TimeUnit.MINUTES);
+ // NOTE(review): unlike its siblings, the key below has no "internal." prefix.
+ @VisibleForTesting
+ static final String INTERNAL_DFS_BLOCK_SCANNER_CURSOR_SAVE_INTERVAL_MS =
+ "dfs.block.scanner.cursor.save.interval.ms";
+
+ @VisibleForTesting
+ static final long
+ INTERNAL_DFS_BLOCK_SCANNER_CURSOR_SAVE_INTERVAL_MS_DEFAULT =
+ TimeUnit.MILLISECONDS.convert(10, TimeUnit.MINUTES);
+ // When false (the default), the test-only keys above are ignored.
+ static boolean allowUnitTestSettings = false;
+ final long targetBytesPerSec;
+ final long maxStalenessMs;
+ final long scanPeriodMs;
+ final long cursorSaveMs;
+ final Class<? extends ScanResultHandler> resultHandler;
+
+ private static long getUnitTestLong(Configuration conf, String key,
+ long defVal) {
+ if (allowUnitTestSettings) {
+ return conf.getLong(key, defVal);
+ } else {
+ return defVal;
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ Conf(Configuration conf) {
+ this.targetBytesPerSec = Math.max(0L, conf.getLong(
+ DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND,
+ DFS_BLOCK_SCANNER_VOLUME_BYTES_PER_SECOND_DEFAULT));
+ this.maxStalenessMs = Math.max(0L, getUnitTestLong(conf,
+ INTERNAL_DFS_BLOCK_SCANNER_MAX_STALENESS_MS,
+ INTERNAL_DFS_BLOCK_SCANNER_MAX_STALENESS_MS_DEFAULT));
+ this.scanPeriodMs = Math.max(0L,
+ getUnitTestLong(conf, INTERNAL_DFS_DATANODE_SCAN_PERIOD_MS,
+ TimeUnit.MILLISECONDS.convert(conf.getLong(
+ DFS_DATANODE_SCAN_PERIOD_HOURS_KEY,
+ DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT), TimeUnit.HOURS)));
+ this.cursorSaveMs = Math.max(0L, getUnitTestLong(conf,
+ INTERNAL_DFS_BLOCK_SCANNER_CURSOR_SAVE_INTERVAL_MS,
+ INTERNAL_DFS_BLOCK_SCANNER_CURSOR_SAVE_INTERVAL_MS_DEFAULT));
+ if (allowUnitTestSettings) {
+ this.resultHandler = (Class<? extends ScanResultHandler>)
+ conf.getClass(INTERNAL_VOLUME_SCANNER_SCAN_RESULT_HANDLER,
+ ScanResultHandler.class);
+ } else {
+ this.resultHandler = ScanResultHandler.class;
+ }
+ }
+ }
+
+ public BlockScanner(DataNode datanode, Configuration conf) {
+ this.datanode = datanode;
+ this.conf = new Conf(conf);
+ if (isEnabled()) {
+ LOG.info("Initialized block scanner with targetBytesPerSec {}",
+ this.conf.targetBytesPerSec);
+ } else {
+ LOG.info("Disabled block scanner.");
+ }
+ }
+
+ /**
+ * Returns true if the block scanner is enabled, i.e. if both the scan
+ * period and the target bytes per second are positive.<p>
+ * If the block scanner is disabled, no volume scanners will be created, and
+ * no threads will start.
+ */
+ public boolean isEnabled() {
+ return (conf.scanPeriodMs) > 0 && (conf.targetBytesPerSec > 0);
+ }
+
+ /**
+ * Set up and start a scanner for the given volume, keyed by its storage ID.
+ * Nothing is added if the block scanner is disabled or the volume already has a scanner.
+ * @param ref A reference to the volume. It is handed to the new scanner on success, and closed otherwise.
+ */
+ public synchronized void addVolumeScanner(FsVolumeReference ref) {
+ boolean success = false;
+ try {
+ FsVolumeSpi volume = ref.getVolume();
+ if (!isEnabled()) {
+ LOG.debug("Not adding volume scanner for {}, because the block " +
+ "scanner is disabled.", volume.getBasePath());
+ return;
+ }
+ VolumeScanner scanner = scanners.get(volume.getStorageID());
+ if (scanner != null) {
+ LOG.error("Already have a scanner for volume {}.",
+ volume.getBasePath());
+ return;
+ }
+ LOG.debug("Adding scanner for volume {} (StorageID {})",
+ volume.getBasePath(), volume.getStorageID());
+ scanner = new VolumeScanner(conf, datanode, ref);
+ scanner.start();
+ scanners.put(volume.getStorageID(), scanner);
+ success = true;
+ } finally {
+ if (!success) {
+ // If we didn't create a new VolumeScanner object, we don't
+ // need this reference to the volume.
+ IOUtils.cleanup(null, ref);
+ }
+ }
+ }
+
+ /**
+ * Stops and removes a volume scanner.<p>
+ *
+ * This function blocks, while holding this object's lock, until the volume scanner has stopped or 5 minutes have passed.
+ *
+ * @param volume The volume to remove.
+ */
+ public synchronized void removeVolumeScanner(FsVolumeSpi volume) {
+ if (!isEnabled()) {
+ LOG.debug("Not removing volume scanner for {}, because the block " +
+ "scanner is disabled.", volume.getStorageID());
+ return;
+ }
+ VolumeScanner scanner = scanners.get(volume.getStorageID());
+ if (scanner == null) {
+ LOG.warn("No scanner found to remove for volumeId {}",
+ volume.getStorageID());
+ return;
+ }
+ LOG.info("Removing scanner for volume {} (StorageID {})",
+ volume.getBasePath(), volume.getStorageID());
+ scanner.shutdown();
+ scanners.remove(volume.getStorageID());
+ Uninterruptibles.joinUninterruptibly(scanner, 5, TimeUnit.MINUTES);
+ }
+
+ /**
+ * Stops and removes all volume scanners.<p>
+ *
+ * All scanners are asked to shut down first; then this function blocks, holding this object's lock, waiting up to 5 minutes for each to stop.
+ */
+ public synchronized void removeAllVolumeScanners() {
+ for (Entry<String, VolumeScanner> entry : scanners.entrySet()) {
+ entry.getValue().shutdown();
+ }
+ for (Entry<String, VolumeScanner> entry : scanners.entrySet()) {
+ Uninterruptibles.joinUninterruptibly(entry.getValue(),
+ 5, TimeUnit.MINUTES);
+ }
+ scanners.clear();
+ }
+
+ /**
+ * Enable scanning a given block pool id on every volume scanner currently registered.
+ *
+ * @param bpid The block pool id to enable scanning for. Must not be null.
+ */
+ synchronized void enableBlockPoolId(String bpid) {
+ Preconditions.checkNotNull(bpid);
+ for (VolumeScanner scanner : scanners.values()) {
+ scanner.enableBlockPoolId(bpid);
+ }
+ }
+
+ /**
+ * Disable scanning a given block pool id on every volume scanner currently registered.
+ *
+ * @param bpid The block pool id to disable scanning for. Must not be null.
+ */
+ synchronized void disableBlockPoolId(String bpid) {
+ Preconditions.checkNotNull(bpid);
+ for (VolumeScanner scanner : scanners.values()) {
+ scanner.disableBlockPoolId(bpid);
+ }
+ }
+
+ @VisibleForTesting
+ synchronized VolumeScanner.Statistics getVolumeStats(String volumeId) {
+ VolumeScanner scanner = scanners.get(volumeId);
+ if (scanner == null) {
+ return null; // no scanner registered for this storage ID
+ }
+ return scanner.getStatistics();
+ }
+
+ synchronized void printStats(StringBuilder p) {
+ // Each volume scanner prints its own statistics. TODO: also print the block pool ids being scanned?
+ for (Entry<String, VolumeScanner> entry : scanners.entrySet()) {
+ entry.getValue().printStats(p);
+ }
+ }
+
+ @InterfaceAudience.Private
+ public static class Servlet extends HttpServlet {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public void doGet(HttpServletRequest request,
+ HttpServletResponse response) throws IOException {
+ response.setContentType("text/plain");
+
+ DataNode datanode = (DataNode)
+ getServletContext().getAttribute("datanode");
+ BlockScanner blockScanner = datanode.getBlockScanner();
+
+ StringBuilder buffer = new StringBuilder(8 * 1024);
+ if (!blockScanner.isEnabled()) {
+ LOG.warn("Periodic block scanner is not running");
+ buffer.append("Periodic block scanner is not running. " +
+ "Please check the datanode log if this is unexpected.");
+ } else {
+ buffer.append("Block Scanner Statistics\n\n");
+ blockScanner.printStats(buffer);
+ }
+ String resp = buffer.toString();
+ LOG.trace("Returned Servlet info {}", resp);
+ response.getWriter().write(resp);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df4edd9a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
index 2d312d7..182b366 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockSender.java
@@ -600,9 +600,6 @@ class BlockSender implements java.io.Closeable {
String ioem = e.getMessage();
if (!ioem.startsWith("Broken pipe") && !ioem.startsWith("Connection reset")) {
LOG.error("BlockSender.sendChunks() exception: ", e);
- //Something might be wrong with the block. Make this block the high
- //priority block for verification.
- datanode.blockScanner.addBlock(block, true);
}
}
throw ioeToSocketException(e);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df4edd9a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
deleted file mode 100644
index 450c2b1..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java
+++ /dev/null
@@ -1,339 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs.server.datanode;
-
-import java.io.IOException;
-import java.util.TreeMap;
-
-import javax.servlet.http.HttpServlet;
-import javax.servlet.http.HttpServletRequest;
-import javax.servlet.http.HttpServletResponse;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * DataBlockScanner manages block scanning for all the block pools. For each
- * block pool a {@link BlockPoolSliceScanner} is created which runs in a separate
- * thread to scan the blocks for that block pool. When a {@link BPOfferService}
- * becomes alive or dies, blockPoolScannerMap in this class is updated.
- */
-@InterfaceAudience.Private
-public class DataBlockScanner implements Runnable {
- public static final Log LOG = LogFactory.getLog(DataBlockScanner.class);
- private final DataNode datanode;
- private final FsDatasetSpi<? extends FsVolumeSpi> dataset;
- private final Configuration conf;
-
- static final int SLEEP_PERIOD_MS = 5 * 1000;
-
- /**
- * Map to find the BlockPoolScanner for a given block pool id. This is updated
- * when a BPOfferService becomes alive or dies.
- */
- private final TreeMap<String, BlockPoolSliceScanner> blockPoolScannerMap =
- new TreeMap<String, BlockPoolSliceScanner>();
- Thread blockScannerThread = null;
-
- DataBlockScanner(DataNode datanode,
- FsDatasetSpi<? extends FsVolumeSpi> dataset,
- Configuration conf) {
- this.datanode = datanode;
- this.dataset = dataset;
- this.conf = conf;
- }
-
- @Override
- public void run() {
- String currentBpId = "";
- boolean firstRun = true;
- while (datanode.shouldRun && !Thread.interrupted()) {
- //Sleep everytime except in the first iteration.
- if (!firstRun) {
- try {
- Thread.sleep(SLEEP_PERIOD_MS);
- } catch (InterruptedException ex) {
- // Interrupt itself again to set the interrupt status
- blockScannerThread.interrupt();
- continue;
- }
- } else {
- firstRun = false;
- }
-
- BlockPoolSliceScanner bpScanner = getNextBPScanner(currentBpId);
- if (bpScanner == null) {
- // Possible if thread is interrupted
- continue;
- }
- currentBpId = bpScanner.getBlockPoolId();
- // If BPOfferService for this pool is not alive, don't process it
- if (!datanode.isBPServiceAlive(currentBpId)) {
- LOG.warn("Block Pool " + currentBpId + " is not alive");
- // Remove in case BP service died abruptly without proper shutdown
- removeBlockPool(currentBpId);
- continue;
- }
- bpScanner.scanBlockPoolSlice();
- }
-
- // Call shutdown for each allocated BlockPoolSliceScanner.
- for (BlockPoolSliceScanner bpss: blockPoolScannerMap.values()) {
- bpss.shutdown();
- }
- }
-
- // Wait for at least one block pool to be up
- private void waitForInit() {
- while ((getBlockPoolSetSize() < datanode.getAllBpOs().length)
- || (getBlockPoolSetSize() < 1)) {
- try {
- Thread.sleep(SLEEP_PERIOD_MS);
- } catch (InterruptedException e) {
- blockScannerThread.interrupt();
- return;
- }
- }
- }
-
- /**
- * Find next block pool id to scan. There should be only one current
- * verification log file. Find which block pool contains the current
- * verification log file and that is used as the starting block pool id. If no
- * current files are found start with first block-pool in the blockPoolSet.
- * However, if more than one current files are found, the one with latest
- * modification time is used to find the next block pool id.
- */
- private BlockPoolSliceScanner getNextBPScanner(String currentBpId) {
-
- String nextBpId = null;
- while (datanode.shouldRun && !blockScannerThread.isInterrupted()) {
- waitForInit();
- synchronized (this) {
- if (getBlockPoolSetSize() > 0) {
- // Find nextBpId by the minimum of the last scan time
- long lastScanTime = 0;
- for (String bpid : blockPoolScannerMap.keySet()) {
- final long t = getBPScanner(bpid).getLastScanTime();
- if (t != 0L) {
- if (bpid == null || t < lastScanTime) {
- lastScanTime = t;
- nextBpId = bpid;
- }
- }
- }
-
- // nextBpId can still be null if no current log is found,
- // find nextBpId sequentially.
- if (nextBpId == null) {
- nextBpId = blockPoolScannerMap.higherKey(currentBpId);
- if (nextBpId == null) {
- nextBpId = blockPoolScannerMap.firstKey();
- }
- }
- if (nextBpId != null) {
- return getBPScanner(nextBpId);
- }
- }
- }
- LOG.warn("No block pool is up, going to wait");
- try {
- Thread.sleep(5000);
- } catch (InterruptedException ex) {
- LOG.warn("Received exception: " + ex);
- blockScannerThread.interrupt();
- return null;
- }
- }
- return null;
- }
-
- private synchronized int getBlockPoolSetSize() {
- return blockPoolScannerMap.size();
- }
-
- @VisibleForTesting
- synchronized BlockPoolSliceScanner getBPScanner(String bpid) {
- return blockPoolScannerMap.get(bpid);
- }
-
- private synchronized String[] getBpIdList() {
- return blockPoolScannerMap.keySet().toArray(
- new String[blockPoolScannerMap.keySet().size()]);
- }
-
- public void addBlock(ExtendedBlock block, boolean scanNow) {
- BlockPoolSliceScanner bpScanner = getBPScanner(block.getBlockPoolId());
- if (bpScanner != null) {
- bpScanner.addBlock(block, scanNow);
- } else {
- LOG.warn("No block pool scanner found for block pool id: "
- + block.getBlockPoolId());
- }
- }
-
- boolean isInitialized(String bpid) {
- return getBPScanner(bpid) != null;
- }
-
- public synchronized void printBlockReport(StringBuilder buffer,
- boolean summary) {
- String[] bpIdList = getBpIdList();
- if (bpIdList == null || bpIdList.length == 0) {
- buffer.append("Periodic block scanner is not yet initialized. "
- + "Please check back again after some time.");
- return;
- }
- for (String bpid : bpIdList) {
- BlockPoolSliceScanner bpScanner = getBPScanner(bpid);
- buffer.append("\n\nBlock report for block pool: "+bpid + "\n");
- bpScanner.printBlockReport(buffer, summary);
- buffer.append("\n");
- }
- }
-
- public void deleteBlock(String poolId, Block toDelete) {
- BlockPoolSliceScanner bpScanner = getBPScanner(poolId);
- if (bpScanner != null) {
- bpScanner.deleteBlock(toDelete);
- } else {
- LOG.warn("No block pool scanner found for block pool id: "
- + poolId);
- }
- }
-
- public void deleteBlocks(String poolId, Block[] toDelete) {
- BlockPoolSliceScanner bpScanner = getBPScanner(poolId);
- if (bpScanner != null) {
- bpScanner.deleteBlocks(toDelete);
- } else {
- LOG.warn("No block pool scanner found for block pool id: "
- + poolId);
- }
- }
-
- public void shutdown() {
- synchronized (this) {
- if (blockScannerThread != null) {
- blockScannerThread.interrupt();
- }
- }
-
- // We cannot join within the synchronized block, because it would create a
- // deadlock situation. blockScannerThread calls other synchronized methods.
- if (blockScannerThread != null) {
- try {
- blockScannerThread.join();
- } catch (InterruptedException e) {
- // shutting down anyway
- }
- }
- }
-
- public synchronized void addBlockPool(String blockPoolId) {
- if (blockPoolScannerMap.get(blockPoolId) != null) {
- return;
- }
- BlockPoolSliceScanner bpScanner = new BlockPoolSliceScanner(blockPoolId,
- datanode, dataset, conf);
- blockPoolScannerMap.put(blockPoolId, bpScanner);
- LOG.info("Added bpid=" + blockPoolId + " to blockPoolScannerMap, new size="
- + blockPoolScannerMap.size());
- }
-
- public synchronized void removeBlockPool(String blockPoolId) {
- BlockPoolSliceScanner bpss = blockPoolScannerMap.remove(blockPoolId);
- if (bpss != null) {
- bpss.shutdown();
- }
- LOG.info("Removed bpid="+blockPoolId+" from blockPoolScannerMap");
- }
-
- @VisibleForTesting
- long getBlocksScannedInLastRun(String bpid) throws IOException {
- BlockPoolSliceScanner bpScanner = getBPScanner(bpid);
- if (bpScanner == null) {
- throw new IOException("Block Pool: "+bpid+" is not running");
- } else {
- return bpScanner.getBlocksScannedInLastRun();
- }
- }
-
- @VisibleForTesting
- long getTotalScans(String bpid) throws IOException {
- BlockPoolSliceScanner bpScanner = getBPScanner(bpid);
- if (bpScanner == null) {
- throw new IOException("Block Pool: "+bpid+" is not running");
- } else {
- return bpScanner.getTotalScans();
- }
- }
-
- @VisibleForTesting
- public void setLastScanTimeDifference(ExtendedBlock block, int lastScanTimeDifference) {
- BlockPoolSliceScanner bpScanner = getBPScanner(block.getBlockPoolId());
- if (bpScanner != null) {
- bpScanner.setLastScanTimeDifference(lastScanTimeDifference);
- } else {
- LOG.warn("No block pool scanner found for block pool id: "
- + block.getBlockPoolId());
- }
- }
-
- public void start() {
- blockScannerThread = new Thread(this);
- blockScannerThread.setDaemon(true);
- blockScannerThread.start();
- }
-
- @InterfaceAudience.Private
- public static class Servlet extends HttpServlet {
- private static final long serialVersionUID = 1L;
-
- @Override
- public void doGet(HttpServletRequest request,
- HttpServletResponse response) throws IOException {
- response.setContentType("text/plain");
-
- DataNode datanode = (DataNode) getServletContext().getAttribute("datanode");
- DataBlockScanner blockScanner = datanode.blockScanner;
-
- boolean summary = (request.getParameter("listblocks") == null);
-
- StringBuilder buffer = new StringBuilder(8*1024);
- if (blockScanner == null) {
- LOG.warn("Periodic block scanner is not running");
- buffer.append("Periodic block scanner is not running. " +
- "Please check the datanode log if this is unexpected.");
- } else {
- blockScanner.printBlockReport(buffer, summary);
- }
- response.getWriter().write(buffer.toString()); // extra copy!
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/df4edd9a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index 12df9d6..c77bc3d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -316,7 +316,7 @@ public class DataNode extends ReconfigurableBase
BlockPoolTokenSecretManager blockPoolTokenSecretManager;
private boolean hasAnyBlockPoolRegistered = false;
- volatile DataBlockScanner blockScanner = null;
+ private final BlockScanner blockScanner;
private DirectoryScanner directoryScanner = null;
/** Activated plug-ins. */
@@ -365,6 +365,7 @@ public class DataNode extends ReconfigurableBase
this.usersWithLocalPathAccess = null;
this.connectToDnViaHostname = false;
this.getHdfsBlockLocationsEnabled = false;
+ this.blockScanner = new BlockScanner(this, conf);
}
/**
@@ -375,6 +376,7 @@ public class DataNode extends ReconfigurableBase
final List<StorageLocation> dataDirs,
final SecureResources resources) throws IOException {
super(conf);
+ this.blockScanner = new BlockScanner(this, conf);
this.lastDiskErrorCheck = 0;
this.maxNumberOfBlocksToLog = conf.getLong(DFS_MAX_NUM_BLOCKS_TO_LOG_KEY,
DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT);
@@ -671,7 +673,8 @@ public class DataNode extends ReconfigurableBase
this.infoServer.setAttribute("datanode", this);
this.infoServer.setAttribute(JspHelper.CURRENT_CONF, conf);
this.infoServer.addServlet(null, "/blockScannerReport",
- DataBlockScanner.Servlet.class);
+ BlockScanner.Servlet.class);
+
this.infoServer.start();
InetSocketAddress jettyAddr = infoServer.getConnectorAddress(0);
@@ -772,56 +775,12 @@ public class DataNode extends ReconfigurableBase
// Not a superuser.
throw new AccessControlException();
}
-
-/**
- * Initialize the datanode's periodic scanners:
- * {@link DataBlockScanner}
- * {@link DirectoryScanner}
- * They report results on a per-blockpool basis but do their scanning
- * on a per-Volume basis to minimize competition for disk iops.
- *
- * @param conf - Configuration has the run intervals and other
- * parameters for these periodic scanners
- */
- private void initPeriodicScanners(Configuration conf) {
- initDataBlockScanner(conf);
- initDirectoryScanner(conf);
- }
-
+
private void shutdownPeriodicScanners() {
shutdownDirectoryScanner();
- shutdownDataBlockScanner();
- }
-
- /**
- * See {@link DataBlockScanner}
- */
- private synchronized void initDataBlockScanner(Configuration conf) {
- if (blockScanner != null) {
- return;
- }
- String reason = null;
- assert data != null;
- if (conf.getInt(DFS_DATANODE_SCAN_PERIOD_HOURS_KEY,
- DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT) < 0) {
- reason = "verification is turned off by configuration";
- } else if ("SimulatedFSDataset".equals(data.getClass().getSimpleName())) {
- reason = "verifcation is not supported by SimulatedFSDataset";
- }
- if (reason == null) {
- blockScanner = new DataBlockScanner(this, data, conf);
- blockScanner.start();
- } else {
- LOG.info("Periodic Block Verification scan disabled because " + reason);
- }
+ blockScanner.removeAllVolumeScanners();
}
-
- private void shutdownDataBlockScanner() {
- if (blockScanner != null) {
- blockScanner.shutdown();
- }
- }
-
+
/**
* See {@link DirectoryScanner}
*/
@@ -1250,9 +1209,8 @@ public class DataNode extends ReconfigurableBase
// registering anywhere. If that's the case, we wouldn't have
// a block pool id
String bpId = bpos.getBlockPoolId();
- if (blockScanner != null) {
- blockScanner.removeBlockPool(bpId);
- }
+
+ blockScanner.disableBlockPoolId(bpId);
if (data != null) {
data.shutdownBlockPool(bpId);
@@ -1296,9 +1254,9 @@ public class DataNode extends ReconfigurableBase
// failures.
checkDiskError();
- initPeriodicScanners(conf);
-
+ initDirectoryScanner(conf);
data.addBlockPool(nsInfo.getBlockPoolID(), conf);
+ blockScanner.enableBlockPoolId(bpos.getBlockPoolId());
}
BPOfferService[] getAllBpOs() {
@@ -2168,10 +2126,6 @@ public class DataNode extends ReconfigurableBase
LOG.warn("Cannot find BPOfferService for reporting block received for bpid="
+ block.getBlockPoolId());
}
- FsVolumeSpi volume = getFSDataset().getVolume(block);
- if (blockScanner != null && !volume.isTransientStorage()) {
- blockScanner.addBlock(block, false);
- }
}
/** Start a single datanode daemon and wait for it to finish.
@@ -2445,8 +2399,9 @@ public class DataNode extends ReconfigurableBase
return data;
}
+ @VisibleForTesting
/** @return the block scanner. */
- public DataBlockScanner getBlockScanner() {
+ public BlockScanner getBlockScanner() {
return blockScanner;
}