Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2015/03/27 07:34:40 UTC
[38/50] [abbrv] hadoop git commit: HDFS-7928. Scanning blocks from disk during rolling upgrade startup takes a lot of time if disks are busy. Contributed by Rushabh Shah.
HDFS-7928. Scanning blocks from disk during rolling upgrade startup takes a lot of time if disks are busy. Contributed by Rushabh Shah.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/19bc19e8
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/19bc19e8
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/19bc19e8
Branch: refs/heads/YARN-2928
Commit: 19bc19e82b31403eccdd9232fba3b94df01eebdc
Parents: fbc4853
Author: Kihwal Lee <ki...@apache.org>
Authored: Wed Mar 25 14:42:28 2015 -0500
Committer: Zhijie Shen <zj...@apache.org>
Committed: Thu Mar 26 23:29:48 2015 -0700
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../hadoop/hdfs/protocol/BlockListAsLongs.java | 37 +++
.../hadoop/hdfs/server/datanode/DataNode.java | 7 +-
.../datanode/fsdataset/impl/BlockPoolSlice.java | 268 ++++++++++++++-----
.../datanode/fsdataset/impl/FsDatasetImpl.java | 3 +-
.../datanode/fsdataset/impl/FsVolumeImpl.java | 8 +-
.../datanode/fsdataset/impl/FsVolumeList.java | 7 +-
.../apache/hadoop/hdfs/UpgradeUtilities.java | 5 +-
.../hdfs/server/datanode/DataNodeTestUtils.java | 7 +
.../fsdataset/impl/TestWriteToReplica.java | 152 +++++++++++
.../namenode/TestListCorruptFileBlocks.java | 6 +
.../namenode/TestProcessCorruptBlocks.java | 3 +
.../ha/TestPendingCorruptDnMessages.java | 3 +
13 files changed, 430 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
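In short, this change lets a DataNode skip the full on-disk block scan after a quick restart: each BlockPoolSlice persists its replica list to a small "replicas" cache file at shutdown and reads it back at startup, falling back to the directory scan whenever the cache is missing, unreadable, or older than five minutes. A minimal, self-contained sketch of that freshness rule (class and method names here are illustrative only; the file name and expiry match BlockPoolSlice below):

    import java.io.File;

    class ReplicaCacheCheck {
      // Matches BlockPoolSlice's replicaCacheExpiry of 5 minutes.
      private static final long REPLICA_CACHE_EXPIRY_MS = 5 * 60 * 1000;

      // True only if the "replicas" cache file exists and is fresh enough
      // to trust instead of rescanning the finalized/rbw directories.
      static boolean cacheIsUsable(File currentDir) {
        File replicaFile = new File(currentDir, "replicas");
        return replicaFile.exists()
            && System.currentTimeMillis()
               <= replicaFile.lastModified() + REPLICA_CACHE_EXPIRY_MS;
      }
    }
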
http://git-wip-us.apache.org/repos/asf/hadoop/blob/19bc19e8/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 8f1d5fc..62c2f91 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -339,6 +339,9 @@ Release 2.8.0 - UNRELEASED
HDFS-7713. Implement mkdirs in the HDFS Web UI. (Ravi Prakash via wheat9)
+ HDFS-7928. Scanning blocks from disk during rolling upgrade startup takes
+ a lot of time if disks are busy (Rushabh S Shah via kihwal)
+
OPTIMIZATIONS
BUG FIXES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/19bc19e8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java
index 1c89ee4..834546b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/BlockListAsLongs.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.hdfs.protocol;
import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -33,6 +35,7 @@ import com.google.common.base.Preconditions;
import com.google.protobuf.ByteString;
import com.google.protobuf.CodedInputStream;
import com.google.protobuf.CodedOutputStream;
+import com.google.protobuf.WireFormat;
@InterfaceAudience.Private
@InterfaceStability.Evolving
@@ -108,6 +111,40 @@ public abstract class BlockListAsLongs implements Iterable<BlockReportReplica> {
return builder.build();
}
+ public static BlockListAsLongs readFrom(InputStream is) throws IOException {
+ CodedInputStream cis = CodedInputStream.newInstance(is);
+ int numBlocks = -1;
+ ByteString blocksBuf = null;
+ while (!cis.isAtEnd()) {
+ int tag = cis.readTag();
+ int field = WireFormat.getTagFieldNumber(tag);
+ switch(field) {
+ case 0:
+ break;
+ case 1:
+ numBlocks = (int)cis.readInt32();
+ break;
+ case 2:
+ blocksBuf = cis.readBytes();
+ break;
+ default:
+ cis.skipField(tag);
+ break;
+ }
+ }
+ if (numBlocks != -1 && blocksBuf != null) {
+ return decodeBuffer(numBlocks, blocksBuf);
+ }
+ return null;
+ }
+
+ public void writeTo(OutputStream os) throws IOException {
+ CodedOutputStream cos = CodedOutputStream.newInstance(os);
+ cos.writeInt32(1, getNumberOfBlocks());
+ cos.writeBytes(2, getBlocksBuffer());
+ cos.flush();
+ }
+
public static Builder builder() {
return new BlockListAsLongs.Builder();
}
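
The readFrom/writeTo pair added above serializes a block report as a tiny protobuf-style message: field 1 is the block count (int32) and field 2 is the encoded blocks buffer (bytes); readFrom returns null if either field is absent, which callers treat as a cache miss. A round-trip sketch using only these two methods (how the report is obtained and where the file lives are assumptions for illustration):

    import java.io.*;
    import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;

    class BlockReportRoundTrip {
      // Persist a previously obtained report, then read it back.
      static BlockListAsLongs roundTrip(BlockListAsLongs report, File cacheFile)
          throws IOException {
        try (OutputStream out = new FileOutputStream(cacheFile)) {
          report.writeTo(out);                  // writes fields 1 and 2
        }
        try (InputStream in = new FileInputStream(cacheFile)) {
          return BlockListAsLongs.readFrom(in); // null if a field is missing
        }
      }
    }
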
http://git-wip-us.apache.org/repos/asf/hadoop/blob/19bc19e8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index d94375e..3368124 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -41,8 +41,6 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMOR
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY;
@@ -159,6 +157,7 @@ import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.apache.hadoop.hdfs.server.datanode.web.DatanodeHttpServer;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
@@ -2500,6 +2499,10 @@ public class DataNode extends ReconfigurableBase
return blockScanner;
}
+ @VisibleForTesting
+ DirectoryScanner getDirectoryScanner() {
+ return directoryScanner;
+ }
public static void secureMain(String args[], SecureResources resources) {
int errorCode = 0;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/19bc19e8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
index 5a69e1e..6daf039 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/BlockPoolSlice.java
@@ -23,12 +23,12 @@ import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
-import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStreamWriter;
import java.io.RandomAccessFile;
import java.io.Writer;
+import java.util.Iterator;
import java.util.Scanner;
import org.apache.commons.io.FileUtils;
@@ -39,6 +39,8 @@ import org.apache.hadoop.fs.DU;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportReplica;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
@@ -55,6 +57,7 @@ import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.Time;
+import com.google.common.io.Files;
/**
* A block pool slice represents a portion of a block pool stored on a volume.
* Taken together, all BlockPoolSlices sharing a block pool ID across a
@@ -77,7 +80,9 @@ class BlockPoolSlice {
private volatile boolean dfsUsedSaved = false;
private static final int SHUTDOWN_HOOK_PRIORITY = 30;
private final boolean deleteDuplicateReplicas;
-
+ private static final String REPLICA_CACHE_FILE = "replicas";
+ private final long replicaCacheExpiry = 5*60*1000;
+
// TODO:FEDERATION scalability issue - a thread per DU is needed
private final DU dfsUsage;
@@ -310,11 +315,14 @@ class BlockPoolSlice {
FsDatasetImpl.LOG.info(
"Recovered " + numRecovered + " replicas from " + lazypersistDir);
}
-
- // add finalized replicas
- addToReplicasMap(volumeMap, finalizedDir, lazyWriteReplicaMap, true);
- // add rbw replicas
- addToReplicasMap(volumeMap, rbwDir, lazyWriteReplicaMap, false);
+
+ boolean success = readReplicasFromCache(volumeMap, lazyWriteReplicaMap);
+ if (!success) {
+ // add finalized replicas
+ addToReplicasMap(volumeMap, finalizedDir, lazyWriteReplicaMap, true);
+ // add rbw replicas
+ addToReplicasMap(volumeMap, rbwDir, lazyWriteReplicaMap, false);
+ }
}
/**
@@ -401,6 +409,75 @@ class BlockPoolSlice {
FileUtil.fullyDelete(source);
return numRecovered;
}
+
+ private void addReplicaToReplicasMap(Block block, ReplicaMap volumeMap,
+ final RamDiskReplicaTracker lazyWriteReplicaMap,boolean isFinalized)
+ throws IOException {
+ ReplicaInfo newReplica = null;
+ long blockId = block.getBlockId();
+ long genStamp = block.getGenerationStamp();
+ if (isFinalized) {
+ newReplica = new FinalizedReplica(blockId,
+ block.getNumBytes(), genStamp, volume, DatanodeUtil
+ .idToBlockDir(finalizedDir, blockId));
+ } else {
+ File file = new File(rbwDir, block.getBlockName());
+ boolean loadRwr = true;
+ File restartMeta = new File(file.getParent() +
+ File.pathSeparator + "." + file.getName() + ".restart");
+ Scanner sc = null;
+ try {
+ sc = new Scanner(restartMeta, "UTF-8");
+ // The restart meta file exists
+ if (sc.hasNextLong() && (sc.nextLong() > Time.now())) {
+ // It didn't expire. Load the replica as a RBW.
+ // We don't know the expected block length, so just use 0
+ // and don't reserve any more space for writes.
+ newReplica = new ReplicaBeingWritten(blockId,
+ validateIntegrityAndSetLength(file, genStamp),
+ genStamp, volume, file.getParentFile(), null, 0);
+ loadRwr = false;
+ }
+ sc.close();
+ if (!restartMeta.delete()) {
+ FsDatasetImpl.LOG.warn("Failed to delete restart meta file: " +
+ restartMeta.getPath());
+ }
+ } catch (FileNotFoundException fnfe) {
+ // nothing to do hereFile dir =
+ } finally {
+ if (sc != null) {
+ sc.close();
+ }
+ }
+ // Restart meta doesn't exist or expired.
+ if (loadRwr) {
+ newReplica = new ReplicaWaitingToBeRecovered(blockId,
+ validateIntegrityAndSetLength(file, genStamp),
+ genStamp, volume, file.getParentFile());
+ }
+ }
+
+ ReplicaInfo oldReplica = volumeMap.get(bpid, newReplica.getBlockId());
+ if (oldReplica == null) {
+ volumeMap.add(bpid, newReplica);
+ } else {
+ // We have multiple replicas of the same block so decide which one
+ // to keep.
+ newReplica = resolveDuplicateReplicas(newReplica, oldReplica, volumeMap);
+ }
+
+ // If we are retaining a replica on transient storage make sure
+ // it is in the lazyWriteReplicaMap so it can be persisted
+ // eventually.
+ if (newReplica.getVolume().isTransientStorage()) {
+ lazyWriteReplicaMap.addReplica(bpid, blockId,
+ (FsVolumeImpl) newReplica.getVolume());
+ } else {
+ lazyWriteReplicaMap.discardReplica(bpid, blockId, false);
+ }
+ }
+
/**
* Add replicas under the given directory to the volume map
@@ -434,66 +511,9 @@ class BlockPoolSlice {
long genStamp = FsDatasetUtil.getGenerationStampFromFile(
files, file);
long blockId = Block.filename2id(file.getName());
- ReplicaInfo newReplica = null;
- if (isFinalized) {
- newReplica = new FinalizedReplica(blockId,
- file.length(), genStamp, volume, file.getParentFile());
- } else {
-
- boolean loadRwr = true;
- File restartMeta = new File(file.getParent() +
- File.pathSeparator + "." + file.getName() + ".restart");
- Scanner sc = null;
- try {
- sc = new Scanner(restartMeta, "UTF-8");
- // The restart meta file exists
- if (sc.hasNextLong() && (sc.nextLong() > Time.now())) {
- // It didn't expire. Load the replica as a RBW.
- // We don't know the expected block length, so just use 0
- // and don't reserve any more space for writes.
- newReplica = new ReplicaBeingWritten(blockId,
- validateIntegrityAndSetLength(file, genStamp),
- genStamp, volume, file.getParentFile(), null, 0);
- loadRwr = false;
- }
- sc.close();
- if (!restartMeta.delete()) {
- FsDatasetImpl.LOG.warn("Failed to delete restart meta file: " +
- restartMeta.getPath());
- }
- } catch (FileNotFoundException fnfe) {
- // nothing to do hereFile dir =
- } finally {
- if (sc != null) {
- sc.close();
- }
- }
- // Restart meta doesn't exist or expired.
- if (loadRwr) {
- newReplica = new ReplicaWaitingToBeRecovered(blockId,
- validateIntegrityAndSetLength(file, genStamp),
- genStamp, volume, file.getParentFile());
- }
- }
-
- ReplicaInfo oldReplica = volumeMap.get(bpid, newReplica.getBlockId());
- if (oldReplica == null) {
- volumeMap.add(bpid, newReplica);
- } else {
- // We have multiple replicas of the same block so decide which one
- // to keep.
- newReplica = resolveDuplicateReplicas(newReplica, oldReplica, volumeMap);
- }
-
- // If we are retaining a replica on transient storage make sure
- // it is in the lazyWriteReplicaMap so it can be persisted
- // eventually.
- if (newReplica.getVolume().isTransientStorage()) {
- lazyWriteReplicaMap.addReplica(bpid, blockId,
- (FsVolumeImpl) newReplica.getVolume());
- } else {
- lazyWriteReplicaMap.discardReplica(bpid, blockId, false);
- }
+ Block block = new Block(blockId, file.length(), genStamp);
+ addReplicaToReplicasMap(block, volumeMap, lazyWriteReplicaMap,
+ isFinalized);
}
}
@@ -649,9 +669,121 @@ class BlockPoolSlice {
return currentDir.getAbsolutePath();
}
- void shutdown() {
+ void shutdown(BlockListAsLongs blocksListToPersist) {
+ saveReplicas(blocksListToPersist);
saveDfsUsed();
dfsUsedSaved = true;
dfsUsage.shutdown();
}
+
+ private boolean readReplicasFromCache(ReplicaMap volumeMap,
+ final RamDiskReplicaTracker lazyWriteReplicaMap) {
+ ReplicaMap tmpReplicaMap = new ReplicaMap(this);
+ File replicaFile = new File(currentDir, REPLICA_CACHE_FILE);
+ // Check whether the file exists or not.
+ if (!replicaFile.exists()) {
+ LOG.info("Replica Cache file: "+ replicaFile.getPath() +
+ " doesn't exist ");
+ return false;
+ }
+ long fileLastModifiedTime = replicaFile.lastModified();
+ if (System.currentTimeMillis() > fileLastModifiedTime + replicaCacheExpiry) {
+ LOG.info("Replica Cache file: " + replicaFile.getPath() +
+ " has gone stale");
+ // Just to make findbugs happy
+ if (!replicaFile.delete()) {
+ LOG.info("Replica Cache file: " + replicaFile.getPath() +
+ " cannot be deleted");
+ }
+ return false;
+ }
+ FileInputStream inputStream = null;
+ try {
+ inputStream = new FileInputStream(replicaFile);
+ BlockListAsLongs blocksList = BlockListAsLongs.readFrom(inputStream);
+ Iterator<BlockReportReplica> iterator = blocksList.iterator();
+ while (iterator.hasNext()) {
+ BlockReportReplica replica = iterator.next();
+ switch (replica.getState()) {
+ case FINALIZED:
+ addReplicaToReplicasMap(replica, tmpReplicaMap, lazyWriteReplicaMap, true);
+ break;
+ case RUR:
+ case RBW:
+ case RWR:
+ addReplicaToReplicasMap(replica, tmpReplicaMap, lazyWriteReplicaMap, false);
+ break;
+ default:
+ break;
+ }
+ }
+ inputStream.close();
+ // Now it is safe to add the replica into volumeMap
+ // In case of any exception during parsing this cache file, fall back
+ // to scan all the files on disk.
+ for (ReplicaInfo info: tmpReplicaMap.replicas(bpid)) {
+ volumeMap.add(bpid, info);
+ }
+ LOG.info("Successfully read replica from cache file : "
+ + replicaFile.getPath());
+ return true;
+ } catch (Exception e) {
+ // Any exception we need to revert back to read from disk
+ // Log the error and return false
+ LOG.info("Exception occured while reading the replicas cache file: "
+ + replicaFile.getPath(), e );
+ return false;
+ }
+ finally {
+ if (!replicaFile.delete()) {
+ LOG.info("Failed to delete replica cache file: " +
+ replicaFile.getPath());
+ }
+ // close the inputStream
+ IOUtils.closeStream(inputStream);
+ }
+ }
+
+ private void saveReplicas(BlockListAsLongs blocksListToPersist) {
+ if (blocksListToPersist == null ||
+ blocksListToPersist.getNumberOfBlocks()== 0) {
+ return;
+ }
+ File tmpFile = new File(currentDir, REPLICA_CACHE_FILE + ".tmp");
+ if (tmpFile.exists() && !tmpFile.delete()) {
+ LOG.warn("Failed to delete tmp replicas file in " +
+ tmpFile.getPath());
+ return;
+ }
+ File replicaCacheFile = new File(currentDir, REPLICA_CACHE_FILE);
+ if (replicaCacheFile.exists() && !replicaCacheFile.delete()) {
+ LOG.warn("Failed to delete replicas file in " +
+ replicaCacheFile.getPath());
+ return;
+ }
+
+ FileOutputStream out = null;
+ try {
+ out = new FileOutputStream(tmpFile);
+ blocksListToPersist.writeTo(out);
+ out.close();
+ // Renaming the tmp file to replicas
+ Files.move(tmpFile, replicaCacheFile);
+ } catch (Exception e) {
+ // If write failed, the volume might be bad. Since the cache file is
+ // not critical, log the error, delete both the files (tmp and cache)
+ // and continue.
+ LOG.warn("Failed to write replicas to cache ", e);
+ if (replicaCacheFile.exists() && !replicaCacheFile.delete()) {
+ LOG.warn("Failed to delete replicas file: " +
+ replicaCacheFile.getPath());
+ }
+ } finally {
+ IOUtils.closeStream(out);
+ if (tmpFile.exists() && !tmpFile.delete()) {
+ LOG.warn("Failed to delete tmp file in " +
+ tmpFile.getPath());
+ }
+ }
+ }
}
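
Two details of the BlockPoolSlice cache handling above are worth noting: saveReplicas writes to "replicas.tmp" and only then renames it over "replicas", so the cache file is only ever created from a fully written temporary, and readReplicasFromCache builds a temporary ReplicaMap first and deletes the cache file after use, so a later (possibly stale) restart cannot reuse it; every failure path simply falls back to the full directory scan. A simplified sketch of the write-then-rename step, with the pre-delete and error handling omitted (file names follow the diff; the payload argument is illustrative):

    import java.io.*;
    import com.google.common.io.Files;

    class AtomicCacheWrite {
      static void writeCache(File currentDir, byte[] payload) throws IOException {
        File tmp = new File(currentDir, "replicas.tmp");
        File cache = new File(currentDir, "replicas");
        try (OutputStream out = new FileOutputStream(tmp)) {
          out.write(payload);                 // serialized block report bytes
        }
        Files.move(tmp, cache);               // Guava move, as in saveReplicas
      }
    }
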
http://git-wip-us.apache.org/repos/asf/hadoop/blob/19bc19e8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 05c4871..cf471ca 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -2463,8 +2463,9 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override
public synchronized void shutdownBlockPool(String bpid) {
LOG.info("Removing block pool " + bpid);
+ Map<DatanodeStorage, BlockListAsLongs> blocksPerVolume = getBlockReports(bpid);
volumeMap.cleanUpBlockPool(bpid);
- volumes.removeBlockPool(bpid);
+ volumes.removeBlockPool(bpid, blocksPerVolume);
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/19bc19e8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
index 23efbdf..4dbc7f1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
@@ -50,6 +50,7 @@ import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
@@ -65,7 +66,6 @@ import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.util.Time;
import org.codehaus.jackson.annotate.JsonProperty;
import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.ObjectReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -805,7 +805,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
}
Set<Entry<String, BlockPoolSlice>> set = bpSlices.entrySet();
for (Entry<String, BlockPoolSlice> entry : set) {
- entry.getValue().shutdown();
+ entry.getValue().shutdown(null);
}
}
@@ -815,10 +815,10 @@ public class FsVolumeImpl implements FsVolumeSpi {
bpSlices.put(bpid, bp);
}
- void shutdownBlockPool(String bpid) {
+ void shutdownBlockPool(String bpid, BlockListAsLongs blocksListsAsLongs) {
BlockPoolSlice bp = bpSlices.get(bpid);
if (bp != null) {
- bp.shutdown();
+ bp.shutdown(blocksListsAsLongs);
}
bpSlices.remove(bpid);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/19bc19e8/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
index a5611c5..4fddfb9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
@@ -35,10 +35,12 @@ import java.util.concurrent.atomic.AtomicReference;
import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.Time;
@@ -428,9 +430,10 @@ class FsVolumeList {
bpid + ": " + totalTimeTaken + "ms");
}
- void removeBlockPool(String bpid) {
+ void removeBlockPool(String bpid, Map<DatanodeStorage, BlockListAsLongs>
+ blocksPerVolume) {
for (FsVolumeImpl v : volumes.get()) {
- v.shutdownBlockPool(bpid);
+ v.shutdownBlockPool(bpid, blocksPerVolume.get(v.toDatanodeStorage()));
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/19bc19e8/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java
index 2e5348e..e9891bf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/UpgradeUtilities.java
@@ -304,10 +304,11 @@ public class UpgradeUtilities {
continue;
}
- // skip VERSION and dfsUsed file for DataNodes
+ // skip VERSION and dfsUsed and replicas file for DataNodes
if (nodeType == DATA_NODE &&
(list[i].getName().equals("VERSION") ||
- list[i].getName().equals("dfsUsed"))) {
+ list[i].getName().equals("dfsUsed") ||
+ list[i].getName().equals("replicas"))) {
continue;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/19bc19e8/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java
index f9a2ba1..9dee724 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java
@@ -218,4 +218,11 @@ public class DataNodeTestUtils {
}
}
}
+
+ public static void runDirectoryScanner(DataNode dn) throws IOException {
+ DirectoryScanner directoryScanner = dn.getDirectoryScanner();
+ if (directoryScanner != null) {
+ dn.getDirectoryScanner().reconcile();
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/19bc19e8/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
index 9325cdc..96a73c6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestWriteToReplica.java
@@ -17,14 +17,25 @@
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
+import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException;
import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
@@ -34,6 +45,7 @@ import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery;
import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.junit.Assert;
import org.junit.Test;
@@ -501,4 +513,144 @@ public class TestWriteToReplica {
+ "genstamp and replaced it with the newer one: " + blocks[NON_EXISTENT]);
}
}
+
+ /**
+ * This is a test to check the replica map before and after the datanode
+ * quick restart (less than 5 minutes)
+ * @throws Exception
+ */
+ @Test
+ public void testReplicaMapAfterDatanodeRestart() throws Exception {
+ Configuration conf = new HdfsConfiguration();
+ MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(2))
+ .build();
+ try {
+ cluster.waitActive();
+ NameNode nn1 = cluster.getNameNode(0);
+ NameNode nn2 = cluster.getNameNode(1);
+ assertNotNull("cannot create nn1", nn1);
+ assertNotNull("cannot create nn2", nn2);
+
+ // check number of volumes in fsdataset
+ DataNode dn = cluster.getDataNodes().get(0);
+ FsDatasetImpl dataSet = (FsDatasetImpl)DataNodeTestUtils.
+ getFSDataset(dn);
+ ReplicaMap replicaMap = dataSet.volumeMap;
+
+ List<FsVolumeImpl> volumes = dataSet.getVolumes();
+ // number of volumes should be 2 - [data1, data2]
+ assertEquals("number of volumes is wrong", 2, volumes.size());
+ ArrayList<String> bpList = new ArrayList<String>(Arrays.asList(
+ cluster.getNamesystem(0).getBlockPoolId(),
+ cluster.getNamesystem(1).getBlockPoolId()));
+
+ Assert.assertTrue("Cluster should have 2 block pools",
+ bpList.size() == 2);
+
+ createReplicas(bpList, volumes, replicaMap);
+ ReplicaMap oldReplicaMap = new ReplicaMap(this);
+ oldReplicaMap.addAll(replicaMap);
+
+ cluster.restartDataNode(0);
+ cluster.waitActive();
+ dn = cluster.getDataNodes().get(0);
+ dataSet = (FsDatasetImpl) dn.getFSDataset();
+ testEqualityOfReplicaMap(oldReplicaMap, dataSet.volumeMap, bpList);
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
+ /**
+ * Compare the replica map before and after the restart
+ **/
+ private void testEqualityOfReplicaMap(ReplicaMap oldReplicaMap, ReplicaMap
+ newReplicaMap, List<String> bpidList) {
+ // Traversing through newReplica map and remove the corresponding
+ // replicaInfo from oldReplicaMap.
+ for (String bpid: bpidList) {
+ for (ReplicaInfo info: newReplicaMap.replicas(bpid)) {
+ assertNotNull("Volume map before restart didn't contain the "
+ + "blockpool: " + bpid, oldReplicaMap.replicas(bpid));
+
+ ReplicaInfo oldReplicaInfo = oldReplicaMap.get(bpid,
+ info.getBlockId());
+ // Volume map after restart contains a blockpool id which
+ assertNotNull("Old Replica Map didnt't contain block with blockId: " +
+ info.getBlockId(), oldReplicaInfo);
+
+ ReplicaState oldState = oldReplicaInfo.getState();
+ // Since after restart, all the RWR, RBW and RUR blocks gets
+ // converted to RWR
+ if (info.getState() == ReplicaState.RWR) {
+ if (oldState == ReplicaState.RWR || oldState == ReplicaState.RBW
+ || oldState == ReplicaState.RUR) {
+ oldReplicaMap.remove(bpid, oldReplicaInfo);
+ }
+ } else if (info.getState() == ReplicaState.FINALIZED &&
+ oldState == ReplicaState.FINALIZED) {
+ oldReplicaMap.remove(bpid, oldReplicaInfo);
+ }
+ }
+ }
+
+ // We don't persist the ReplicaInPipeline replica
+ // and if the old replica map contains any replica except ReplicaInPipeline
+ // then we didn't persist that replica
+ for (String bpid: bpidList) {
+ for (ReplicaInfo replicaInfo: oldReplicaMap.replicas(bpid)) {
+ if (replicaInfo.getState() != ReplicaState.TEMPORARY) {
+ Assert.fail("After datanode restart we lost the block with blockId: "
+ + replicaInfo.getBlockId());
+ }
+ }
+ }
+ }
+
+ private void createReplicas(List<String> bpList, List<FsVolumeImpl> volumes,
+ ReplicaMap volumeMap) throws IOException {
+ Assert.assertTrue("Volume map can't be null" , volumeMap != null);
+
+ // Here we create all different type of replicas and add it
+ // to volume map.
+ // Created all type of ReplicaInfo, each under Blkpool corresponding volume
+ long id = 1; // This variable is used as both blockId and genStamp
+ for (String bpId: bpList) {
+ for (FsVolumeImpl volume: volumes) {
+ ReplicaInfo finalizedReplica = new FinalizedReplica(id, 1, id, volume,
+ DatanodeUtil.idToBlockDir(volume.getFinalizedDir(bpId), id));
+ volumeMap.add(bpId, finalizedReplica);
+ id++;
+
+ ReplicaInfo rbwReplica = new ReplicaBeingWritten(id, 1, id, volume,
+ volume.getRbwDir(bpId), null, 100);
+ volumeMap.add(bpId, rbwReplica);
+ id++;
+
+ ReplicaInfo rwrReplica = new ReplicaWaitingToBeRecovered(id, 1, id,
+ volume, volume.getRbwDir(bpId));
+ volumeMap.add(bpId, rwrReplica);
+ id++;
+
+ ReplicaInfo ripReplica = new ReplicaInPipeline(id, id, volume,
+ volume.getTmpDir(bpId), 0);
+ volumeMap.add(bpId, ripReplica);
+ id++;
+ }
+ }
+
+ for (String bpId: bpList) {
+ for (ReplicaInfo replicaInfo: volumeMap.replicas(bpId)) {
+ File parentFile = replicaInfo.getBlockFile().getParentFile();
+ if (!parentFile.exists()) {
+ if (!parentFile.mkdirs()) {
+ throw new IOException("Failed to mkdirs " + parentFile);
+ }
+ }
+ replicaInfo.getBlockFile().createNewFile();
+ replicaInfo.getMetaFile().createNewFile();
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/19bc19e8/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java
index 92ea111..5d319b4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestListCorruptFileBlocks.java
@@ -41,6 +41,8 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.util.StringUtils;
import org.junit.Test;
import org.slf4j.Logger;
@@ -483,6 +485,10 @@ public class TestListCorruptFileBlocks {
}
}
+ // Run the direcrtoryScanner to update the Datanodes volumeMap
+ DataNode dn = cluster.getDataNodes().get(0);
+ DataNodeTestUtils.runDirectoryScanner(dn);
+
// Occasionally the BlockPoolSliceScanner can run before we have removed
// the blocks. Restart the Datanode to trigger the scanner into running
// once more.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/19bc19e8/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestProcessCorruptBlocks.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestProcessCorruptBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestProcessCorruptBlocks.java
index 168ebb9..37abc5b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestProcessCorruptBlocks.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestProcessCorruptBlocks.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.NumberReplicas;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.junit.Test;
public class TestProcessCorruptBlocks {
@@ -269,6 +270,8 @@ public class TestProcessCorruptBlocks {
// But the datadirectory will not change
assertTrue(cluster.corruptReplica(dnIndex, block));
+ // Run directory scanner to update the DN's volume map
+ DataNodeTestUtils.runDirectoryScanner(cluster.getDataNodes().get(0));
DataNodeProperties dnProps = cluster.stopDataNode(0);
// Each datanode has multiple data dirs, check each
http://git-wip-us.apache.org/repos/asf/hadoop/blob/19bc19e8/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPendingCorruptDnMessages.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPendingCorruptDnMessages.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPendingCorruptDnMessages.java
index 4d4fed6..443500c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPendingCorruptDnMessages.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestPendingCorruptDnMessages.java
@@ -36,6 +36,7 @@ import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.util.ThreadUtil;
import org.junit.Test;
@@ -69,6 +70,8 @@ public class TestPendingCorruptDnMessages {
ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, filePath);
assertTrue(cluster.changeGenStampOfBlock(0, block, 900));
+ // Run directory dsscanner to update Datanode's volumeMap
+ DataNodeTestUtils.runDirectoryScanner(cluster.getDataNodes().get(0));
// Stop the DN so the replica with the changed gen stamp will be reported
// when this DN starts up.
DataNodeProperties dnProps = cluster.stopDataNode(0);