Posted to common-commits@hadoop.apache.org by cm...@apache.org on 2014/10/31 01:31:43 UTC
git commit: HDFS-7035. Make adding a new data directory to the DataNode an atomic operation and improve error handling (Lei Xu via Colin P. McCabe)
Repository: hadoop
Updated Branches:
refs/heads/trunk 348bfb7d5 -> a9331fe9b
HDFS-7035. Make adding a new data directory to the DataNode an atomic operation and improve error handling (Lei Xu via Colin P. McCabe)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/a9331fe9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/a9331fe9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/a9331fe9
Branch: refs/heads/trunk
Commit: a9331fe9b071fdcdae0c6c747d7b6b306142e671
Parents: 348bfb7
Author: Colin Patrick Mccabe <cm...@cloudera.com>
Authored: Thu Oct 30 17:31:23 2014 -0700
Committer: Colin Patrick Mccabe <cm...@cloudera.com>
Committed: Thu Oct 30 17:31:23 2014 -0700
----------------------------------------------------------------------
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +
.../hadoop/hdfs/server/common/Storage.java | 15 +
.../hadoop/hdfs/server/common/StorageInfo.java | 6 +-
.../server/datanode/BlockPoolSliceStorage.java | 168 +++++---
.../hadoop/hdfs/server/datanode/DataNode.java | 109 ++++--
.../hdfs/server/datanode/DataStorage.java | 382 ++++++++++---------
.../server/datanode/fsdataset/FsDatasetSpi.java | 6 +-
.../datanode/fsdataset/impl/FsDatasetImpl.java | 161 +++-----
.../server/datanode/SimulatedFSDataset.java | 7 +-
.../datanode/TestDataNodeHotSwapVolumes.java | 108 +++++-
.../hdfs/server/datanode/TestDataStorage.java | 26 +-
.../fsdataset/impl/TestFsDatasetImpl.java | 27 +-
12 files changed, 579 insertions(+), 439 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a9331fe9/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 9f331d9..438ed66 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -322,6 +322,9 @@ Release 2.7.0 - UNRELEASED
HDFS-3342. SocketTimeoutException in BlockSender.sendChunks could
have a better error message. (Yongjun Zhang via wang)
+ HDFS-7035. Make adding a new data directory to the DataNode an atomic
+ operation and improve error handling (Lei Xu via Colin P. McCabe)
+
OPTIMIZATIONS
BUG FIXES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a9331fe9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
index 913d890..f83cf3b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
@@ -821,6 +821,21 @@ public abstract class Storage extends StorageInfo {
}
/**
+ * Returns true if the storage directory rooted at the given directory is
+ * already loaded.
+ * @param root the root directory of a {@link StorageDirectory}
+ * @throws IOException if the canonical path cannot be resolved.
+ */
+ protected boolean containsStorageDir(File root) throws IOException {
+ for (StorageDirectory sd : storageDirs) {
+ if (sd.getRoot().getCanonicalPath().equals(root.getCanonicalPath())) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
* Return true if the layout of the given storage directory is from a version
* of Hadoop prior to the introduction of the "current" and "previous"
* directories which allow upgrade and rollback.
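The new containsStorageDir() guards against loading the same directory twice by comparing canonical paths, so symlinked or relative spellings of one root compare equal. A minimal standalone sketch of that comparison, with hypothetical directory names:

import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;

public class CanonicalPathCheck {
  // Mirrors Storage#containsStorageDir: two roots match when their
  // canonical paths (symlinks and "." or ".." segments resolved) are equal.
  static boolean containsRoot(List<File> loadedRoots, File candidate)
      throws IOException {
    for (File root : loadedRoots) {
      if (root.getCanonicalPath().equals(candidate.getCanonicalPath())) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) throws IOException {
    List<File> loaded = Arrays.asList(new File("/data/dn1"));
    // "/data/./dn1" canonicalizes to "/data/dn1", so this prints true.
    System.out.println(containsRoot(loaded, new File("/data/./dn1")));
  }
}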
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a9331fe9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java
index 545fb13..f40b079 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageInfo.java
@@ -216,7 +216,11 @@ public class StorageInfo {
}
namespaceID = nsId;
}
-
+
+ public void setServiceLayoutVersion(int lv) {
+ this.layoutVersion = lv;
+ }
+
public int getServiceLayoutVersion() {
return storageType == NodeType.DATA_NODE ? HdfsConstants.DATANODE_LAYOUT_VERSION
: HdfsConstants.NAMENODE_LAYOUT_VERSION;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a9331fe9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
index 8333bb4..8c819a7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.HardLink;
@@ -35,9 +36,7 @@ import org.apache.hadoop.util.Daemon;
import java.io.File;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
@@ -127,80 +126,123 @@ public class BlockPoolSliceStorage extends Storage {
new ConcurrentHashMap<String, Boolean>());
}
+ // Expose visibility for VolumeBuilder#build().
+ public void addStorageDir(StorageDirectory sd) {
+ super.addStorageDir(sd);
+ }
+
/**
- * Analyze storage directories. Recover from previous transitions if required.
- *
+ * Load one storage directory. Recover from previous transitions if required.
+ *
+ * @param datanode datanode instance
+ * @param nsInfo namespace information
+ * @param dataDir the root path of the storage directory
+ * @param startOpt startup option
+ * @return the StorageDirectory successfully loaded.
+ * @throws IOException
+ */
+ private StorageDirectory loadStorageDirectory(DataNode datanode,
+ NamespaceInfo nsInfo, File dataDir, StartupOption startOpt) throws IOException {
+ StorageDirectory sd = new StorageDirectory(dataDir, null, true);
+ try {
+ StorageState curState = sd.analyzeStorage(startOpt, this);
+ // sd is locked but not opened
+ switch (curState) {
+ case NORMAL:
+ break;
+ case NON_EXISTENT:
+ LOG.info("Block pool storage directory " + dataDir + " does not exist");
+ throw new IOException("Storage directory " + dataDir
+ + " does not exist");
+ case NOT_FORMATTED: // format
+ LOG.info("Block pool storage directory " + dataDir
+ + " is not formatted for " + nsInfo.getBlockPoolID());
+ LOG.info("Formatting ...");
+ format(sd, nsInfo);
+ break;
+ default: // recovery part is common
+ sd.doRecover(curState);
+ }
+
+ // 2. Do transitions
+ // Each storage directory is treated individually.
+ // During startup some of them can upgrade or roll back
+ // while others could be up-to-date for the regular startup.
+ doTransition(datanode, sd, nsInfo, startOpt);
+ if (getCTime() != nsInfo.getCTime()) {
+ throw new IOException(
+ "Data-node and name-node CTimes must be the same.");
+ }
+
+ // 3. Update successfully loaded storage.
+ setServiceLayoutVersion(getServiceLayoutVersion());
+ writeProperties(sd);
+
+ return sd;
+ } catch (IOException ioe) {
+ sd.unlock();
+ throw ioe;
+ }
+ }
+
+ /**
+ * Analyze and load storage directories. Recover from previous transitions if
+ * required.
+ *
+ * The block pool storages are either all loaded or none of them is.
+ * Therefore, a failure to load any block pool storage results in a faulty
+ * data volume.
+ *
* @param datanode Datanode to which this storage belongs to
* @param nsInfo namespace information
* @param dataDirs storage directories of block pool
* @param startOpt startup option
+ * @return a list of successfully loaded block pool storage directories.
* @throws IOException on error
*/
- void recoverTransitionRead(DataNode datanode, NamespaceInfo nsInfo,
+ List<StorageDirectory> loadBpStorageDirectories(
+ DataNode datanode, NamespaceInfo nsInfo,
Collection<File> dataDirs, StartupOption startOpt) throws IOException {
- LOG.info("Analyzing storage directories for bpid " + nsInfo.getBlockPoolID());
- Set<String> existingStorageDirs = new HashSet<String>();
- for (int i = 0; i < getNumStorageDirs(); i++) {
- existingStorageDirs.add(getStorageDir(i).getRoot().getAbsolutePath());
- }
-
- // 1. For each BP data directory analyze the state and
- // check whether all is consistent before transitioning.
- ArrayList<StorageState> dataDirStates = new ArrayList<StorageState>(
- dataDirs.size());
- for (Iterator<File> it = dataDirs.iterator(); it.hasNext();) {
- File dataDir = it.next();
- if (existingStorageDirs.contains(dataDir.getAbsolutePath())) {
- LOG.info("Storage directory " + dataDir + " has already been used.");
- it.remove();
- continue;
- }
- StorageDirectory sd = new StorageDirectory(dataDir, null, true);
- StorageState curState;
- try {
- curState = sd.analyzeStorage(startOpt, this);
- // sd is locked but not opened
- switch (curState) {
- case NORMAL:
- break;
- case NON_EXISTENT:
- // ignore this storage
- LOG.info("Storage directory " + dataDir + " does not exist.");
- it.remove();
- continue;
- case NOT_FORMATTED: // format
- LOG.info("Storage directory " + dataDir + " is not formatted.");
- LOG.info("Formatting ...");
- format(sd, nsInfo);
- break;
- default: // recovery part is common
- sd.doRecover(curState);
+ List<StorageDirectory> succeedDirs = Lists.newArrayList();
+ try {
+ for (File dataDir : dataDirs) {
+ if (containsStorageDir(dataDir)) {
+ throw new IOException(
+ "BlockPoolSliceStorage.recoverTransitionRead: " +
+ "attempt to load an used block storage: " + dataDir);
}
- } catch (IOException ioe) {
- sd.unlock();
- throw ioe;
+ StorageDirectory sd =
+ loadStorageDirectory(datanode, nsInfo, dataDir, startOpt);
+ succeedDirs.add(sd);
}
- // add to the storage list. This is inherited from parent class, Storage.
- addStorageDir(sd);
- dataDirStates.add(curState);
+ } catch (IOException e) {
+ LOG.warn("Failed to analyze storage directories for block pool "
+ + nsInfo.getBlockPoolID(), e);
+ throw e;
}
+ return succeedDirs;
+ }
- if (dataDirs.size() == 0) // none of the data dirs exist
- throw new IOException(
- "All specified directories are not accessible or do not exist.");
-
- // 2. Do transitions
- // Each storage directory is treated individually.
- // During startup some of them can upgrade or roll back
- // while others could be up-to-date for the regular startup.
- for (int idx = 0; idx < getNumStorageDirs(); idx++) {
- doTransition(datanode, getStorageDir(idx), nsInfo, startOpt);
- assert getCTime() == nsInfo.getCTime()
- : "Data-node and name-node CTimes must be the same.";
+ /**
+ * Analyze storage directories. Recover from previous transitions if required.
+ *
+ * The block pool storages are either all loaded or none of them is.
+ * Therefore, a failure to load any block pool storage results in a faulty
+ * data volume.
+ *
+ * @param datanode Datanode to which this storage belongs to
+ * @param nsInfo namespace information
+ * @param dataDirs storage directories of block pool
+ * @param startOpt startup option
+ * @throws IOException on error
+ */
+ void recoverTransitionRead(DataNode datanode, NamespaceInfo nsInfo,
+ Collection<File> dataDirs, StartupOption startOpt) throws IOException {
+ LOG.info("Analyzing storage directories for bpid " + nsInfo.getBlockPoolID());
+ for (StorageDirectory sd : loadBpStorageDirectories(
+ datanode, nsInfo, dataDirs, startOpt)) {
+ addStorageDir(sd);
}
-
- // 3. Update all storages. Some of them might have just been formatted.
- this.writeAll();
}
/**
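loadBpStorageDirectories() now fails fast: the first block pool directory that cannot be analyzed, formatted, or recovered aborts the whole batch, so a volume only counts as loaded when every slice on it loaded. A hedged sketch of that all-or-nothing shape; LoadedDir and loadOne() are hypothetical stand-ins for StorageDirectory and loadStorageDirectory():

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class FailFastLoader {
  static class LoadedDir {             // stand-in for StorageDirectory
    final String root;
    LoadedDir(String root) { this.root = root; }
  }

  // Stand-in for loadStorageDirectory(): analyze/format/recover one dir.
  static LoadedDir loadOne(String root) throws IOException {
    if (root.isEmpty()) {
      throw new IOException("Storage directory does not exist");
    }
    return new LoadedDir(root);
  }

  // Either every directory loads or the IOException propagates and the
  // caller treats the whole volume as faulty; nothing is half-registered.
  static List<LoadedDir> loadAll(List<String> roots) throws IOException {
    List<LoadedDir> succeeded = new ArrayList<LoadedDir>();
    for (String root : roots) {
      succeeded.add(loadOne(root));    // first failure aborts the batch
    }
    return succeeded;
  }

  public static void main(String[] args) throws IOException {
    System.out.println(loadAll(Arrays.asList("/bp1", "/bp2")).size()); // 2
  }
}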
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a9331fe9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
index eeda237..6bd27fa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
@@ -44,12 +44,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PLUGINS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTING_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.IGNORE_SECURE_PORTS_FOR_TESTING_KEY;
import static org.apache.hadoop.util.ExitUtil.terminate;
import java.io.BufferedOutputStream;
@@ -81,6 +77,10 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;
import javax.management.ObjectName;
@@ -182,7 +182,6 @@ import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.tracing.TraceAdminPB;
import org.apache.hadoop.tracing.TraceAdminPB.TraceAdminService;
import org.apache.hadoop.tracing.TraceAdminProtocolPB;
import org.apache.hadoop.tracing.TraceAdminProtocolServerSideTranslatorPB;
@@ -451,20 +450,27 @@ public class DataNode extends ReconfigurableBase
*/
@VisibleForTesting
static class ChangedVolumes {
+ /** The storage locations of the newly added volumes. */
List<StorageLocation> newLocations = Lists.newArrayList();
+ /** The storage locations of the volumes that are removed. */
List<StorageLocation> deactivateLocations = Lists.newArrayList();
+ /** The unchanged locations that existed in the old configuration. */
+ List<StorageLocation> unchangedLocations = Lists.newArrayList();
}
/**
* Parse the new DFS_DATANODE_DATA_DIR value in the configuration to detect
* changed volumes.
+ * @param newVolumes a comma-separated string that specifies the data volumes.
* @return changed volumes.
* @throws IOException if none of the directories are specified in the
* configuration.
*/
@VisibleForTesting
- ChangedVolumes parseChangedVolumes() throws IOException {
- List<StorageLocation> locations = getStorageLocations(getConf());
+ ChangedVolumes parseChangedVolumes(String newVolumes) throws IOException {
+ Configuration conf = new Configuration();
+ conf.set(DFS_DATANODE_DATA_DIR_KEY, newVolumes);
+ List<StorageLocation> locations = getStorageLocations(conf);
if (locations.isEmpty()) {
throw new IOException("No directory is specified.");
@@ -479,9 +485,11 @@ public class DataNode extends ReconfigurableBase
boolean found = false;
for (Iterator<StorageLocation> sl = results.newLocations.iterator();
sl.hasNext(); ) {
- if (sl.next().getFile().getCanonicalPath().equals(
+ StorageLocation location = sl.next();
+ if (location.getFile().getCanonicalPath().equals(
dir.getRoot().getCanonicalPath())) {
sl.remove();
+ results.unchangedLocations.add(location);
found = true;
break;
}
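parseChangedVolumes() now sorts every configured directory into one of three buckets by canonical-path comparison: entries present only in the new value become newLocations, old directories missing from it become deactivateLocations, and matches go to unchangedLocations. A simplified sketch of that three-way split, using plain Strings in place of StorageLocation:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class VolumeDiff {
  List<String> newLocations = new ArrayList<String>();
  List<String> deactivateLocations = new ArrayList<String>();
  List<String> unchangedLocations = new ArrayList<String>();

  // Simplified: the real code compares File#getCanonicalPath() values.
  static VolumeDiff diff(List<String> oldDirs, List<String> newDirs) {
    VolumeDiff result = new VolumeDiff();
    result.newLocations.addAll(newDirs);
    for (String old : oldDirs) {
      if (result.newLocations.remove(old)) {
        result.unchangedLocations.add(old);   // kept in both configs
      } else {
        result.deactivateLocations.add(old);  // dropped from the new config
      }
    }
    return result;
  }

  public static void main(String[] args) {
    VolumeDiff d = diff(Arrays.asList("/d1", "/d2"),
                        Arrays.asList("/d1", "/d3"));
    // prints: new=[/d3] deactivate=[/d2] unchanged=[/d1]
    System.out.println("new=" + d.newLocations
        + " deactivate=" + d.deactivateLocations
        + " unchanged=" + d.unchangedLocations);
  }
}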
@@ -499,18 +507,21 @@ public class DataNode extends ReconfigurableBase
/**
* Attempts to reload data volumes with new configuration.
* @param newVolumes a comma-separated string that specifies the data volumes.
- * @throws Exception
+ * @throws IOException on error. If an IOException is thrown, some new
+ * volumes may already have been added and some old volumes removed.
*/
- private synchronized void refreshVolumes(String newVolumes) throws Exception {
+ private synchronized void refreshVolumes(String newVolumes) throws IOException {
Configuration conf = getConf();
conf.set(DFS_DATANODE_DATA_DIR_KEY, newVolumes);
- List<StorageLocation> locations = getStorageLocations(conf);
-
- final int numOldDataDirs = dataDirs.size();
- dataDirs = locations;
- ChangedVolumes changedVolumes = parseChangedVolumes();
+ int numOldDataDirs = dataDirs.size();
+ ChangedVolumes changedVolumes = parseChangedVolumes(newVolumes);
StringBuilder errorMessageBuilder = new StringBuilder();
+ List<String> effectiveVolumes = Lists.newArrayList();
+ for (StorageLocation sl : changedVolumes.unchangedLocations) {
+ effectiveVolumes.add(sl.toString());
+ }
+
try {
if (numOldDataDirs + changedVolumes.newLocations.size() -
changedVolumes.deactivateLocations.size() <= 0) {
@@ -521,34 +532,43 @@ public class DataNode extends ReconfigurableBase
Joiner.on(",").join(changedVolumes.newLocations));
// Add volumes for each Namespace
+ final List<NamespaceInfo> nsInfos = Lists.newArrayList();
for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
- NamespaceInfo nsInfo = bpos.getNamespaceInfo();
- LOG.info("Loading volumes for namesapce: " + nsInfo.getNamespaceID());
- storage.addStorageLocations(
- this, nsInfo, changedVolumes.newLocations, StartupOption.HOTSWAP);
+ nsInfos.add(bpos.getNamespaceInfo());
}
- List<String> bpids = Lists.newArrayList();
- for (BPOfferService bpos : blockPoolManager.getAllNamenodeThreads()) {
- bpids.add(bpos.getBlockPoolId());
+ ExecutorService service = Executors.newFixedThreadPool(
+ changedVolumes.newLocations.size());
+ List<Future<IOException>> exceptions = Lists.newArrayList();
+ for (final StorageLocation location : changedVolumes.newLocations) {
+ exceptions.add(service.submit(new Callable<IOException>() {
+ @Override
+ public IOException call() {
+ try {
+ data.addVolume(location, nsInfos);
+ } catch (IOException e) {
+ return e;
+ }
+ return null;
+ }
+ }));
}
- List<StorageLocation> succeedVolumes =
- data.addVolumes(changedVolumes.newLocations, bpids);
-
- if (succeedVolumes.size() < changedVolumes.newLocations.size()) {
- List<StorageLocation> failedVolumes = Lists.newArrayList();
- // Clean all failed volumes.
- for (StorageLocation location : changedVolumes.newLocations) {
- if (!succeedVolumes.contains(location)) {
- errorMessageBuilder.append("FAILED TO ADD:");
- failedVolumes.add(location);
+
+ for (int i = 0; i < changedVolumes.newLocations.size(); i++) {
+ StorageLocation volume = changedVolumes.newLocations.get(i);
+ Future<IOException> ioExceptionFuture = exceptions.get(i);
+ try {
+ IOException ioe = ioExceptionFuture.get();
+ if (ioe != null) {
+ errorMessageBuilder.append(String.format("FAILED TO ADD: %s: %s\n",
+ volume.toString(), ioe.getMessage()));
} else {
- errorMessageBuilder.append("ADDED:");
+ effectiveVolumes.add(volume.toString());
}
- errorMessageBuilder.append(location);
- errorMessageBuilder.append("\n");
+ LOG.info("Storage directory is loaded: " + volume.toString());
+ } catch (Exception e) {
+ errorMessageBuilder.append(String.format("FAILED to ADD: %s: %s\n",
+ volume.toString(), e.getMessage()));
}
- storage.removeVolumes(failedVolumes);
- data.removeVolumes(failedVolumes);
}
}
@@ -557,15 +577,20 @@ public class DataNode extends ReconfigurableBase
Joiner.on(",").join(changedVolumes.deactivateLocations));
data.removeVolumes(changedVolumes.deactivateLocations);
- storage.removeVolumes(changedVolumes.deactivateLocations);
+ try {
+ storage.removeVolumes(changedVolumes.deactivateLocations);
+ } catch (IOException e) {
+ errorMessageBuilder.append(e.getMessage());
+ }
}
if (errorMessageBuilder.length() > 0) {
throw new IOException(errorMessageBuilder.toString());
}
- } catch (IOException e) {
- LOG.warn("There is IOException when refresh volumes! ", e);
- throw e;
+ } finally {
+ conf.set(DFS_DATANODE_DATA_DIR_KEY,
+ Joiner.on(",").join(effectiveVolumes));
+ dataDirs = getStorageLocations(conf);
}
}
@@ -1304,7 +1329,7 @@ public class DataNode extends ReconfigurableBase
final String bpid = nsInfo.getBlockPoolID();
//read storage info, lock data dirs and transition fs state if necessary
synchronized (this) {
- storage.recoverTransitionRead(this, bpid, nsInfo, dataDirs, startOpt);
+ storage.recoverTransitionRead(this, nsInfo, dataDirs, startOpt);
}
final StorageInfo bpStorage = storage.getBPStorage(bpid);
LOG.info("Setting up storage: nsid=" + bpStorage.getNamespaceID()
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a9331fe9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
index 99eedb1..c90ef95 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
@@ -18,9 +18,12 @@
package org.apache.hadoop.hdfs.server.datanode;
+import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
import com.google.common.util.concurrent.Futures;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
@@ -188,176 +191,215 @@ public class DataStorage extends Storage {
}
return null;
}
-
+
/**
- * {{@inheritDoc org.apache.hadoop.hdfs.server.common.Storage#writeAll()}}
+ * VolumeBuilder holds the metadata (e.g., the storage directories) of the
+ * prepared volume returned from {@link prepareVolume()}. Call {@link build()}
+ * to add the metadata to {@link DataStorage} so that this prepared volume
+ * becomes active.
*/
- private void writeAll(Collection<StorageDirectory> dirs) throws IOException {
- this.layoutVersion = getServiceLayoutVersion();
- for (StorageDirectory dir : dirs) {
- writeProperties(dir);
+ @InterfaceAudience.Private
+ @InterfaceStability.Unstable
+ static public class VolumeBuilder {
+ private DataStorage storage;
+ /** Volume level storage directory. */
+ private StorageDirectory sd;
+ /** Mapping from block pool ID to a list of storage directories. */
+ private Map<String, List<StorageDirectory>> bpStorageDirMap =
+ Maps.newHashMap();
+
+ @VisibleForTesting
+ public VolumeBuilder(DataStorage storage, StorageDirectory sd) {
+ this.storage = storage;
+ this.sd = sd;
+ }
+
+ public final StorageDirectory getStorageDirectory() {
+ return this.sd;
+ }
+
+ private void addBpStorageDirectories(String bpid,
+ List<StorageDirectory> dirs) {
+ bpStorageDirMap.put(bpid, dirs);
+ }
+
+ /**
+ * Add loaded metadata of a data volume to {@link DataStorage}.
+ */
+ public void build() {
+ assert this.sd != null;
+ synchronized (storage) {
+ for (Map.Entry<String, List<StorageDirectory>> e :
+ bpStorageDirMap.entrySet()) {
+ final String bpid = e.getKey();
+ BlockPoolSliceStorage bpStorage = this.storage.bpStorageMap.get(bpid);
+ assert bpStorage != null;
+ for (StorageDirectory bpSd : e.getValue()) {
+ bpStorage.addStorageDir(bpSd);
+ }
+ }
+ storage.addStorageDir(sd);
+ }
+ }
+ }
+
+ private StorageDirectory loadStorageDirectory(DataNode datanode,
+ NamespaceInfo nsInfo, File dataDir, StartupOption startOpt)
+ throws IOException {
+ StorageDirectory sd = new StorageDirectory(dataDir, null, false);
+ try {
+ StorageState curState = sd.analyzeStorage(startOpt, this);
+ // sd is locked but not opened
+ switch (curState) {
+ case NORMAL:
+ break;
+ case NON_EXISTENT:
+ LOG.info("Storage directory " + dataDir + " does not exist");
+ throw new IOException("Storage directory " + dataDir
+ + " does not exist");
+ case NOT_FORMATTED: // format
+ LOG.info("Storage directory " + dataDir + " is not formatted for "
+ + nsInfo.getBlockPoolID());
+ LOG.info("Formatting ...");
+ format(sd, nsInfo, datanode.getDatanodeUuid());
+ break;
+ default: // recovery part is common
+ sd.doRecover(curState);
+ }
+
+ // 2. Do transitions
+ // Each storage directory is treated individually.
+ // During startup some of them can upgrade or roll back
+ // while others could be up-to-date for the regular startup.
+ doTransition(datanode, sd, nsInfo, startOpt);
+
+ // 3. Update successfully loaded storage.
+ setServiceLayoutVersion(getServiceLayoutVersion());
+ writeProperties(sd);
+
+ return sd;
+ } catch (IOException ioe) {
+ sd.unlock();
+ throw ioe;
}
}
/**
- * Add a list of volumes to be managed by DataStorage. If the volume is empty,
- * format it, otherwise recover it from previous transitions if required.
+ * Prepare a storage directory. It creates a builder that can be used to
+ * add the volume to this DataStorage. If the volume cannot be added, the
+ * builder can simply be discarded.
*
- * @param datanode the reference to DataNode.
- * @param nsInfo namespace information
- * @param dataDirs array of data storage directories
- * @param startOpt startup option
- * @throws IOException
+ * @param datanode DataNode object.
+ * @param volume the root path of a storage directory.
+ * @param nsInfos a list of namespace infos.
+ * @return a VolumeBuilder that holds the metadata of this storage directory
+ * and can be added to DataStorage later.
+ * @throws IOException if an I/O error is encountered.
+ *
+ * Note that if an IOException is thrown, the state of DataStorage is not
+ * modified.
*/
- synchronized void addStorageLocations(DataNode datanode,
- NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
- StartupOption startOpt)
- throws IOException {
- // Similar to recoverTransitionRead, it first ensures the datanode level
- // format is completed.
- List<StorageLocation> tmpDataDirs =
- new ArrayList<StorageLocation>(dataDirs);
- addStorageLocations(datanode, nsInfo, tmpDataDirs, startOpt, false, true);
-
- Collection<File> bpDataDirs = new ArrayList<File>();
- String bpid = nsInfo.getBlockPoolID();
- for (StorageLocation dir : dataDirs) {
- File dnRoot = dir.getFile();
- File bpRoot = BlockPoolSliceStorage.getBpRoot(bpid, new File(dnRoot,
- STORAGE_DIR_CURRENT));
- bpDataDirs.add(bpRoot);
- }
- // mkdir for the list of BlockPoolStorage
- makeBlockPoolDataDir(bpDataDirs, null);
- BlockPoolSliceStorage bpStorage = this.bpStorageMap.get(bpid);
- if (bpStorage == null) {
- bpStorage = new BlockPoolSliceStorage(
- nsInfo.getNamespaceID(), bpid, nsInfo.getCTime(),
- nsInfo.getClusterID());
+ public VolumeBuilder prepareVolume(DataNode datanode, File volume,
+ List<NamespaceInfo> nsInfos) throws IOException {
+ if (containsStorageDir(volume)) {
+ final String errorMessage = "Storage directory " + volume + " is in use";
+ LOG.warn(errorMessage + ".");
+ throw new IOException(errorMessage);
}
- bpStorage.recoverTransitionRead(datanode, nsInfo, bpDataDirs, startOpt);
- addBlockPoolStorage(bpid, bpStorage);
+ StorageDirectory sd = loadStorageDirectory(
+ datanode, nsInfos.get(0), volume, StartupOption.HOTSWAP);
+ VolumeBuilder builder =
+ new VolumeBuilder(this, sd);
+ for (NamespaceInfo nsInfo : nsInfos) {
+ List<File> bpDataDirs = Lists.newArrayList();
+ bpDataDirs.add(BlockPoolSliceStorage.getBpRoot(
+ nsInfo.getBlockPoolID(), new File(volume, STORAGE_DIR_CURRENT)));
+ makeBlockPoolDataDir(bpDataDirs, null);
+
+ BlockPoolSliceStorage bpStorage;
+ final String bpid = nsInfo.getBlockPoolID();
+ synchronized (this) {
+ bpStorage = this.bpStorageMap.get(bpid);
+ if (bpStorage == null) {
+ bpStorage = new BlockPoolSliceStorage(
+ nsInfo.getNamespaceID(), bpid, nsInfo.getCTime(),
+ nsInfo.getClusterID());
+ addBlockPoolStorage(bpid, bpStorage);
+ }
+ }
+ builder.addBpStorageDirectories(
+ bpid, bpStorage.loadBpStorageDirectories(
+ datanode, nsInfo, bpDataDirs, StartupOption.HOTSWAP));
+ }
+ return builder;
}
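prepareVolume() and VolumeBuilder split volume addition into a side-effect-free prepare phase and a publish phase: all fallible work (analyze, format, recover, lock) happens before any shared state is touched, and only build() mutates DataStorage. A minimal generic sketch of that two-phase shape; the Registry and Prepared names are hypothetical:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class TwoPhaseAdd {
  static class Registry {
    final List<String> active = new ArrayList<String>();

    // Phase 1: do every fallible step; throw before anything is published.
    Prepared prepare(String volume) throws IOException {
      if (active.contains(volume)) {
        throw new IOException("Storage directory " + volume + " is in use");
      }
      // ... analyze / format / lock would happen here ...
      return new Prepared(this, volume);
    }
  }

  // Holds the loaded metadata until the caller decides to publish it.
  static class Prepared {
    private final Registry registry;
    private final String volume;
    Prepared(Registry registry, String volume) {
      this.registry = registry;
      this.volume = volume;
    }

    // Phase 2: cheap, non-failing publication (like VolumeBuilder#build()).
    void build() {
      synchronized (registry) {
        registry.active.add(volume);
      }
    }
  }

  public static void main(String[] args) throws IOException {
    Registry storage = new Registry();
    Prepared builder = storage.prepare("/data/new_vol");
    builder.build();  // only now is the volume visible to the registry
    System.out.println(storage.active);
  }
}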
/**
- * Add a list of volumes to be managed by this DataStorage. If the volume is
- * empty, it formats the volume, otherwise it recovers it from previous
- * transitions if required.
- *
- * If isInitialize is false, only the directories that have finished the
- * doTransition() process will be added into DataStorage.
+ * Add a list of volumes to be managed by DataStorage. If the volume is empty,
+ * format it, otherwise recover it from previous transitions if required.
*
* @param datanode the reference to DataNode.
* @param nsInfo namespace information
* @param dataDirs array of data storage directories
* @param startOpt startup option
- * @param isInitialize whether it is called when DataNode starts up.
+ * @return a list of successfully loaded volumes.
* @throws IOException
*/
- private synchronized void addStorageLocations(DataNode datanode,
+ @VisibleForTesting
+ synchronized List<StorageLocation> addStorageLocations(DataNode datanode,
NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
- StartupOption startOpt, boolean isInitialize, boolean ignoreExistingDirs)
- throws IOException {
- Set<String> existingStorageDirs = new HashSet<String>();
- for (int i = 0; i < getNumStorageDirs(); i++) {
- existingStorageDirs.add(getStorageDir(i).getRoot().getAbsolutePath());
- }
-
- // 1. For each data directory calculate its state and check whether all is
- // consistent before transitioning. Format and recover.
- ArrayList<StorageState> dataDirStates =
- new ArrayList<StorageState>(dataDirs.size());
- List<StorageDirectory> addedStorageDirectories =
- new ArrayList<StorageDirectory>();
- for(Iterator<StorageLocation> it = dataDirs.iterator(); it.hasNext();) {
- File dataDir = it.next().getFile();
- if (existingStorageDirs.contains(dataDir.getAbsolutePath())) {
- LOG.info("Storage directory " + dataDir + " has already been used.");
- it.remove();
- continue;
- }
- StorageDirectory sd = new StorageDirectory(dataDir);
- StorageState curState;
- try {
- curState = sd.analyzeStorage(startOpt, this);
- // sd is locked but not opened
- switch (curState) {
- case NORMAL:
- break;
- case NON_EXISTENT:
- // ignore this storage
- LOG.info("Storage directory " + dataDir + " does not exist");
- it.remove();
+ StartupOption startOpt) throws IOException {
+ final String bpid = nsInfo.getBlockPoolID();
+ List<StorageLocation> successVolumes = Lists.newArrayList();
+ for (StorageLocation dataDir : dataDirs) {
+ File root = dataDir.getFile();
+ if (!containsStorageDir(root)) {
+ try {
+ // It first ensures the datanode level format is completed.
+ StorageDirectory sd = loadStorageDirectory(
+ datanode, nsInfo, root, startOpt);
+ addStorageDir(sd);
+ } catch (IOException e) {
+ LOG.warn(e);
continue;
- case NOT_FORMATTED: // format
- LOG.info("Storage directory " + dataDir + " is not formatted for "
- + nsInfo.getBlockPoolID());
- LOG.info("Formatting ...");
- format(sd, nsInfo, datanode.getDatanodeUuid());
- break;
- default: // recovery part is common
- sd.doRecover(curState);
}
- } catch (IOException ioe) {
- sd.unlock();
- LOG.warn("Ignoring storage directory " + dataDir
- + " due to an exception", ioe);
- //continue with other good dirs
- continue;
- }
- if (isInitialize) {
- addStorageDir(sd);
+ } else {
+ LOG.info("Storage directory " + dataDir + " has already been used.");
}
- addedStorageDirectories.add(sd);
- dataDirStates.add(curState);
- }
- if (dataDirs.size() == 0 || dataDirStates.size() == 0) {
- // none of the data dirs exist
- if (ignoreExistingDirs) {
- return;
- }
- throw new IOException(
- "All specified directories are not accessible or do not exist.");
- }
-
- // 2. Do transitions
- // Each storage directory is treated individually.
- // During startup some of them can upgrade or rollback
- // while others could be up-to-date for the regular startup.
- for (Iterator<StorageDirectory> it = addedStorageDirectories.iterator();
- it.hasNext(); ) {
- StorageDirectory sd = it.next();
+ List<File> bpDataDirs = new ArrayList<File>();
+ bpDataDirs.add(BlockPoolSliceStorage.getBpRoot(bpid, new File(root,
+ STORAGE_DIR_CURRENT)));
try {
- doTransition(datanode, sd, nsInfo, startOpt);
- createStorageID(sd);
- } catch (IOException e) {
- if (!isInitialize) {
- sd.unlock();
- it.remove();
- continue;
+ makeBlockPoolDataDir(bpDataDirs, null);
+ BlockPoolSliceStorage bpStorage = this.bpStorageMap.get(bpid);
+ if (bpStorage == null) {
+ bpStorage = new BlockPoolSliceStorage(
+ nsInfo.getNamespaceID(), bpid, nsInfo.getCTime(),
+ nsInfo.getClusterID());
}
- unlockAll();
- throw e;
- }
- }
-
- // 3. Update all successfully loaded storages. Some of them might have just
- // been formatted.
- this.writeAll(addedStorageDirectories);
- // 4. Make newly loaded storage directories visible for service.
- if (!isInitialize) {
- this.storageDirs.addAll(addedStorageDirectories);
+ bpStorage.recoverTransitionRead(datanode, nsInfo, bpDataDirs, startOpt);
+ addBlockPoolStorage(bpid, bpStorage);
+ } catch (IOException e) {
+ LOG.warn("Failed to add storage for block pool: " + bpid + " : "
+ + e.getMessage());
+ continue;
+ }
+ successVolumes.add(dataDir);
}
+ return successVolumes;
}
/**
- * Remove volumes from DataStorage.
+ * Remove volumes from DataStorage. All volumes are removed even when an
+ * IOException is thrown.
+ *
* @param locations a collection of volumes.
+ * @throws IOException if an I/O error occurs when unlocking a storage
+ * directory.
*/
- synchronized void removeVolumes(Collection<StorageLocation> locations) {
+ synchronized void removeVolumes(Collection<StorageLocation> locations)
+ throws IOException {
if (locations.isEmpty()) {
return;
}
@@ -371,6 +413,7 @@ public class DataStorage extends Storage {
bpsStorage.removeVolumes(dataDirs);
}
+ StringBuilder errorMsgBuilder = new StringBuilder();
for (Iterator<StorageDirectory> it = this.storageDirs.iterator();
it.hasNext(); ) {
StorageDirectory sd = it.next();
@@ -382,13 +425,18 @@ public class DataStorage extends Storage {
LOG.warn(String.format(
"I/O error attempting to unlock storage directory %s.",
sd.getRoot()), e);
+ errorMsgBuilder.append(String.format("Failed to remove %s: %s\n",
+ sd.getRoot(), e.getMessage()));
}
}
}
+ if (errorMsgBuilder.length() > 0) {
+ throw new IOException(errorMsgBuilder.toString());
+ }
}
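removeVolumes() now keeps going past an unlock failure, accumulating each error into a StringBuilder and throwing a single IOException at the end, so one stuck directory no longer leaves the rest locked. A small sketch of that accumulate-then-throw idiom; unlock() is a hypothetical stand-in for StorageDirectory#unlock():

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

public class RemoveAll {
  // Hypothetical stand-in for StorageDirectory#unlock().
  static void unlock(String dir) throws IOException {
    if (dir.contains("stuck")) {
      throw new IOException("lock file busy");
    }
  }

  static void removeAll(List<String> dirs) throws IOException {
    StringBuilder errorMsg = new StringBuilder();
    for (String dir : dirs) {
      try {
        unlock(dir);                 // every directory is attempted ...
      } catch (IOException e) {
        errorMsg.append(String.format("Failed to remove %s: %s%n",
            dir, e.getMessage()));
      }
    }
    if (errorMsg.length() > 0) {     // ... and failures surface once, together
      throw new IOException(errorMsg.toString());
    }
  }

  public static void main(String[] args) {
    try {
      removeAll(Arrays.asList("/data/ok", "/data/stuck", "/data/ok2"));
    } catch (IOException e) {
      System.out.println(e.getMessage());
    }
  }
}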
/**
- * Analyze storage directories.
+ * Analyze storage directories for a specific block pool.
* Recover from previous transitions if required.
* Perform fs state transition if necessary depending on the namespace info.
* Read storage info.
@@ -396,60 +444,25 @@ public class DataStorage extends Storage {
* This method should be synchronized between multiple DN threads. Only the
* first DN thread does DN level storage dir recoverTransitionRead.
*
- * @param nsInfo namespace information
- * @param dataDirs array of data storage directories
- * @param startOpt startup option
- * @throws IOException
- */
- synchronized void recoverTransitionRead(DataNode datanode,
- NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
- StartupOption startOpt)
- throws IOException {
- if (initialized) {
- // DN storage has been initialized, no need to do anything
- return;
- }
- LOG.info("DataNode version: " + HdfsConstants.DATANODE_LAYOUT_VERSION
- + " and NameNode layout version: " + nsInfo.getLayoutVersion());
-
- this.storageDirs = new ArrayList<StorageDirectory>(dataDirs.size());
- addStorageLocations(datanode, nsInfo, dataDirs, startOpt, true, false);
-
- // mark DN storage is initialized
- this.initialized = true;
- }
-
- /**
- * recoverTransitionRead for a specific block pool
- *
* @param datanode DataNode
- * @param bpID Block pool Id
* @param nsInfo Namespace info of namenode corresponding to the block pool
* @param dataDirs Storage directories
* @param startOpt startup option
* @throws IOException on error
*/
- void recoverTransitionRead(DataNode datanode, String bpID, NamespaceInfo nsInfo,
+ void recoverTransitionRead(DataNode datanode, NamespaceInfo nsInfo,
Collection<StorageLocation> dataDirs, StartupOption startOpt) throws IOException {
- // First ensure datanode level format/snapshot/rollback is completed
- recoverTransitionRead(datanode, nsInfo, dataDirs, startOpt);
-
- // Create list of storage directories for the block pool
- Collection<File> bpDataDirs = new ArrayList<File>();
- for(StorageLocation dir : dataDirs) {
- File dnRoot = dir.getFile();
- File bpRoot = BlockPoolSliceStorage.getBpRoot(bpID, new File(dnRoot,
- STORAGE_DIR_CURRENT));
- bpDataDirs.add(bpRoot);
+ if (!this.initialized) {
+ LOG.info("DataNode version: " + HdfsConstants.DATANODE_LAYOUT_VERSION
+ + " and NameNode layout version: " + nsInfo.getLayoutVersion());
+ this.storageDirs = new ArrayList<StorageDirectory>(dataDirs.size());
+ // mark DN storage as initialized
+ this.initialized = true;
}
- // mkdir for the list of BlockPoolStorage
- makeBlockPoolDataDir(bpDataDirs, null);
- BlockPoolSliceStorage bpStorage = new BlockPoolSliceStorage(
- nsInfo.getNamespaceID(), bpID, nsInfo.getCTime(), nsInfo.getClusterID());
-
- bpStorage.recoverTransitionRead(datanode, nsInfo, bpDataDirs, startOpt);
- addBlockPoolStorage(bpID, bpStorage);
+ if (addStorageLocations(datanode, nsInfo, dataDirs, startOpt).isEmpty()) {
+ throw new IOException("All specified directories are failed to load.");
+ }
}
/**
@@ -665,12 +678,15 @@ public class DataStorage extends Storage {
// meaningful at BlockPoolSliceStorage level.
// regular start up.
- if (this.layoutVersion == HdfsConstants.DATANODE_LAYOUT_VERSION)
+ if (this.layoutVersion == HdfsConstants.DATANODE_LAYOUT_VERSION) {
+ createStorageID(sd);
return; // regular startup
+ }
// do upgrade
if (this.layoutVersion > HdfsConstants.DATANODE_LAYOUT_VERSION) {
doUpgrade(datanode, sd, nsInfo); // upgrade
+ createStorageID(sd);
return;
}
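Startup via recoverTransitionRead() stays lenient where hot-swap is strict: addStorageLocations() logs and skips a directory that fails to load, and startup aborts only when every directory failed. A compact sketch of that best-effort policy; load() is a hypothetical stand-in for loading one data directory:

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BestEffortStartup {
  // Hypothetical stand-in for loading one data directory.
  static String load(String dir) throws IOException {
    if (dir.contains("corrupt")) {
      throw new IOException("unreadable VERSION file");
    }
    return dir;
  }

  static List<String> addStorageLocations(List<String> dirs) {
    List<String> success = new ArrayList<String>();
    for (String dir : dirs) {
      try {
        success.add(load(dir));     // keep going on per-directory failures
      } catch (IOException e) {
        System.err.println("Skipping " + dir + ": " + e.getMessage());
      }
    }
    return success;
  }

  public static void main(String[] args) throws IOException {
    List<String> ok = addStorageLocations(
        Arrays.asList("/data/d1", "/data/corrupt", "/data/d2"));
    if (ok.isEmpty()) {             // startup aborts only on total failure
      throw new IOException("All specified directories failed to load.");
    }
    System.out.println("Loaded: " + ok);
  }
}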
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a9331fe9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
index 849c80e..a02ee0a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
@@ -100,8 +101,9 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
public List<V> getVolumes();
/** Add an array of StorageLocation to FsDataset. */
- public List<StorageLocation> addVolumes(List<StorageLocation> volumes,
- final Collection<String> bpids);
+ public void addVolume(
+ final StorageLocation location,
+ final List<NamespaceInfo> nsInfos) throws IOException;
/** Removes a collection of volumes from FsDataset. */
public void removeVolumes(Collection<StorageLocation> volumes);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a9331fe9/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index 73966b7..4a89778 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -31,7 +31,6 @@ import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
@@ -94,6 +93,7 @@ import static org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.RamDiskRepli
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.io.IOUtils;
@@ -307,30 +307,39 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
ReplicaMap tempVolumeMap = new ReplicaMap(this);
fsVolume.getVolumeMap(tempVolumeMap, ramDiskReplicaTracker);
- volumeMap.addAll(tempVolumeMap);
- volumes.addVolume(fsVolume);
- storageMap.put(sd.getStorageUuid(),
- new DatanodeStorage(sd.getStorageUuid(),
- DatanodeStorage.State.NORMAL,
- storageType));
- asyncDiskService.addVolume(sd.getCurrentDir());
+ synchronized (this) {
+ volumeMap.addAll(tempVolumeMap);
+ storageMap.put(sd.getStorageUuid(),
+ new DatanodeStorage(sd.getStorageUuid(),
+ DatanodeStorage.State.NORMAL,
+ storageType));
+ asyncDiskService.addVolume(sd.getCurrentDir());
+ volumes.addVolume(fsVolume);
+ }
LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
}
- private void addVolumeAndBlockPool(Collection<StorageLocation> dataLocations,
- Storage.StorageDirectory sd, final Collection<String> bpids)
+ @Override
+ public void addVolume(final StorageLocation location,
+ final List<NamespaceInfo> nsInfos)
throws IOException {
- final File dir = sd.getCurrentDir();
- final StorageType storageType =
- getStorageTypeFromLocations(dataLocations, sd.getRoot());
+ final File dir = location.getFile();
+ // Prepare volume in DataStorage
+ DataStorage.VolumeBuilder builder =
+ dataStorage.prepareVolume(datanode, location.getFile(), nsInfos);
+
+ final Storage.StorageDirectory sd = builder.getStorageDirectory();
+
+ StorageType storageType = location.getStorageType();
final FsVolumeImpl fsVolume = new FsVolumeImpl(
this, sd.getStorageUuid(), dir, this.conf, storageType);
final ReplicaMap tempVolumeMap = new ReplicaMap(fsVolume);
+ ArrayList<IOException> exceptions = Lists.newArrayList();
- List<IOException> exceptions = Lists.newArrayList();
- for (final String bpid : bpids) {
+ for (final NamespaceInfo nsInfo : nsInfos) {
+ String bpid = nsInfo.getBlockPoolID();
try {
fsVolume.addBlockPool(bpid, this.conf);
fsVolume.getVolumeMap(bpid, tempVolumeMap, ramDiskReplicaTracker);
@@ -341,89 +350,22 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
}
if (!exceptions.isEmpty()) {
- // The states of FsDatasteImpl are not modified, thus no need to rolled back.
throw MultipleIOException.createIOException(exceptions);
}
- volumeMap.addAll(tempVolumeMap);
- storageMap.put(sd.getStorageUuid(),
- new DatanodeStorage(sd.getStorageUuid(),
- DatanodeStorage.State.NORMAL,
- storageType));
- asyncDiskService.addVolume(sd.getCurrentDir());
- volumes.addVolume(fsVolume);
-
- LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
- }
-
- /**
- * Add an array of StorageLocation to FsDataset.
- *
- * @pre dataStorage must have these volumes.
- * @param volumes an array of storage locations for adding volumes.
- * @param bpids block pool IDs.
- * @return an array of successfully loaded volumes.
- */
- @Override
- public synchronized List<StorageLocation> addVolumes(
- final List<StorageLocation> volumes, final Collection<String> bpids) {
- final Collection<StorageLocation> dataLocations =
- DataNode.getStorageLocations(this.conf);
- final Map<String, Storage.StorageDirectory> allStorageDirs =
- new HashMap<String, Storage.StorageDirectory>();
- List<StorageLocation> succeedVolumes = Lists.newArrayList();
- try {
- for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) {
- Storage.StorageDirectory sd = dataStorage.getStorageDir(idx);
- allStorageDirs.put(sd.getRoot().getCanonicalPath(), sd);
- }
- } catch (IOException ioe) {
- LOG.warn("Caught exception when parsing storage URL.", ioe);
- return succeedVolumes;
- }
-
- final boolean[] successFlags = new boolean[volumes.size()];
- Arrays.fill(successFlags, false);
- List<Thread> volumeAddingThreads = Lists.newArrayList();
- for (int i = 0; i < volumes.size(); i++) {
- final int idx = i;
- Thread t = new Thread() {
- public void run() {
- StorageLocation vol = volumes.get(idx);
- try {
- String key = vol.getFile().getCanonicalPath();
- if (!allStorageDirs.containsKey(key)) {
- LOG.warn("Attempt to add an invalid volume: " + vol.getFile());
- } else {
- addVolumeAndBlockPool(dataLocations, allStorageDirs.get(key),
- bpids);
- successFlags[idx] = true;
- }
- } catch (IOException e) {
- LOG.warn("Caught exception when adding volume " + vol, e);
- }
- }
- };
- volumeAddingThreads.add(t);
- t.start();
- }
-
- for (Thread t : volumeAddingThreads) {
- try {
- t.join();
- } catch (InterruptedException e) {
- LOG.warn("Caught InterruptedException when adding volume.", e);
- }
- }
+ setupAsyncLazyPersistThread(fsVolume);
- setupAsyncLazyPersistThreads();
-
- for (int i = 0; i < volumes.size(); i++) {
- if (successFlags[i]) {
- succeedVolumes.add(volumes.get(i));
- }
+ builder.build();
+ synchronized (this) {
+ volumeMap.addAll(tempVolumeMap);
+ storageMap.put(sd.getStorageUuid(),
+ new DatanodeStorage(sd.getStorageUuid(),
+ DatanodeStorage.State.NORMAL,
+ storageType));
+ asyncDiskService.addVolume(sd.getCurrentDir());
+ volumes.addVolume(fsVolume);
}
- return succeedVolumes;
+ LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
}
/**
@@ -2476,24 +2418,27 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
// added or removed.
// This should only be called when the FsDataSetImpl#volumes list is finalized.
private void setupAsyncLazyPersistThreads() {
- boolean ramDiskConfigured = ramDiskConfigured();
for (FsVolumeImpl v: getVolumes()){
- // Skip transient volumes
- if (v.isTransientStorage()) {
- continue;
- }
+ setupAsyncLazyPersistThread(v);
+ }
+ }
- // Add thread for DISK volume if RamDisk is configured
- if (ramDiskConfigured &&
- !asyncLazyPersistService.queryVolume(v.getCurrentDir())) {
- asyncLazyPersistService.addVolume(v.getCurrentDir());
- }
+ private void setupAsyncLazyPersistThread(final FsVolumeImpl v) {
+ // Skip transient volumes
+ if (v.isTransientStorage()) {
+ return;
+ }
+ boolean ramDiskConfigured = ramDiskConfigured();
+ // Add thread for DISK volume if RamDisk is configured
+ if (ramDiskConfigured &&
+ !asyncLazyPersistService.queryVolume(v.getCurrentDir())) {
+ asyncLazyPersistService.addVolume(v.getCurrentDir());
+ }
- // Remove thread for DISK volume if RamDisk is not configured
- if (!ramDiskConfigured &&
- asyncLazyPersistService.queryVolume(v.getCurrentDir())) {
- asyncLazyPersistService.removeVolume(v.getCurrentDir());
- }
+ // Remove thread for DISK volume if RamDisk is not configured
+ if (!ramDiskConfigured &&
+ asyncLazyPersistService.queryVolume(v.getCurrentDir())) {
+ asyncLazyPersistService.removeVolume(v.getCurrentDir());
}
}
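FsDatasetImpl#addVolume follows the same shape: the expensive scan (building the temporary ReplicaMap for each block pool) runs outside the dataset lock, and only the final publication of maps and services happens inside synchronized(this). A hedged sketch of scan-outside, publish-inside; the Dataset class and scanVolume() are hypothetical stand-ins:

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class Dataset {
  private final Map<String, List<String>> replicaMap =
      new HashMap<String, List<String>>();
  private final List<String> volumes = new ArrayList<String>();

  // Hypothetical stand-in for FsVolumeImpl#getVolumeMap: a slow disk scan.
  private static List<String> scanVolume(String volume) throws IOException {
    List<String> replicas = new ArrayList<String>();
    replicas.add(volume + "/blk_1001");
    return replicas;
  }

  public void addVolume(String volume) throws IOException {
    // 1. Expensive, fallible work with no lock held.
    List<String> scanned = scanVolume(volume);

    // 2. Quick publication under the dataset lock.
    synchronized (this) {
      replicaMap.put(volume, scanned);
      volumes.add(volume);
    }
  }

  public static void main(String[] args) throws IOException {
    Dataset ds = new Dataset();
    ds.addVolume("/data/new_vol");
    System.out.println(ds.volumes + " " + ds.replicaMap);
  }
}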
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a9331fe9/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
index 46cb46a..3e5034a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.LengthInputStream;
@@ -53,6 +54,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsVolumeImpl;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.io.IOUtils;
@@ -1194,8 +1196,9 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
}
@Override
- public List<StorageLocation> addVolumes(List<StorageLocation> volumes,
- final Collection<String> bpids) {
+ public void addVolume(
+ final StorageLocation location,
+ final List<NamespaceInfo> nsInfos) throws IOException {
throw new UnsupportedOperationException();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a9331fe9/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
index 27cfc82..d468493 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeHotSwapVolumes.java
@@ -18,6 +18,8 @@
package org.apache.hadoop.hdfs.server.datanode;
+import com.google.common.base.Joiner;
+import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.ReconfigurationException;
import org.apache.hadoop.fs.BlockLocation;
@@ -33,6 +35,8 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils;
@@ -58,9 +62,14 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
+import static org.hamcrest.CoreMatchers.anyOf;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.core.Is.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -197,10 +206,11 @@ public class TestDataNodeHotSwapVolumes {
}
assertFalse(oldLocations.isEmpty());
- String newPaths = "/foo/path1,/foo/path2";
- conf.set(DFS_DATANODE_DATA_DIR_KEY, newPaths);
+ String newPaths = oldLocations.get(0).getFile().getAbsolutePath() +
+ ",/foo/path1,/foo/path2";
- DataNode.ChangedVolumes changedVolumes =dn.parseChangedVolumes();
+ DataNode.ChangedVolumes changedVolumes =
+ dn.parseChangedVolumes(newPaths);
List<StorageLocation> newVolumes = changedVolumes.newLocations;
assertEquals(2, newVolumes.size());
assertEquals(new File("/foo/path1").getAbsolutePath(),
@@ -209,21 +219,21 @@ public class TestDataNodeHotSwapVolumes {
newVolumes.get(1).getFile().getAbsolutePath());
List<StorageLocation> removedVolumes = changedVolumes.deactivateLocations;
- assertEquals(oldLocations.size(), removedVolumes.size());
- for (int i = 0; i < removedVolumes.size(); i++) {
- assertEquals(oldLocations.get(i).getFile(),
- removedVolumes.get(i).getFile());
- }
+ assertEquals(1, removedVolumes.size());
+ assertEquals(oldLocations.get(1).getFile(),
+ removedVolumes.get(0).getFile());
+
+ assertEquals(1, changedVolumes.unchangedLocations.size());
+ assertEquals(oldLocations.get(0).getFile(),
+ changedVolumes.unchangedLocations.get(0).getFile());
}
@Test
public void testParseChangedVolumesFailures() throws IOException {
startDFSCluster(1, 1);
DataNode dn = cluster.getDataNodes().get(0);
- Configuration conf = dn.getConf();
try {
- conf.set(DFS_DATANODE_DATA_DIR_KEY, "");
- dn.parseChangedVolumes();
+ dn.parseChangedVolumes("");
fail("Should throw IOException: empty inputs.");
} catch (IOException e) {
GenericTestUtils.assertExceptionContains("No directory is specified.", e);
@@ -231,7 +241,8 @@ public class TestDataNodeHotSwapVolumes {
}
/** Add volumes to the first DataNode. */
- private void addVolumes(int numNewVolumes) throws ReconfigurationException {
+ private void addVolumes(int numNewVolumes)
+ throws ReconfigurationException, IOException {
File dataDir = new File(cluster.getDataDirectory());
DataNode dn = cluster.getDataNodes().get(0); // First DataNode.
Configuration conf = dn.getConf();
@@ -253,12 +264,26 @@ public class TestDataNodeHotSwapVolumes {
newVolumeDirs.add(volumeDir);
volumeDir.mkdirs();
newDataDirBuf.append(",");
- newDataDirBuf.append(volumeDir.toURI());
+ newDataDirBuf.append(
+ StorageLocation.parse(volumeDir.toString()).toString());
}
String newDataDir = newDataDirBuf.toString();
dn.reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_KEY, newDataDir);
- assertEquals(newDataDir, conf.get(DFS_DATANODE_DATA_DIR_KEY));
+
+ // Verify the configuration value is appropriately set.
+ String[] effectiveDataDirs = conf.get(DFS_DATANODE_DATA_DIR_KEY).split(",");
+ String[] expectDataDirs = newDataDir.split(",");
+ assertEquals(expectDataDirs.length, effectiveDataDirs.length);
+ for (int i = 0; i < expectDataDirs.length; i++) {
+ StorageLocation expectLocation = StorageLocation.parse(expectDataDirs[i]);
+ StorageLocation effectiveLocation =
+ StorageLocation.parse(effectiveDataDirs[i]);
+ assertEquals(expectLocation.getStorageType(),
+ effectiveLocation.getStorageType());
+ assertEquals(expectLocation.getFile().getCanonicalFile(),
+ effectiveLocation.getFile().getCanonicalFile());
+ }
// Check that all newly created volumes are appropriately formatted.
for (File volumeDir : newVolumeDirs) {
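
The verification loop above cannot compare raw strings, because reconfiguration round-trips each entry through StorageLocation, whose textual form can carry a storage-type prefix. A simplified sketch of that parse step (the real StorageLocation also accepts URIs; the class and regex here are illustrative stand-ins):

import java.io.File;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class StorageLocationSketch {
  // An optional "[TYPE]" prefix selects the storage type; the remainder
  // names the directory. Entries with no prefix default to DISK.
  private static final Pattern REGEX = Pattern.compile("^\\[(\\w*)\\](.+)$");

  final String storageType;
  final File file;

  StorageLocationSketch(String storageType, String path) {
    this.storageType = storageType;
    this.file = new File(path);
  }

  static StorageLocationSketch parse(String rawLocation) {
    Matcher m = REGEX.matcher(rawLocation);
    if (m.matches()) {
      return new StorageLocationSketch(
          m.group(1).toUpperCase(), m.group(2).trim());
    }
    return new StorageLocationSketch("DISK", rawLocation.trim());
  }

  public static void main(String[] args) {
    StorageLocationSketch a = StorageLocationSketch.parse("[SSD]/data/vol1");
    StorageLocationSketch b = StorageLocationSketch.parse("/data/vol2");
    System.out.println(a.storageType + " " + a.file); // SSD /data/vol1
    System.out.println(b.storageType + " " + b.file); // DISK /data/vol2
  }
}
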
@@ -439,7 +464,7 @@ public class TestDataNodeHotSwapVolumes {
dn.reconfigurePropertyImpl(
DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, newDirs);
assertFileLocksReleased(
- new ArrayList<String>(oldDirs).subList(1, oldDirs.size()));
+ new ArrayList<String>(oldDirs).subList(1, oldDirs.size()));
triggerDeleteReport(dn);
@@ -447,6 +472,59 @@ public class TestDataNodeHotSwapVolumes {
DFSTestUtil.waitReplication(fs, testFile, replFactor);
}
+ @Test
+ public void testAddVolumeFailures() throws IOException {
+ startDFSCluster(1, 1);
+ final String dataDir = cluster.getDataDirectory();
+
+ DataNode dn = cluster.getDataNodes().get(0);
+ List<String> newDirs = Lists.newArrayList();
+ final int NUM_NEW_DIRS = 4;
+ for (int i = 0; i < NUM_NEW_DIRS; i++) {
+ File newVolume = new File(dataDir, "new_vol" + i);
+ newDirs.add(newVolume.toString());
+ if (i % 2 == 0) {
+ // Make addVolume() fail.
+ newVolume.createNewFile();
+ }
+ }
+
+ String newValue = dn.getConf().get(DFS_DATANODE_DATA_DIR_KEY) + "," +
+ Joiner.on(",").join(newDirs);
+ try {
+ dn.reconfigurePropertyImpl(DFS_DATANODE_DATA_DIR_KEY, newValue);
+ fail("Expected a ReconfigurationException to be thrown.");
+ } catch (ReconfigurationException e) {
+ String errorMessage = e.getCause().getMessage();
+ String[] messages = errorMessage.split("\\r?\\n");
+ assertEquals(2, messages.length);
+ assertThat(messages[0], containsString("new_vol0"));
+ assertThat(messages[1], containsString("new_vol2"));
+ }
+
+ // Make sure that vol0 and vol2's metadata are not left in memory.
+ FsDatasetSpi<?> dataset = dn.getFSDataset();
+ for (FsVolumeSpi volume : dataset.getVolumes()) {
+ assertThat(volume.getBasePath(), is(not(anyOf(
+ is(newDirs.get(0)), is(newDirs.get(2))))));
+ }
+ DataStorage storage = dn.getStorage();
+ for (int i = 0; i < storage.getNumStorageDirs(); i++) {
+ Storage.StorageDirectory sd = storage.getStorageDir(i);
+ assertThat(sd.getRoot().toString(),
+ is(not(anyOf(is(newDirs.get(0)), is(newDirs.get(2))))));
+ }
+
+ // The newly effective conf should contain neither vol0 nor vol2.
+ String[] effectiveVolumes =
+ dn.getConf().get(DFS_DATANODE_DATA_DIR_KEY).split(",");
+ assertEquals(4, effectiveVolumes.length);
+ for (String ev : effectiveVolumes) {
+ assertThat(new File(ev).getCanonicalPath(),
+ is(not(anyOf(is(newDirs.get(0)), is(newDirs.get(2))))));
+ }
+ }
+
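
This new test pins down two properties of the reworked error handling: every failed volume is reported, one message line per failure, and those failures do not prevent the remaining volumes from taking effect. A rough sketch of that aggregate-and-continue pattern, with addVolume() reduced to the is-it-a-directory check the test trips over (illustrative names, not the actual DataNode code):

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class RefreshVolumesSketch {
  final List<File> effectiveVolumes = new ArrayList<>();

  void addVolume(File dir) throws IOException {
    // A data directory must be a directory; a plain file (like new_vol0
    // and new_vol2 above) fails here.
    if (dir.exists() && !dir.isDirectory()) {
      throw new IOException(dir + " is not a directory");
    }
    effectiveVolumes.add(dir);
  }

  // Try every volume, keep the successes, and report all failures at
  // once, one message line per failed volume.
  void refreshVolumes(List<File> dirs) throws IOException {
    StringBuilder errors = new StringBuilder();
    for (File dir : dirs) {
      try {
        addVolume(dir);
      } catch (IOException e) {
        if (errors.length() > 0) {
          errors.append("\n");
        }
        errors.append(e.getMessage());
      }
    }
    if (errors.length() > 0) {
      // effectiveVolumes still holds every volume that succeeded.
      throw new IOException(errors.toString());
    }
  }
}
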
/**
* Asserts that the storage lock file in each given directory has been
* released. This method works by trying to acquire the lock file itself. If
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a9331fe9/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataStorage.java
index ed32243..c90b8e5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataStorage.java
@@ -146,14 +146,11 @@ public class TestDataStorage {
assertEquals(numLocations, storage.getNumStorageDirs());
locations = createStorageLocations(numLocations);
- try {
- storage.addStorageLocations(mockDN, namespaceInfos.get(0),
- locations, START_OPT);
- fail("Expected to throw IOException: adding active directories.");
- } catch (IOException e) {
- GenericTestUtils.assertExceptionContains(
- "All specified directories are not accessible or do not exist.", e);
- }
+ List<StorageLocation> addedLocation =
+ storage.addStorageLocations(mockDN, namespaceInfos.get(0),
+ locations, START_OPT);
+ assertTrue(addedLocation.isEmpty());
+
// The number of active storage dirs has not changed, since the call only
// tried to re-add storage dirs that are already in service.
assertEquals(numLocations, storage.getNumStorageDirs());
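
addStorageLocations() no longer treats an all-duplicates request as fatal: directories that are already loaded are skipped, and the method reports only what it actually added. Sketched below with plain Files standing in for StorageLocations (an assumption about the shape of the logic, not the actual DataStorage code):

import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SkipLoadedSketch {
  private final List<File> loaded = new ArrayList<>();

  // Returns only the locations that were newly added; re-adding a
  // directory that is already in service is a no-op, not an error.
  List<File> addStorageLocations(List<File> candidates) {
    List<File> added = new ArrayList<>();
    for (File dir : candidates) {
      if (loaded.contains(dir)) {
        continue;
      }
      loaded.add(dir);
      added.add(dir);
    }
    return added;
  }

  public static void main(String[] args) {
    SkipLoadedSketch s = new SkipLoadedSketch();
    List<File> dirs = Arrays.asList(new File("vol0"), new File("vol1"));
    System.out.println(s.addStorageLocations(dirs).size()); // 2
    System.out.println(s.addStorageLocations(dirs).size()); // 0: all active
  }
}
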
@@ -169,13 +166,12 @@ public class TestDataStorage {
final int numLocations = 3;
List<StorageLocation> locations =
createStorageLocations(numLocations, true);
-
try {
storage.recoverTransitionRead(mockDN, nsInfo, locations, START_OPT);
fail("An IOException should have been thrown: all StorageLocations are NON_EXISTENT");
} catch (IOException e) {
GenericTestUtils.assertExceptionContains(
- "All specified directories are not accessible or do not exist.", e);
+ "All specified directories are failed to load.", e);
}
assertEquals(0, storage.getNumStorageDirs());
}
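
The updated message reflects the new failure policy: individual directories may fail to load, and startup aborts only when none survive. The check presumably reduces to something like the following sketch; only the exception text is taken from the test above:

import java.io.IOException;
import java.util.List;

public class TotalFailureCheckSketch {
  // Partial failures are tolerated; only a total failure is fatal.
  static <T> void checkAnyLoaded(List<T> successfullyLoaded)
      throws IOException {
    if (successfullyLoaded.isEmpty()) {
      throw new IOException("All specified directories are failed to load.");
    }
  }
}
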
@@ -191,9 +187,9 @@ public class TestDataStorage {
throws IOException {
final int numLocations = 3;
List<StorageLocation> locations = createStorageLocations(numLocations);
- String bpid = nsInfo.getBlockPoolID();
// Prepare volumes
- storage.recoverTransitionRead(mockDN, bpid, nsInfo, locations, START_OPT);
+ storage.recoverTransitionRead(mockDN, nsInfo, locations, START_OPT);
+ assertEquals(numLocations, storage.getNumStorageDirs());
// Reset DataStorage
storage.unlockAll();
@@ -201,11 +197,11 @@ public class TestDataStorage {
// Trigger an exception from doTransition().
nsInfo.clusterID = "cluster1";
try {
- storage.recoverTransitionRead(mockDN, bpid, nsInfo, locations, START_OPT);
+ storage.recoverTransitionRead(mockDN, nsInfo, locations, START_OPT);
fail("Expect to throw an exception from doTransition()");
} catch (IOException e) {
- GenericTestUtils.assertExceptionContains("Incompatible clusterIDs", e);
+ GenericTestUtils.assertExceptionContains("All specified directories", e);
}
- assertEquals(numLocations, storage.getNumStorageDirs());
+ assertEquals(0, storage.getNumStorageDirs());
}
}
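
The flipped expectation at the end (0 storage dirs rather than 3) is the atomicity half of this change: a directory that fails mid-transition is unlocked and dropped instead of lingering half-loaded. A sketch of that per-directory rollback, with the transition reduced to an interface method (illustrative types, not the actual DataStorage internals):

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class RollbackOnFailureSketch {
  interface Dir {
    void lock() throws IOException;
    void unlock();
    void doTransition() throws IOException; // e.g. the clusterID check
  }

  private final List<Dir> storageDirs = new ArrayList<>();

  int getNumStorageDirs() {
    return storageDirs.size();
  }

  void load(List<Dir> candidates) {
    for (Dir d : candidates) {
      try {
        d.lock();
        d.doTransition();
        // Only a fully transitioned directory becomes visible.
        storageDirs.add(d);
      } catch (IOException e) {
        // Roll back the partial load; when every candidate fails,
        // getNumStorageDirs() stays at 0, matching the assertion above.
        d.unlock();
      }
    }
  }
}
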
http://git-wip-us.apache.org/repos/asf/hadoop/blob/a9331fe9/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
index 7c39ca5..956ab78 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -31,6 +32,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.StringUtils;
import org.junit.Before;
@@ -40,7 +42,6 @@ import org.mockito.Mockito;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -48,7 +49,9 @@ import java.util.Set;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyListOf;
import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -64,6 +67,7 @@ public class TestFsDatasetImpl {
new StorageInfo(HdfsServerConstants.NodeType.DATA_NODE));
private Configuration conf;
+ private DataNode datanode;
private DataStorage storage;
private DataBlockScanner scanner;
private FsDatasetImpl dataset;
@@ -94,7 +98,7 @@ public class TestFsDatasetImpl {
@Before
public void setUp() throws IOException {
- final DataNode datanode = Mockito.mock(DataNode.class);
+ datanode = Mockito.mock(DataNode.class);
storage = Mockito.mock(DataStorage.class);
scanner = Mockito.mock(DataBlockScanner.class);
this.conf = new Configuration();
@@ -119,17 +123,24 @@ public class TestFsDatasetImpl {
final int numNewVolumes = 3;
final int numExistingVolumes = dataset.getVolumes().size();
final int totalVolumes = numNewVolumes + numExistingVolumes;
- List<StorageLocation> newLocations = new ArrayList<StorageLocation>();
Set<String> expectedVolumes = new HashSet<String>();
+ List<NamespaceInfo> nsInfos = Lists.newArrayList();
+ for (String bpid : BLOCK_POOL_IDS) {
+ nsInfos.add(new NamespaceInfo(0, "cluster-id", bpid, 1));
+ }
for (int i = 0; i < numNewVolumes; i++) {
String path = BASE_DIR + "/newData" + i;
- newLocations.add(StorageLocation.parse(path));
- when(storage.getStorageDir(numExistingVolumes + i))
- .thenReturn(createStorageDirectory(new File(path)));
+ StorageLocation loc = StorageLocation.parse(path);
+ Storage.StorageDirectory sd = createStorageDirectory(new File(path));
+ DataStorage.VolumeBuilder builder =
+ new DataStorage.VolumeBuilder(storage, sd);
+ when(storage.prepareVolume(eq(datanode), eq(loc.getFile()),
+ anyListOf(NamespaceInfo.class)))
+ .thenReturn(builder);
+
+ dataset.addVolume(loc, nsInfos);
}
- when(storage.getNumStorageDirs()).thenReturn(totalVolumes);
- dataset.addVolumes(newLocations, Arrays.asList(BLOCK_POOL_IDS));
assertEquals(totalVolumes, dataset.getVolumes().size());
assertEquals(totalVolumes, dataset.storageMap.size());
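
The reworked mock shows the new two-phase shape of volume addition: prepareVolume() performs all the failure-prone work up front and hands back a VolumeBuilder, and only committing the builder publishes the directory into the live dataset. Sketched below with simplified stand-ins for DataStorage.VolumeBuilder and the storage directory (assumed shapes, not the actual signatures):

import java.io.File;
import java.io.IOException;

public class PrepareCommitSketch {
  static class Volume {
    final File dir;
    Volume(File dir) { this.dir = dir; }
  }

  // Holds a fully prepared directory; build() cannot fail.
  static class VolumeBuilder {
    private final File dir;
    VolumeBuilder(File dir) { this.dir = dir; }
    Volume build() {
      return new Volume(dir);
    }
  }

  // Phase 1: everything that can go wrong happens here, before any
  // shared state is touched.
  static VolumeBuilder prepareVolume(File dir) throws IOException {
    if (dir.exists() && !dir.isDirectory()) {
      throw new IOException(dir + " is not a directory");
    }
    if (!dir.exists() && !dir.mkdirs()) {
      throw new IOException("cannot create " + dir);
    }
    return new VolumeBuilder(dir);
  }
}
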