Posted to common-commits@hadoop.apache.org by cm...@apache.org on 2015/02/14 00:08:04 UTC
[2/3] hadoop git commit: HDFS-7430. Refactor the BlockScanner to use
O(1) memory and use multiple threads (cmccabe)
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8c6a96f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
new file mode 100644
index 0000000..781b4d3
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/VolumeScanner.java
@@ -0,0 +1,652 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.io.DataOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.datanode.BlockScanner.Conf;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.BlockIterator;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.util.DataTransferThrottler;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * VolumeScanner scans a single volume. Each VolumeScanner has its own thread.<p/>
+ * They are all managed by the DataNode's BlockScanner.
+ */
+public class VolumeScanner extends Thread {
+ public static final Logger LOG =
+ LoggerFactory.getLogger(VolumeScanner.class);
+
+ /**
+ * Number of seconds in a minute.
+ */
+ private final static int SECONDS_PER_MINUTE = 60;
+
+ /**
+ * Number of minutes in an hour.
+ */
+ private final static int MINUTES_PER_HOUR = 60;
+
+ /**
+ * Name of the block iterator used by this scanner.
+ */
+ private final static String BLOCK_ITERATOR_NAME = "scanner";
+
+ /**
+ * The configuration.
+ */
+ private final Conf conf;
+
+ /**
+   * The DataNode this VolumeScanner is associated with.
+ */
+ private final DataNode datanode;
+
+ /**
+ * A reference to the volume that we're scanning.
+ */
+ private final FsVolumeReference ref;
+
+ /**
+ * The volume that we're scanning.
+ */
+ final FsVolumeSpi volume;
+
+ /**
+ * The number of scanned bytes in each minute of the last hour.<p/>
+ *
+ * This array is managed as a circular buffer. We take the monotonic time and
+ * divide it up into one-minute periods. Each entry in the array represents
+ * how many bytes were scanned during that period.
+ */
+ private final long scannedBytes[] = new long[MINUTES_PER_HOUR];
+
+ /**
+ * The sum of all the values of scannedBytes.
+ */
+ private long scannedBytesSum = 0;
+
+ /**
+ * The throttler to use with BlockSender objects.
+ */
+ private final DataTransferThrottler throttler = new DataTransferThrottler(1);
+
+ /**
+ * The null output stream to use with BlockSender objects.
+ */
+ private final DataOutputStream nullStream =
+ new DataOutputStream(new IOUtils.NullOutputStream());
+
+ /**
+ * The block iterators associated with this VolumeScanner.<p/>
+ *
+ * Each block pool has its own BlockIterator.
+ */
+ private final List<BlockIterator> blockIters =
+ new LinkedList<BlockIterator>();
+
+ /**
+ * The current block iterator, or null if there is none.
+ */
+ private BlockIterator curBlockIter = null;
+
+ /**
+ * True if the thread is stopping.<p/>
+ * Protected by this object's lock.
+ */
+ private boolean stopping = false;
+
+ /**
+ * The current minute, in monotonic terms.
+ */
+ private long curMinute = 0;
+
+ /**
+ * Handles scan results.
+ */
+ private final ScanResultHandler resultHandler;
+
+ private final Statistics stats = new Statistics();
+
+ static class Statistics {
+ long bytesScannedInPastHour = 0;
+ long blocksScannedInCurrentPeriod = 0;
+ long blocksScannedSinceRestart = 0;
+ long scansSinceRestart = 0;
+ long scanErrorsSinceRestart = 0;
+ long nextBlockPoolScanStartMs = -1;
+ long blockPoolPeriodEndsMs = -1;
+ ExtendedBlock lastBlockScanned = null;
+ boolean eof = false;
+
+ Statistics() {
+ }
+
+ Statistics(Statistics other) {
+ this.bytesScannedInPastHour = other.bytesScannedInPastHour;
+ this.blocksScannedInCurrentPeriod = other.blocksScannedInCurrentPeriod;
+ this.blocksScannedSinceRestart = other.blocksScannedSinceRestart;
+ this.scansSinceRestart = other.scansSinceRestart;
+ this.scanErrorsSinceRestart = other.scanErrorsSinceRestart;
+ this.nextBlockPoolScanStartMs = other.nextBlockPoolScanStartMs;
+ this.blockPoolPeriodEndsMs = other.blockPoolPeriodEndsMs;
+ this.lastBlockScanned = other.lastBlockScanned;
+ this.eof = other.eof;
+ }
+
+ @Override
+ public String toString() {
+ return new StringBuilder().
+ append("Statistics{").
+ append("bytesScannedInPastHour=").append(bytesScannedInPastHour).
+ append(", blocksScannedInCurrentPeriod=").
+ append(blocksScannedInCurrentPeriod).
+ append(", blocksScannedSinceRestart=").
+ append(blocksScannedSinceRestart).
+ append(", scansSinceRestart=").append(scansSinceRestart).
+ append(", scanErrorsSinceRestart=").append(scanErrorsSinceRestart).
+ append(", nextBlockPoolScanStartMs=").append(nextBlockPoolScanStartMs).
+ append(", blockPoolPeriodEndsMs=").append(blockPoolPeriodEndsMs).
+ append(", lastBlockScanned=").append(lastBlockScanned).
+ append(", eof=").append(eof).
+ append("}").toString();
+ }
+ }
+
+ private static double positiveMsToHours(long ms) {
+ if (ms <= 0) {
+ return 0;
+ } else {
+      // Use floating-point division so fractions of an hour survive;
+      // TimeUnit.HOURS.convert would truncate toward zero.
+      return ((double) ms) / TimeUnit.HOURS.toMillis(1);
+ }
+ }
+
+ public void printStats(StringBuilder p) {
+ p.append("Block scanner information for volume " +
+ volume.getStorageID() + " with base path " + volume.getBasePath() +
+ "%n");
+ synchronized (stats) {
+ p.append(String.format("Bytes verified in last hour : %57d%n",
+ stats.bytesScannedInPastHour));
+ p.append(String.format("Blocks scanned in current period : %57d%n",
+ stats.blocksScannedInCurrentPeriod));
+ p.append(String.format("Blocks scanned since restart : %57d%n",
+ stats.blocksScannedSinceRestart));
+ p.append(String.format("Block pool scans since restart : %57d%n",
+ stats.scansSinceRestart));
+ p.append(String.format("Block scan errors since restart : %57d%n",
+ stats.scanErrorsSinceRestart));
+ if (stats.nextBlockPoolScanStartMs > 0) {
+ p.append(String.format("Hours until next block pool scan : %57.3f%n",
+ positiveMsToHours(stats.nextBlockPoolScanStartMs -
+ Time.monotonicNow())));
+ }
+ if (stats.blockPoolPeriodEndsMs > 0) {
+ p.append(String.format("Hours until possible pool rescan : %57.3f%n",
+ positiveMsToHours(stats.blockPoolPeriodEndsMs -
+ Time.now())));
+ }
+ p.append(String.format("Last block scanned : %57s%n",
+ ((stats.lastBlockScanned == null) ? "none" :
+ stats.lastBlockScanned.toString())));
+ p.append(String.format("More blocks to scan in period : %57s%n",
+ !stats.eof));
+ p.append("%n");
+ }
+ }
+
+ static class ScanResultHandler {
+ private VolumeScanner scanner;
+
+ public void setup(VolumeScanner scanner) {
+ LOG.trace("Starting VolumeScanner {}",
+ scanner.volume.getBasePath());
+ this.scanner = scanner;
+ }
+
+ public void handle(ExtendedBlock block, IOException e) {
+ FsVolumeSpi volume = scanner.volume;
+ if (e == null) {
+ LOG.trace("Successfully scanned {} on {}", block, volume.getBasePath());
+ return;
+ }
+ // If the block does not exist anymore, then it's not an error.
+ if (!volume.getDataset().contains(block)) {
+ LOG.debug("Volume {}: block {} is no longer in the dataset.",
+ volume.getBasePath(), block);
+ return;
+ }
+      // If the block exists, the exception may be due to a race with a
+      // writer: the BlockSender got an old block path in rbw, the
+      // BlockReceiver then moved the block from rbw to finalized, and the
+      // BlockSender tried to open the file before the BlockReceiver had
+      // updated the volume map. The state of the block may have changed
+      // again by now, so ignore this error here. If a block really was
+      // deleted by mistake, the DirectoryScanner should catch it.
+      if (e instanceof FileNotFoundException) {
+ LOG.info("Volume {}: verification failed for {} because of " +
+ "FileNotFoundException. This may be due to a race with write.",
+ volume.getBasePath(), block);
+ return;
+ }
+ LOG.warn("Reporting bad {} on {}", block, volume.getBasePath());
+ try {
+ scanner.datanode.reportBadBlocks(block);
+ } catch (IOException ie) {
+ // This is bad, but not bad enough to shut down the scanner.
+ LOG.warn("Cannot report bad " + block.getBlockId(), e);
+ }
+ }
+ }
+
+ VolumeScanner(Conf conf, DataNode datanode, FsVolumeReference ref) {
+ this.conf = conf;
+ this.datanode = datanode;
+ this.ref = ref;
+ this.volume = ref.getVolume();
+ ScanResultHandler handler;
+ try {
+ handler = conf.resultHandler.newInstance();
+ } catch (Throwable e) {
+ LOG.error("unable to instantiate {}", conf.resultHandler, e);
+ handler = new ScanResultHandler();
+ }
+ this.resultHandler = handler;
+ setName("VolumeScannerThread(" + volume.getBasePath() + ")");
+ setDaemon(true);
+ }
+
+ private void saveBlockIterator(BlockIterator iter) {
+ try {
+ iter.save();
+ } catch (IOException e) {
+ LOG.warn("{}: error saving {}.", this, iter, e);
+ }
+ }
+
+ private void expireOldScannedBytesRecords(long monotonicMs) {
+ long newMinute =
+ TimeUnit.MINUTES.convert(monotonicMs, TimeUnit.MILLISECONDS);
+ newMinute = newMinute % MINUTES_PER_HOUR;
+ if (curMinute == newMinute) {
+ return;
+ }
+ // If a minute or more has gone past since we last updated the scannedBytes
+ // array, zero out the slots corresponding to those minutes.
+ for (long m = curMinute + 1; m <= newMinute; m++) {
+ LOG.trace("{}: updateScannedBytes is zeroing out slot {}. " +
+ "curMinute = {}; newMinute = {}", this, m % MINUTES_PER_HOUR,
+ curMinute, newMinute);
+ scannedBytesSum -= scannedBytes[(int)(m % MINUTES_PER_HOUR)];
+ scannedBytes[(int)(m % MINUTES_PER_HOUR)] = 0;
+ }
+ curMinute = newMinute;
+ }
+
+ /**
+ * Find a usable block iterator.<p/>
+ *
+ * We will consider available block iterators in order. This property is
+ * important so that we don't keep rescanning the same block pool id over
+ * and over, while other block pools stay unscanned.<p/>
+ *
+ * A block pool is always ready to scan if the iterator is not at EOF. If
+ * the iterator is at EOF, the block pool will be ready to scan when
+ * conf.scanPeriodMs milliseconds have elapsed since the iterator was last
+ * rewound.<p/>
+ *
+   * @return 0 if we found a usable block iterator; otherwise, the number
+   * of milliseconds we should delay before checking again.
+ */
+ private synchronized long findNextUsableBlockIter() {
+ int numBlockIters = blockIters.size();
+ if (numBlockIters == 0) {
+ LOG.debug("{}: no block pools are registered.", this);
+ return Long.MAX_VALUE;
+ }
+ int curIdx;
+ if (curBlockIter == null) {
+ curIdx = 0;
+ } else {
+ curIdx = blockIters.indexOf(curBlockIter);
+ Preconditions.checkState(curIdx >= 0);
+ }
+ // Note that this has to be wall-clock time, not monotonic time. This is
+ // because the time saved in the cursor file is a wall-clock time. We do
+ // not want to save a monotonic time in the cursor file, because it resets
+ // every time the machine reboots (on most platforms).
+ long nowMs = Time.now();
+ long minTimeoutMs = Long.MAX_VALUE;
+ for (int i = 0; i < numBlockIters; i++) {
+ int idx = (curIdx + i + 1) % numBlockIters;
+ BlockIterator iter = blockIters.get(idx);
+ if (!iter.atEnd()) {
+ LOG.info("Now scanning bpid {} on volume {}",
+ iter.getBlockPoolId(), volume.getBasePath());
+ curBlockIter = iter;
+ return 0L;
+ }
+ long iterStartMs = iter.getIterStartMs();
+ long waitMs = (iterStartMs + conf.scanPeriodMs) - nowMs;
+ if (waitMs <= 0) {
+ iter.rewind();
+ LOG.info("Now rescanning bpid {} on volume {}, after more than " +
+ "{} hour(s)", iter.getBlockPoolId(), volume.getBasePath(),
+ TimeUnit.HOURS.convert(conf.scanPeriodMs, TimeUnit.MILLISECONDS));
+ curBlockIter = iter;
+ return 0L;
+ }
+ minTimeoutMs = Math.min(minTimeoutMs, waitMs);
+ }
+ LOG.info("{}: no suitable block pools found to scan. Waiting {} ms.",
+ this, minTimeoutMs);
+ return minTimeoutMs;
+ }
+
+ /**
+ * Scan a block.
+ *
+ * @param cblock The block to scan.
+ * @param bytesPerSec The bytes per second to scan at.
+ *
+ * @return The length of the block that was scanned, or
+ * -1 if the block could not be scanned.
+ */
+ private long scanBlock(ExtendedBlock cblock, long bytesPerSec) {
+ // 'cblock' has a valid blockId and block pool id, but we don't yet know the
+ // genstamp the block is supposed to have. Ask the FsDatasetImpl for this
+ // information.
+ ExtendedBlock block = null;
+ try {
+ Block b = volume.getDataset().getStoredBlock(
+ cblock.getBlockPoolId(), cblock.getBlockId());
+ if (b == null) {
+ LOG.info("FileNotFound while finding block {} on volume {}",
+ cblock, volume.getBasePath());
+ } else {
+ block = new ExtendedBlock(cblock.getBlockPoolId(), b);
+ }
+ } catch (FileNotFoundException e) {
+ LOG.info("FileNotFoundException while finding block {} on volume {}",
+ cblock, volume.getBasePath());
+ } catch (IOException e) {
+ LOG.warn("I/O error while finding block {} on volume {}",
+ cblock, volume.getBasePath());
+ }
+ if (block == null) {
+ return -1; // block not found.
+ }
+ BlockSender blockSender = null;
+ try {
+ blockSender = new BlockSender(block, 0, -1,
+ false, true, true, datanode, null,
+ CachingStrategy.newDropBehind());
+ throttler.setBandwidth(bytesPerSec);
+ long bytesRead = blockSender.sendBlock(nullStream, null, throttler);
+ resultHandler.handle(block, null);
+ return bytesRead;
+ } catch (IOException e) {
+ resultHandler.handle(block, e);
+ } finally {
+ IOUtils.cleanup(null, blockSender);
+ }
+ return -1;
+ }
+
+ @VisibleForTesting
+ static boolean calculateShouldScan(long targetBytesPerSec,
+ long scannedBytesSum) {
+ long effectiveBytesPerSec =
+ scannedBytesSum / (SECONDS_PER_MINUTE * MINUTES_PER_HOUR);
+ boolean shouldScan = effectiveBytesPerSec <= targetBytesPerSec;
+ LOG.trace("calculateShouldScan: effectiveBytesPerSec = {}, and " +
+ "targetBytesPerSec = {}. shouldScan = {}",
+ effectiveBytesPerSec, targetBytesPerSec, shouldScan);
+ return shouldScan;
+ }
+
+ /**
+ * Run an iteration of the VolumeScanner loop.
+ *
+ * @return The number of milliseconds to delay before running the loop
+ * again, or 0 to re-run the loop immediately.
+ */
+ private long runLoop() {
+ long bytesScanned = -1;
+ boolean scanError = false;
+ ExtendedBlock block = null;
+ try {
+ long monotonicMs = Time.monotonicNow();
+ expireOldScannedBytesRecords(monotonicMs);
+
+ if (!calculateShouldScan(conf.targetBytesPerSec, scannedBytesSum)) {
+        // We have already scanned above the target rate over the past hour,
+        // so wait 30 seconds for some old scannedBytes records to expire.
+ return 30000L;
+ }
+
+ // Find a usable block pool to scan.
+ if ((curBlockIter == null) || curBlockIter.atEnd()) {
+ long timeout = findNextUsableBlockIter();
+ if (timeout > 0) {
+ LOG.trace("{}: no block pools are ready to scan yet. Waiting " +
+ "{} ms.", this, timeout);
+ synchronized (stats) {
+ stats.nextBlockPoolScanStartMs = Time.monotonicNow() + timeout;
+ }
+ return timeout;
+ }
+ synchronized (stats) {
+ stats.scansSinceRestart++;
+ stats.blocksScannedInCurrentPeriod = 0;
+ stats.nextBlockPoolScanStartMs = -1;
+ }
+ return 0L;
+ }
+
+ try {
+ block = curBlockIter.nextBlock();
+ } catch (IOException e) {
+ // There was an error listing the next block in the volume. This is a
+ // serious issue.
+ LOG.warn("{}: nextBlock error on {}", this, curBlockIter);
+ // On the next loop iteration, curBlockIter#eof will be set to true, and
+ // we will pick a different block iterator.
+ return 0L;
+ }
+ if (block == null) {
+ // The BlockIterator is at EOF.
+ LOG.info("{}: finished scanning block pool {}",
+ this, curBlockIter.getBlockPoolId());
+ saveBlockIterator(curBlockIter);
+ return 0;
+ }
+      // getLastSavedMs() is wall-clock time, so compare it against
+      // Time.now() rather than the monotonic clock.
+      long saveDelta = Time.now() - curBlockIter.getLastSavedMs();
+ if (saveDelta >= conf.cursorSaveMs) {
+ LOG.debug("{}: saving block iterator {} after {} ms.",
+ this, curBlockIter, saveDelta);
+ saveBlockIterator(curBlockIter);
+ }
+ bytesScanned = scanBlock(block, conf.targetBytesPerSec);
+ if (bytesScanned >= 0) {
+ scannedBytesSum += bytesScanned;
+ scannedBytes[(int)(curMinute % MINUTES_PER_HOUR)] += bytesScanned;
+ } else {
+ scanError = true;
+ }
+ return 0L;
+ } finally {
+ synchronized (stats) {
+ stats.bytesScannedInPastHour = scannedBytesSum;
+ if (bytesScanned >= 0) {
+ stats.blocksScannedInCurrentPeriod++;
+ stats.blocksScannedSinceRestart++;
+ }
+ if (scanError) {
+ stats.scanErrorsSinceRestart++;
+ }
+ if (block != null) {
+ stats.lastBlockScanned = block;
+ }
+ if (curBlockIter == null) {
+ stats.eof = true;
+ stats.blockPoolPeriodEndsMs = -1;
+ } else {
+ stats.eof = curBlockIter.atEnd();
+ stats.blockPoolPeriodEndsMs =
+ curBlockIter.getIterStartMs() + conf.scanPeriodMs;
+ }
+ }
+ }
+ }
+
+ @Override
+ public void run() {
+ try {
+ LOG.trace("{}: thread starting.", this);
+ resultHandler.setup(this);
+ try {
+ long timeout = 0;
+ while (true) {
+ // Take the lock to check if we should stop.
+ synchronized (this) {
+ if (stopping) {
+ break;
+ }
+ if (timeout > 0) {
+ wait(timeout);
+ if (stopping) {
+ break;
+ }
+ }
+ }
+ timeout = runLoop();
+ }
+ } catch (InterruptedException e) {
+ // We are exiting because of an InterruptedException,
+ // probably sent by VolumeScanner#shutdown.
+ LOG.trace("{} exiting because of InterruptedException.", this);
+ } catch (Throwable e) {
+ LOG.error("{} exiting because of exception ", this, e);
+ }
+ LOG.info("{} exiting.", this);
+ // Save the current position of all block iterators and close them.
+ for (BlockIterator iter : blockIters) {
+ saveBlockIterator(iter);
+ IOUtils.cleanup(null, iter);
+ }
+ } finally {
+ // When the VolumeScanner exits, release the reference we were holding
+ // on the volume. This will allow the volume to be removed later.
+ IOUtils.cleanup(null, ref);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "VolumeScanner(" + volume.getBasePath() +
+ ", " + volume.getStorageID() + ")";
+ }
+
+ /**
+ * Shut down this scanner.
+ */
+ public synchronized void shutdown() {
+ stopping = true;
+ notify();
+ this.interrupt();
+ }
+
+ /**
+ * Allow the scanner to scan the given block pool.
+ *
+ * @param bpid The block pool id.
+ */
+ public synchronized void enableBlockPoolId(String bpid) {
+ for (BlockIterator iter : blockIters) {
+ if (iter.getBlockPoolId().equals(bpid)) {
+ LOG.warn("{}: already enabled scanning on block pool {}", this, bpid);
+ return;
+ }
+ }
+ BlockIterator iter = null;
+ try {
+ // Load a block iterator for the next block pool on the volume.
+ iter = volume.loadBlockIterator(bpid, BLOCK_ITERATOR_NAME);
+ LOG.trace("{}: loaded block iterator for {}.", this, bpid);
+ } catch (FileNotFoundException e) {
+ LOG.debug("{}: failed to load block iterator: " + e.getMessage(), this);
+ } catch (IOException e) {
+ LOG.warn("{}: failed to load block iterator.", this, e);
+ }
+ if (iter == null) {
+ iter = volume.newBlockIterator(bpid, BLOCK_ITERATOR_NAME);
+ LOG.trace("{}: created new block iterator for {}.", this, bpid);
+ }
+ iter.setMaxStalenessMs(conf.maxStalenessMs);
+ blockIters.add(iter);
+ notify();
+ }
+
+ /**
+ * Disallow the scanner from scanning the given block pool.
+ *
+ * @param bpid The block pool id.
+ */
+ public synchronized void disableBlockPoolId(String bpid) {
+ Iterator<BlockIterator> i = blockIters.iterator();
+ while (i.hasNext()) {
+ BlockIterator iter = i.next();
+ if (iter.getBlockPoolId().equals(bpid)) {
+ LOG.trace("{}: disabling scanning on block pool {}", this, bpid);
+ i.remove();
+ IOUtils.cleanup(null, iter);
+ if (curBlockIter == iter) {
+ curBlockIter = null;
+ }
+ notify();
+ return;
+ }
+ }
+ LOG.warn("{}: can't remove block pool {}, because it was never " +
+ "added.", this, bpid);
+ }
+
+ @VisibleForTesting
+ Statistics getStatistics() {
+ synchronized (stats) {
+ return new Statistics(stats);
+ }
+ }
+}
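
The throttling above is easiest to see in isolation. Below is a minimal,
self-contained sketch of the calculateShouldScan() check together with the
numbers it operates on; the class name and the main() driver are
illustrative, not part of the patch.

    // Sketch of VolumeScanner's hourly rate check. scannedBytesSum is the
    // sum of the 60-slot circular buffer holding bytes scanned per minute.
    public class ShouldScanSketch {
      private static final int SECONDS_PER_MINUTE = 60;
      private static final int MINUTES_PER_HOUR = 60;

      static boolean calculateShouldScan(long targetBytesPerSec,
          long scannedBytesSum) {
        // Average the past hour's bytes over 3600 seconds and compare the
        // result against the configured target rate.
        long effectiveBytesPerSec =
            scannedBytesSum / (SECONDS_PER_MINUTE * MINUTES_PER_HOUR);
        return effectiveBytesPerSec <= targetBytesPerSec;
      }

      public static void main(String[] args) {
        long target = 1048576; // 1 MiB/s, the default in hdfs-default.xml
        // 4 GiB scanned in the past hour is ~1.19 MiB/s: back off.
        System.out.println(calculateShouldScan(target, 4L << 30)); // false
        // 3 GiB scanned in the past hour is ~0.85 MiB/s: keep scanning.
        System.out.println(calculateShouldScan(target, 3L << 30)); // true
      }
    }
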
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8c6a96f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index cc7aec5..c554bc39 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -90,24 +90,30 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
}
}
- /**
- * Create rolling logs.
- *
- * @param prefix the prefix of the log names.
- * @return rolling logs
- */
- public RollingLogs createRollingLogs(String bpid, String prefix
- ) throws IOException;
-
/** @return a list of volumes. */
public List<V> getVolumes();
- /** Add an array of StorageLocation to FsDataset. */
+ /**
+ * Add a new volume to the FsDataset.<p/>
+ *
+ * If the FSDataset supports block scanning, this function registers
+ * the new volume with the block scanner.
+ *
+ * @param location The storage location for the new volume.
+ * @param nsInfos Namespace information for the new volume.
+ */
public void addVolume(
final StorageLocation location,
final List<NamespaceInfo> nsInfos) throws IOException;
- /** Removes a collection of volumes from FsDataset. */
+ /**
+ * Removes a collection of volumes from FsDataset.
+ *
+ * If the FSDataset supports block scanning, this function removes
+ * the volumes from the block scanner.
+ *
+ * @param volumes The storage locations of the volumes to remove.
+ */
public void removeVolumes(Collection<StorageLocation> volumes);
/** @return a storage with the given storage ID */
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8c6a96f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
index d9c37cb..9b28e67 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java
@@ -17,11 +17,13 @@
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset;
+import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
/**
* This is an interface for the underlying volume.
@@ -69,4 +71,112 @@ public interface FsVolumeSpi {
/** Returns true if the volume is NOT backed by persistent storage. */
public boolean isTransientStorage();
+
+ /**
+ * BlockIterator will return ExtendedBlock entries from a block pool in
+ * this volume. The entries will be returned in sorted order.<p/>
+ *
+ * BlockIterator objects themselves do not always have internal
+ * synchronization, so they can only safely be used by a single thread at a
+ * time.<p/>
+ *
+   * Closing the iterator does not save it.  You must call save() to do that.
+ */
+ public interface BlockIterator extends Closeable {
+ /**
+ * Get the next block.<p/>
+ *
+ * Note that this block may be removed in between the time we list it,
+ * and the time the caller tries to use it, or it may represent a stale
+ * entry. Callers should handle the case where the returned block no
+ * longer exists.
+ *
+     * @return The next block, or null if there are no more blocks, or
+     * if a previous error has left the iterator at EOF.
+ *
+ * @throws IOException If there was an error getting the next block in
+ * this volume. In this case, EOF will be set on
+ * the iterator.
+ */
+ public ExtendedBlock nextBlock() throws IOException;
+
+ /**
+ * Returns true if we got to the end of the block pool.
+ */
+ public boolean atEnd();
+
+ /**
+ * Repositions the iterator at the beginning of the block pool.
+ */
+ public void rewind();
+
+ /**
+ * Save this block iterator to the underlying volume.
+ * Any existing saved block iterator with this name will be overwritten.
+ * maxStalenessMs will not be saved.
+ *
+ * @throws IOException If there was an error when saving the block
+ * iterator.
+ */
+ public void save() throws IOException;
+
+ /**
+ * Set the maximum staleness of entries that we will return.<p/>
+ *
+ * A maximum staleness of 0 means we will never return stale entries; a
+ * larger value will allow us to reduce resource consumption in exchange
+ * for returning more potentially stale entries. Even with staleness set
+     * to 0, consumers of this API must handle race conditions where blocks
+     * disappear before they can be processed.
+ */
+ public void setMaxStalenessMs(long maxStalenessMs);
+
+ /**
+ * Get the wall-clock time, measured in milliseconds since the Epoch,
+ * when this iterator was created.
+ */
+ public long getIterStartMs();
+
+ /**
+ * Get the wall-clock time, measured in milliseconds since the Epoch,
+ * when this iterator was last saved. Returns iterStartMs if the
+ * iterator was never saved.
+ */
+ public long getLastSavedMs();
+
+ /**
+ * Get the id of the block pool which this iterator traverses.
+ */
+ public String getBlockPoolId();
+ }
+
+ /**
+ * Create a new block iterator. It will start at the beginning of the
+ * block set.
+ *
+ * @param bpid The block pool id to iterate over.
+ * @param name The name of the block iterator to create.
+ *
+ * @return The new block iterator.
+ */
+ public BlockIterator newBlockIterator(String bpid, String name);
+
+ /**
+ * Load a saved block iterator.
+ *
+ * @param bpid The block pool id to iterate over.
+ * @param name The name of the block iterator to load.
+ *
+ * @return The saved block iterator.
+ * @throws IOException If there was an IO error loading the saved
+ * block iterator.
+ */
+ public BlockIterator loadBlockIterator(String bpid, String name)
+ throws IOException;
+
+ /**
+ * Get the FSDatasetSpi which this volume is a part of.
+ */
+ public FsDatasetSpi getDataset();
}
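
As a rough illustration of how callers are expected to drive this
interface, here is a sketch modeled on what VolumeScanner does; the
walk() helper and the "scanner" iterator name are assumptions for the
example, and the error handling is simplified (loadBlockIterator throws
FileNotFoundException when no saved cursor exists).

    import java.io.IOException;
    import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
    import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
    import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi.BlockIterator;

    public class BlockIteratorWalk {
      // Walk every finalized block of one block pool on one volume.
      static void walk(FsVolumeSpi volume, String bpid) throws IOException {
        BlockIterator iter;
        try {
          // Resume from a previously saved cursor if one exists...
          iter = volume.loadBlockIterator(bpid, "scanner");
        } catch (IOException e) {
          // ...otherwise start at the beginning of the block pool.
          iter = volume.newBlockIterator(bpid, "scanner");
        }
        try {
          ExtendedBlock block;
          while ((block = iter.nextBlock()) != null) {
            // A listed block may already have been deleted; callers must
            // tolerate stale entries.
            System.out.println("visiting " + block);
          }
          // close() does not persist the position; save() must be explicit.
          iter.save();
        } finally {
          iter.close();
        }
      }
    }
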
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8c6a96f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RollingLogs.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RollingLogs.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RollingLogs.java
deleted file mode 100644
index 5d54770..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/RollingLogs.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.datanode.fsdataset;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.Iterator;
-
-/**
- * Rolling logs consist of a current log and a set of previous logs.
- *
- * The implementation should support a single appender and multiple readers.
- */
-public interface RollingLogs {
- /**
- * To iterate the lines of the logs.
- */
- public interface LineIterator extends Iterator<String>, Closeable {
- /** Is the iterator iterating the previous? */
- public boolean isPrevious();
-
- /**
- * Is the last read entry from previous? This should be called after
- * reading.
- */
- public boolean isLastReadFromPrevious();
- }
-
- /**
- * To append text to the logs.
- */
- public interface Appender extends Appendable, Closeable {
- }
-
- /**
- * Create an iterator to iterate the lines in the logs.
- *
- * @param skipPrevious Should it skip reading the previous log?
- * @return a new iterator.
- */
- public LineIterator iterator(boolean skipPrevious) throws IOException;
-
- /**
- * @return the only appender to append text to the logs.
- * The same object is returned if it is invoked multiple times.
- */
- public Appender appender();
-
- /**
- * Roll current to previous.
- *
- * @return true if the rolling succeeded.
- * When it returns false, it is not equivalent to an error.
- * It means that the rolling cannot be performed at the moment,
- * e.g. the logs are being read.
- */
- public boolean roll() throws IOException;
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8c6a96f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index cefb206..082fb9a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -70,7 +70,7 @@ import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
-import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner;
+import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
@@ -92,7 +92,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.RoundRobinVolumeChoosingPolicy;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskReplicaTracker.RamDiskReplica;
@@ -294,7 +293,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
DFSConfigKeys.DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY,
RoundRobinVolumeChoosingPolicy.class,
VolumeChoosingPolicy.class), conf);
- volumes = new FsVolumeList(volsFailed, blockChooserImpl);
+ volumes = new FsVolumeList(volsFailed, datanode.getBlockScanner(),
+ blockChooserImpl);
asyncDiskService = new FsDatasetAsyncDiskService(datanode);
asyncLazyPersistService = new RamDiskAsyncLazyPersistService(datanode);
@@ -326,6 +326,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
// storageMap and asyncDiskService, consistent.
FsVolumeImpl fsVolume = new FsVolumeImpl(
this, sd.getStorageUuid(), dir, this.conf, storageType);
+ FsVolumeReference ref = fsVolume.obtainReference();
ReplicaMap tempVolumeMap = new ReplicaMap(this);
fsVolume.getVolumeMap(tempVolumeMap, ramDiskReplicaTracker);
@@ -336,7 +337,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
DatanodeStorage.State.NORMAL,
storageType));
asyncDiskService.addVolume(sd.getCurrentDir());
- volumes.addVolume(fsVolume);
+ volumes.addVolume(ref);
}
LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
@@ -375,6 +376,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
throw MultipleIOException.createIOException(exceptions);
}
+ final FsVolumeReference ref = fsVolume.obtainReference();
setupAsyncLazyPersistThread(fsVolume);
builder.build();
@@ -385,7 +387,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
DatanodeStorage.State.NORMAL,
storageType));
asyncDiskService.addVolume(sd.getCurrentDir());
- volumes.addVolume(fsVolume);
+ volumes.addVolume(ref);
}
LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
}
@@ -429,9 +431,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
it.remove();
}
}
- // Delete blocks from the block scanner in batch.
- datanode.getBlockScanner().deleteBlocks(bpid,
- blocks.toArray(new Block[blocks.size()]));
}
storageMap.remove(sd.getStorageUuid());
@@ -790,7 +789,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
// Replace the old block if any to reschedule the scanning.
- datanode.getBlockScanner().addBlock(block, false);
return replicaInfo;
}
@@ -2025,10 +2023,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
// Block is in memory and not on the disk
// Remove the block from volumeMap
volumeMap.remove(bpid, blockId);
- final DataBlockScanner blockScanner = datanode.getBlockScanner();
- if (blockScanner != null) {
- blockScanner.deleteBlock(bpid, new Block(blockId));
- }
if (vol.isTransientStorage()) {
ramDiskReplicaTracker.discardReplica(bpid, blockId, true);
}
@@ -2051,12 +2045,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
ReplicaInfo diskBlockInfo = new FinalizedReplica(blockId,
diskFile.length(), diskGS, vol, diskFile.getParentFile());
volumeMap.add(bpid, diskBlockInfo);
- final DataBlockScanner blockScanner = datanode.getBlockScanner();
- if (!vol.isTransientStorage()) {
- if (blockScanner != null) {
- blockScanner.addBlock(new ExtendedBlock(bpid, diskBlockInfo), false);
- }
- } else {
+ if (vol.isTransientStorage()) {
ramDiskReplicaTracker.addReplica(bpid, blockId, (FsVolumeImpl) vol);
}
LOG.warn("Added missing block to memory " + diskBlockInfo);
@@ -2557,23 +2546,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
dataStorage.clearRollingUpgradeMarker(bpid);
}
- @Override
- public RollingLogs createRollingLogs(String bpid, String prefix
- ) throws IOException {
- String dir = null;
- final List<FsVolumeImpl> volumes = getVolumes();
- for (FsVolumeImpl vol : volumes) {
- String bpDir = vol.getPath(bpid);
- if (RollingLogsImpl.isFilePresent(bpDir, prefix)) {
- dir = bpDir;
- break;
- }
- }
- if (dir == null) {
- dir = volumes.get(0).getPath(bpid);
- }
- return new RollingLogsImpl(dir, prefix);
- }
@Override
public void onCompleteLazyPersist(String bpId, long blockId,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8c6a96f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index a2d4f2e..e4877be 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -17,9 +17,18 @@
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+import java.io.BufferedWriter;
import java.io.File;
+import java.io.FileOutputStream;
+import java.io.FilenameFilter;
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
+import java.io.OutputStreamWriter;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.util.Collections;
+import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
@@ -41,15 +50,24 @@ import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.util.CloseableReferenceCount;
+import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.hadoop.util.Time;
+import org.codehaus.jackson.annotate.JsonProperty;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.ObjectReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* The underlying volume used to store replica.
@@ -59,6 +77,9 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
@InterfaceAudience.Private
@VisibleForTesting
public class FsVolumeImpl implements FsVolumeSpi {
+ public static final Logger LOG =
+ LoggerFactory.getLogger(FsVolumeImpl.class);
+
private final FsDatasetImpl dataset;
private final String storageID;
private final StorageType storageType;
@@ -395,6 +416,332 @@ public class FsVolumeImpl implements FsVolumeSpi {
}
}
+ private enum SubdirFilter implements FilenameFilter {
+ INSTANCE;
+
+ @Override
+ public boolean accept(File dir, String name) {
+ return name.startsWith("subdir");
+ }
+ }
+
+ private enum BlockFileFilter implements FilenameFilter {
+ INSTANCE;
+
+ @Override
+ public boolean accept(File dir, String name) {
+ return !name.endsWith(".meta") && name.startsWith("blk_");
+ }
+ }
+
+ @VisibleForTesting
+ public static String nextSorted(List<String> arr, String prev) {
+ int res = 0;
+ if (prev != null) {
+ res = Collections.binarySearch(arr, prev);
+ if (res < 0) {
+ res = -1 - res;
+ } else {
+ res++;
+ }
+ }
+ if (res >= arr.size()) {
+ return null;
+ }
+ return arr.get(res);
+ }
+
+ private static class BlockIteratorState {
+ BlockIteratorState() {
+ lastSavedMs = iterStartMs = Time.now();
+ curFinalizedDir = null;
+ curFinalizedSubDir = null;
+ curEntry = null;
+ atEnd = false;
+ }
+
+ // The wall-clock ms since the epoch at which this iterator was last saved.
+ @JsonProperty
+ private long lastSavedMs;
+
+ // The wall-clock ms since the epoch at which this iterator was created.
+ @JsonProperty
+ private long iterStartMs;
+
+ @JsonProperty
+ private String curFinalizedDir;
+
+ @JsonProperty
+ private String curFinalizedSubDir;
+
+ @JsonProperty
+ private String curEntry;
+
+ @JsonProperty
+ private boolean atEnd;
+ }
+
+ /**
+ * A BlockIterator implementation for FsVolumeImpl.
+ */
+ private class BlockIteratorImpl implements FsVolumeSpi.BlockIterator {
+ private final File bpidDir;
+ private final String name;
+ private final String bpid;
+ private long maxStalenessMs = 0;
+
+ private List<String> cache;
+ private long cacheMs;
+
+ private BlockIteratorState state;
+
+ BlockIteratorImpl(String bpid, String name) {
+ this.bpidDir = new File(currentDir, bpid);
+ this.name = name;
+ this.bpid = bpid;
+ rewind();
+ }
+
+ /**
+ * Get the next subdirectory within the block pool slice.
+ *
+ * @return The next subdirectory within the block pool slice, or
+ * null if there are no more.
+ */
+ private String getNextSubDir(String prev, File dir)
+ throws IOException {
+ List<String> children =
+ IOUtils.listDirectory(dir, SubdirFilter.INSTANCE);
+ cache = null;
+ cacheMs = 0;
+ if (children.size() == 0) {
+ LOG.trace("getNextSubDir({}, {}): no subdirectories found in {}",
+ storageID, bpid, dir.getAbsolutePath());
+ return null;
+ }
+ Collections.sort(children);
+ String nextSubDir = nextSorted(children, prev);
+ if (nextSubDir == null) {
+ LOG.trace("getNextSubDir({}, {}): no more subdirectories found in {}",
+ storageID, bpid, dir.getAbsolutePath());
+ } else {
+ LOG.trace("getNextSubDir({}, {}): picking next subdirectory {} " +
+ "within {}", storageID, bpid, nextSubDir, dir.getAbsolutePath());
+ }
+ return nextSubDir;
+ }
+
+ private String getNextFinalizedDir() throws IOException {
+ File dir = Paths.get(
+ bpidDir.getAbsolutePath(), "current", "finalized").toFile();
+ return getNextSubDir(state.curFinalizedDir, dir);
+ }
+
+ private String getNextFinalizedSubDir() throws IOException {
+ if (state.curFinalizedDir == null) {
+ return null;
+ }
+ File dir = Paths.get(
+ bpidDir.getAbsolutePath(), "current", "finalized",
+ state.curFinalizedDir).toFile();
+ return getNextSubDir(state.curFinalizedSubDir, dir);
+ }
+
+ private List<String> getSubdirEntries() throws IOException {
+ if (state.curFinalizedSubDir == null) {
+ return null; // There are no entries in the null subdir.
+ }
+ long now = Time.monotonicNow();
+ if (cache != null) {
+ long delta = now - cacheMs;
+ if (delta < maxStalenessMs) {
+ return cache;
+ } else {
+ LOG.trace("getSubdirEntries({}, {}): purging entries cache for {} " +
+ "after {} ms.", storageID, bpid, state.curFinalizedSubDir, delta);
+ cache = null;
+ }
+ }
+ File dir = Paths.get(bpidDir.getAbsolutePath(), "current", "finalized",
+ state.curFinalizedDir, state.curFinalizedSubDir).toFile();
+ List<String> entries =
+ IOUtils.listDirectory(dir, BlockFileFilter.INSTANCE);
+ if (entries.size() == 0) {
+ entries = null;
+ } else {
+ Collections.sort(entries);
+ }
+ if (entries == null) {
+ LOG.trace("getSubdirEntries({}, {}): no entries found in {}",
+ storageID, bpid, dir.getAbsolutePath());
+ } else {
+ LOG.trace("getSubdirEntries({}, {}): listed {} entries in {}",
+ storageID, bpid, entries.size(), dir.getAbsolutePath());
+ }
+ cache = entries;
+ cacheMs = now;
+ return cache;
+ }
+
+ /**
+ * Get the next block.<p/>
+ *
+ * Each volume has a hierarchical structure.<p/>
+ *
+ * <code>
+ * BPID B0
+ * finalized/
+ * subdir0
+ * subdir0
+ * blk_000
+ * blk_001
+ * ...
+ * subdir1
+ * subdir0
+ * ...
+ * rbw/
+ * </code>
+ *
+ * When we run out of entries at one level of the structure, we search
+ * progressively higher levels. For example, when we run out of blk_
+ * entries in a subdirectory, we search for the next subdirectory.
+ * And so on.
+ */
+ @Override
+ public ExtendedBlock nextBlock() throws IOException {
+ if (state.atEnd) {
+ return null;
+ }
+ try {
+ while (true) {
+ List<String> entries = getSubdirEntries();
+ if (entries != null) {
+ state.curEntry = nextSorted(entries, state.curEntry);
+ if (state.curEntry == null) {
+ LOG.trace("nextBlock({}, {}): advancing from {} to next " +
+ "subdirectory.", storageID, bpid, state.curFinalizedSubDir);
+ } else {
+ ExtendedBlock block =
+ new ExtendedBlock(bpid, Block.filename2id(state.curEntry));
+ LOG.trace("nextBlock({}, {}): advancing to {}",
+ storageID, bpid, block);
+ return block;
+ }
+ }
+ state.curFinalizedSubDir = getNextFinalizedSubDir();
+ if (state.curFinalizedSubDir == null) {
+ state.curFinalizedDir = getNextFinalizedDir();
+ if (state.curFinalizedDir == null) {
+ state.atEnd = true;
+ return null;
+ }
+ }
+ }
+ } catch (IOException e) {
+ state.atEnd = true;
+ LOG.error("nextBlock({}, {}): I/O error", storageID, bpid, e);
+ throw e;
+ }
+ }
+
+ @Override
+ public boolean atEnd() {
+ return state.atEnd;
+ }
+
+ @Override
+ public void rewind() {
+ cache = null;
+ cacheMs = 0;
+ state = new BlockIteratorState();
+ }
+
+ @Override
+ public void save() throws IOException {
+ state.lastSavedMs = Time.now();
+ boolean success = false;
+ ObjectMapper mapper = new ObjectMapper();
+ try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(
+ new FileOutputStream(getTempSaveFile(), false), "UTF-8"))) {
+ mapper.writerWithDefaultPrettyPrinter().writeValue(writer, state);
+ success = true;
+ } finally {
+ if (!success) {
+          // Clean up the partially-written temporary file on failure.
+          if (!getTempSaveFile().delete()) {
+ LOG.debug("save({}, {}): error deleting temporary file.",
+ storageID, bpid);
+ }
+ }
+ }
+ Files.move(getTempSaveFile().toPath(), getSaveFile().toPath(),
+ StandardCopyOption.ATOMIC_MOVE);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("save({}, {}): saved {}", storageID, bpid,
+ mapper.writerWithDefaultPrettyPrinter().writeValueAsString(state));
+ }
+ }
+
+ public void load() throws IOException {
+ ObjectMapper mapper = new ObjectMapper();
+ File file = getSaveFile();
+ this.state = mapper.reader(BlockIteratorState.class).readValue(file);
+ LOG.trace("load({}, {}): loaded iterator {} from {}: {}", storageID,
+ bpid, name, file.getAbsoluteFile(),
+ mapper.writerWithDefaultPrettyPrinter().writeValueAsString(state));
+ }
+
+ File getSaveFile() {
+ return new File(bpidDir, name + ".cursor");
+ }
+
+ File getTempSaveFile() {
+ return new File(bpidDir, name + ".cursor.tmp");
+ }
+
+ @Override
+ public void setMaxStalenessMs(long maxStalenessMs) {
+ this.maxStalenessMs = maxStalenessMs;
+ }
+
+ @Override
+ public void close() throws IOException {
+ // No action needed for this volume implementation.
+ }
+
+ @Override
+ public long getIterStartMs() {
+ return state.iterStartMs;
+ }
+
+ @Override
+ public long getLastSavedMs() {
+ return state.lastSavedMs;
+ }
+
+ @Override
+ public String getBlockPoolId() {
+ return bpid;
+ }
+ }
+
+ @Override
+ public BlockIterator newBlockIterator(String bpid, String name) {
+ return new BlockIteratorImpl(bpid, name);
+ }
+
+ @Override
+ public BlockIterator loadBlockIterator(String bpid, String name)
+ throws IOException {
+ BlockIteratorImpl iter = new BlockIteratorImpl(bpid, name);
+ iter.load();
+ return iter;
+ }
+
+ @Override
+ public FsDatasetSpi getDataset() {
+ return dataset;
+ }
+
/**
* RBW files. They get moved to the finalized block directory when
* the block is finalized.
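
The save() method above follows the classic write-temp-then-rename recipe,
so a crash can never leave a truncated cursor file behind. A standalone
sketch of the same pattern, with illustrative file names and payload:

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.nio.file.StandardCopyOption;

    public class AtomicSave {
      public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("cursor-demo");
        Path tmp = dir.resolve("scanner.cursor.tmp");
        Path cursor = dir.resolve("scanner.cursor");
        boolean success = false;
        try {
          // Write the whole new state to a temporary file first.
          Files.write(tmp,
              "{\"curEntry\":\"blk_1\"}".getBytes(StandardCharsets.UTF_8));
          success = true;
        } finally {
          if (!success) {
            Files.deleteIfExists(tmp); // leave no partial state behind
          }
        }
        // The rename either fully replaces the old cursor or fails; there is
        // no window in which readers can observe a half-written cursor.
        Files.move(tmp, cursor, StandardCopyOption.ATOMIC_MOVE);
        System.out.println(
            new String(Files.readAllBytes(cursor), StandardCharsets.UTF_8));
      }
    }
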
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8c6a96f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
index c837593..ae2f5b4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
+import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.Time;
@@ -42,11 +43,13 @@ class FsVolumeList {
private Object checkDirsMutex = new Object();
private final VolumeChoosingPolicy<FsVolumeImpl> blockChooser;
+ private final BlockScanner blockScanner;
private volatile int numFailedVolumes;
- FsVolumeList(int failedVols,
+ FsVolumeList(int failedVols, BlockScanner blockScanner,
VolumeChoosingPolicy<FsVolumeImpl> blockChooser) {
this.blockChooser = blockChooser;
+ this.blockScanner = blockScanner;
this.numFailedVolumes = failedVols;
}
@@ -260,13 +263,14 @@ class FsVolumeList {
/**
* Dynamically add new volumes to the existing volumes that this DN manages.
- * @param newVolume the instance of new FsVolumeImpl.
+ *
+ * @param ref a reference to the new FsVolumeImpl instance.
*/
- void addVolume(FsVolumeImpl newVolume) {
+ void addVolume(FsVolumeReference ref) {
while (true) {
final FsVolumeImpl[] curVolumes = volumes.get();
final List<FsVolumeImpl> volumeList = Lists.newArrayList(curVolumes);
- volumeList.add(newVolume);
+ volumeList.add((FsVolumeImpl)ref.getVolume());
if (volumes.compareAndSet(curVolumes,
volumeList.toArray(new FsVolumeImpl[volumeList.size()]))) {
break;
@@ -274,12 +278,15 @@ class FsVolumeList {
if (FsDatasetImpl.LOG.isDebugEnabled()) {
FsDatasetImpl.LOG.debug(
"The volume list has been changed concurrently, " +
- "retry to remove volume: " + newVolume);
+            "retry to add volume: " + ref.getVolume().getStorageID());
}
}
}
-
- FsDatasetImpl.LOG.info("Added new volume: " + newVolume.toString());
+ if (blockScanner != null) {
+ blockScanner.addVolumeScanner(ref);
+ }
+ FsDatasetImpl.LOG.info("Added new volume: " +
+ ref.getVolume().getStorageID());
}
/**
@@ -293,6 +300,9 @@ class FsVolumeList {
if (volumeList.remove(target)) {
if (volumes.compareAndSet(curVolumes,
volumeList.toArray(new FsVolumeImpl[volumeList.size()]))) {
+ if (blockScanner != null) {
+ blockScanner.removeVolumeScanner(target);
+ }
try {
target.closeAndWait();
} catch (IOException e) {
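
FsVolumeList keeps its volumes in a volatile array and updates it with a
copy-on-write compare-and-set loop, so readers never take a lock. A minimal
sketch of that update pattern, with String standing in for FsVolumeImpl and
the class itself purely illustrative:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicReference;

    public class CopyOnWriteAdd {
      private final AtomicReference<String[]> volumes =
          new AtomicReference<>(new String[0]);

      void addVolume(String newVolume) {
        while (true) {
          final String[] cur = volumes.get();
          final List<String> list = new ArrayList<>(Arrays.asList(cur));
          list.add(newVolume);
          // Publish the new array only if no other thread changed the list
          // since we took the snapshot; otherwise loop and retry.
          if (volumes.compareAndSet(cur, list.toArray(new String[0]))) {
            return;
          }
        }
      }

      public static void main(String[] args) {
        CopyOnWriteAdd volumeList = new CopyOnWriteAdd();
        volumeList.addVolume("/data1");
        volumeList.addVolume("/data2");
        System.out.println(Arrays.toString(volumeList.volumes.get()));
      }
    }
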
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8c6a96f/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RollingLogsImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RollingLogsImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RollingLogsImpl.java
deleted file mode 100644
index 121127d..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/RollingLogsImpl.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStreamReader;
-import java.io.OutputStreamWriter;
-import java.io.PrintWriter;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner;
-import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;
-
-import com.google.common.base.Charsets;
-
-class RollingLogsImpl implements RollingLogs {
- private static final String CURR_SUFFIX = ".curr";
- private static final String PREV_SUFFIX = ".prev";
-
- static boolean isFilePresent(String dir, String filePrefix) {
- return new File(dir, filePrefix + CURR_SUFFIX).exists() ||
- new File(dir, filePrefix + PREV_SUFFIX).exists();
- }
-
- private final File curr;
- private final File prev;
- private PrintWriter out; //require synchronized access
-
- private final Appender appender = new Appender() {
- @Override
- public Appendable append(CharSequence csq) {
- synchronized(RollingLogsImpl.this) {
- if (out == null) {
- throw new IllegalStateException(RollingLogsImpl.this
- + " is not yet opened.");
- }
- out.print(csq);
- out.flush();
- }
- return this;
- }
-
- @Override
- public Appendable append(char c) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public Appendable append(CharSequence csq, int start, int end) {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void close() {
- synchronized(RollingLogsImpl.this) {
- if (out != null) {
- out.close();
- out = null;
- }
- }
- }
- };
-
-
- private final AtomicInteger numReaders = new AtomicInteger();
-
- RollingLogsImpl(String dir, String filePrefix) throws FileNotFoundException{
- curr = new File(dir, filePrefix + CURR_SUFFIX);
- prev = new File(dir, filePrefix + PREV_SUFFIX);
- out = new PrintWriter(new OutputStreamWriter(new FileOutputStream(
- curr, true), Charsets.UTF_8));
- }
-
- @Override
- public Reader iterator(boolean skipPrevFile) throws IOException {
- numReaders.incrementAndGet();
- return new Reader(skipPrevFile);
- }
-
- @Override
- public Appender appender() {
- return appender;
- }
-
- @Override
- public boolean roll() throws IOException {
- if (numReaders.get() > 0) {
- return false;
- }
- if (!prev.delete() && prev.exists()) {
- throw new IOException("Failed to delete " + prev);
- }
-
- synchronized(this) {
- appender.close();
- final boolean renamed = curr.renameTo(prev);
- out = new PrintWriter(new OutputStreamWriter(new FileOutputStream(
- curr, true), Charsets.UTF_8));
- if (!renamed) {
- throw new IOException("Failed to rename " + curr + " to " + prev);
- }
- }
- return true;
- }
-
- @Override
- public String toString() {
- return curr.toString();
- }
-
- /**
- * This is used to read the lines in order.
- * If the data is not read completely (i.e, untill hasNext() returns
- * false), it needs to be explicitly
- */
- private class Reader implements RollingLogs.LineIterator {
- private File file;
- private File lastReadFile;
- private BufferedReader reader;
- private String line;
- private boolean closed = false;
-
- private Reader(boolean skipPrevFile) throws IOException {
- reader = null;
- file = skipPrevFile? curr : prev;
- readNext();
- }
-
- @Override
- public boolean isPrevious() {
- return file == prev;
- }
-
- @Override
- public boolean isLastReadFromPrevious() {
- return lastReadFile == prev;
- }
-
- private boolean openFile() throws IOException {
-
- for(int i=0; i<2; i++) {
- if (reader != null || i > 0) {
- // move to next file
- file = isPrevious()? curr : null;
- }
- if (file == null) {
- return false;
- }
- if (file.exists()) {
- break;
- }
- }
-
- if (reader != null ) {
- reader.close();
- reader = null;
- }
-
- reader = new BufferedReader(new InputStreamReader(new FileInputStream(
- file), Charsets.UTF_8));
- return true;
- }
-
- // read next line if possible.
- private void readNext() throws IOException {
- line = null;
- try {
- if (reader != null && (line = reader.readLine()) != null) {
- return;
- }
- // move to the next file.
- if (openFile()) {
- readNext();
- }
- } finally {
- if (!hasNext()) {
- close();
- }
- }
- }
-
- @Override
- public boolean hasNext() {
- return line != null;
- }
-
- @Override
- public String next() {
- String curLine = line;
- try {
- lastReadFile = file;
- readNext();
- } catch (IOException e) {
- DataBlockScanner.LOG.warn("Failed to read next line.", e);
- }
- return curLine;
- }
-
- @Override
- public void remove() {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public void close() throws IOException {
- if (!closed) {
- try {
- if (reader != null) {
- reader.close();
- }
- } finally {
- file = null;
- reader = null;
- closed = true;
- final int n = numReaders.decrementAndGet();
- assert(n >= 0);
- }
- }
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8c6a96f/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index bfaa33b..bb28f01 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -987,6 +987,26 @@
</property>
<property>
+ <name>dfs.datanode.scan.period.hours</name>
+ <value>0</value>
+ <description>
+ If this is 0 or negative, the DataNode's block scanner will be
+ disabled. If this is positive, the DataNode will not scan any
+ individual block more than once in the specified scan period.
+ </description>
+</property>
+
+<property>
+ <name>dfs.block.scanner.volume.bytes.per.second</name>
+ <value>1048576</value>
+ <description>
+ If this is 0, the DataNode's block scanner will be disabled. If this
+ is positive, this is the number of bytes per second that the DataNode's
+ block scanner will try to scan from each volume.
+ </description>
+</property>
+
+<property>
<name>dfs.datanode.readahead.bytes</name>
<value>4193404</value>
<description>
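
Taken together, the two new keys decide whether scanning runs at all and how
hard it may hit each disk. A site that wanted, say, a weekly rescan throttled
to 4 MiB/s per volume would override the defaults in hdfs-site.xml along
these lines (the values are illustrative, not recommendations):

    <property>
      <name>dfs.datanode.scan.period.hours</name>
      <!-- rescan each block at most once per week -->
      <value>168</value>
    </property>

    <property>
      <name>dfs.block.scanner.volume.bytes.per.second</name>
      <!-- cap each volume scanner at 4 MiB/s -->
      <value>4194304</value>
    </property>
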
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a8c6a96f/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
index a14c84c..086f9ff 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
@@ -1697,4 +1697,20 @@ public class DFSTestUtil {
GenericTestUtils.setLogLevel(NameNode.stateChangeLog, level);
GenericTestUtils.setLogLevel(NameNode.blockStateChangeLog, level);
}
+
+  /**
+   * Change the length of the replica of a block on the datanode at dnIndex.
+   *
+   * @return true if the replica file was found and resized.
+   */
+ public static boolean changeReplicaLength(MiniDFSCluster cluster,
+ ExtendedBlock blk, int dnIndex, int lenDelta) throws IOException {
+ File blockFile = cluster.getBlockFile(dnIndex, blk);
+ if (blockFile != null && blockFile.exists()) {
+ RandomAccessFile raFile = new RandomAccessFile(blockFile, "rw");
+ raFile.setLength(raFile.length()+lenDelta);
+ raFile.close();
+ return true;
+ }
+ LOG.info("failed to change length of block " + blk);
+ return false;
+ }
}
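
A hypothetical JUnit fragment showing how changeReplicaLength() might be
used: grow a replica on datanode 0 by one byte so its on-disk length no
longer matches its metadata, which a subsequent scan should flag. The
MiniDFSCluster setup and the usual test imports are assumed rather than
shown in the patch.

    @Test
    public void testReplicaLengthMismatchIsDetectable() throws Exception {
      Configuration conf = new HdfsConfiguration();
      MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
      try {
        DistributedFileSystem fs = cluster.getFileSystem();
        Path path = new Path("/test/file");
        DFSTestUtil.createFile(fs, path, 1024, (short) 1, 0xBEEF);
        ExtendedBlock blk = DFSTestUtil.getFirstBlock(fs, path);
        // Grow the on-disk replica by one byte.
        assertTrue(DFSTestUtil.changeReplicaLength(cluster, blk, 0, 1));
      } finally {
        cluster.shutdown();
      }
    }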