Posted to hdfs-commits@hadoop.apache.org by at...@apache.org on 2014/08/08 00:46:52 UTC
svn commit: r1616615 - in
/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/org/apache/hadoop/hdfs/server/datanode/
src/test/java/org/apache/hadoop/hdfs/server/datanode/
Author: atm
Date: Thu Aug 7 22:46:51 2014
New Revision: 1616615
URL: http://svn.apache.org/r1616615
Log:
HDFS-6728. Dynamically add new volumes to DataStorage, formatted if necessary. Contributed by Lei Xu.
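In short, the patch factors volume registration out of DataStorage.recoverTransitionRead() into a new addStorageLocations() method, so volumes can be added to a running DataNode and formatted on the fly. As a rough sketch of the new call path (the surrounding setup is modeled on the new TestDataStorage below and is assumed for illustration, not part of this commit; the method is package-private, so a caller must live in the same package):

    package org.apache.hadoop.hdfs.server.datanode;  // API is package-private

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
    import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
    import org.mockito.Mockito;

    /** Hypothetical caller; only addStorageLocations() comes from this patch. */
    class HotAddVolumeSketch {
      static void hotAdd() throws Exception {
        DataNode dn = Mockito.mock(DataNode.class);  // stand-in, as in the test
        DataStorage storage = new DataStorage();
        NamespaceInfo nsInfo = new NamespaceInfo(0, "cluster0", "bp-0", 1L,
            "2.0", "2.0");
        List<StorageLocation> volumes = new ArrayList<StorageLocation>();
        volumes.add(StorageLocation.parse("/data/disk4"));  // assumed path
        // Empty volumes are formatted; directories already managed by this
        // DataStorage instance are skipped rather than re-analyzed.
        storage.addStorageLocations(dn, nsInfo, volumes, StartupOption.REGULAR);
      }
    }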
Added:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataStorage.java
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1616615&r1=1616614&r2=1616615&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Thu Aug 7 22:46:51 2014
@@ -370,6 +370,9 @@ Release 2.6.0 - UNRELEASED
HDFS-6781. Separate HDFS commands from CommandsManual.apt.vm. (Akira
Ajisaka via Arpit Agarwal)
+ HDFS-6728. Dynamically add new volumes to DataStorage, formatted if
+ necessary. (Lei Xu via atm)
+
OPTIMIZATIONS
HDFS-6690. Deduplicate xattr names in memory. (wang)
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java?rev=1616615&r1=1616614&r2=1616615&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java Thu Aug 7 22:46:51 2014
@@ -36,8 +36,10 @@ import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.Properties;
+import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -106,13 +108,22 @@ public class BlockPoolSliceStorage exten
void recoverTransitionRead(DataNode datanode, NamespaceInfo nsInfo,
Collection<File> dataDirs, StartupOption startOpt) throws IOException {
LOG.info("Analyzing storage directories for bpid " + nsInfo.getBlockPoolID());
+ Set<String> existingStorageDirs = new HashSet<String>();
+ for (int i = 0; i < getNumStorageDirs(); i++) {
+ existingStorageDirs.add(getStorageDir(i).getRoot().getAbsolutePath());
+ }
+
// 1. For each BP data directory analyze the state and
// check whether all is consistent before transitioning.
- this.storageDirs = new ArrayList<StorageDirectory>(dataDirs.size());
ArrayList<StorageState> dataDirStates = new ArrayList<StorageState>(
dataDirs.size());
for (Iterator<File> it = dataDirs.iterator(); it.hasNext();) {
File dataDir = it.next();
+ if (existingStorageDirs.contains(dataDir.getAbsolutePath())) {
+ LOG.info("Storage directory " + dataDir + " has already been used.");
+ it.remove();
+ continue;
+ }
StorageDirectory sd = new StorageDirectory(dataDir, null, true);
StorageState curState;
try {
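The new guard makes recoverTransitionRead() idempotent per block pool: directories this BlockPoolSliceStorage already tracks are dropped from the incoming list instead of being re-analyzed and re-locked. The same remove-while-iterating pattern in isolation (plain Java sketch, no HDFS types required):

    import java.io.File;
    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Set;

    class SkipKnownDirsSketch {
      public static void main(String[] args) {
        Set<String> known = new HashSet<String>();
        known.add(new File("/data/disk1").getAbsolutePath()); // already managed

        List<File> candidates = new ArrayList<File>(Arrays.asList(
            new File("/data/disk1"), new File("/data/disk2")));
        for (Iterator<File> it = candidates.iterator(); it.hasNext();) {
          if (known.contains(it.next().getAbsolutePath())) {
            it.remove(); // already in service; do not analyze or lock it again
          }
        }
        System.out.println(candidates); // only /data/disk2 remains
      }
    }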
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java?rev=1616615&r1=1616614&r2=1616615&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java Thu Aug 7 22:46:51 2014
@@ -55,6 +55,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -172,43 +173,99 @@ public class DataStorage extends Storage
}
/**
- * Analyze storage directories.
- * Recover from previous transitions if required.
- * Perform fs state transition if necessary depending on the namespace info.
- * Read storage info.
- * <br>
- * This method should be synchronized between multiple DN threads. Only the
- * first DN thread does DN level storage dir recoverTransitionRead.
- *
+ * Per-directory variant of {@link org.apache.hadoop.hdfs.server.common.Storage#writeAll()}.
+ */
+ private void writeAll(Collection<StorageDirectory> dirs) throws IOException {
+ this.layoutVersion = getServiceLayoutVersion();
+ for (StorageDirectory dir : dirs) {
+ writeProperties(dir);
+ }
+ }
+
+ /**
+ * Add a list of volumes to be managed by DataStorage. Each empty volume is
+ * formatted; the rest are recovered from previous transitions if required.
+ *
+ * @param datanode the reference to DataNode.
* @param nsInfo namespace information
* @param dataDirs array of data storage directories
* @param startOpt startup option
* @throws IOException
*/
- synchronized void recoverTransitionRead(DataNode datanode,
+ synchronized void addStorageLocations(DataNode datanode,
NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
StartupOption startOpt)
throws IOException {
- if (initialized) {
- // DN storage has been initialized, no need to do anything
- return;
+ // Similar to recoverTransitionRead, it first ensures the datanode level
+ // format is completed.
+ List<StorageLocation> tmpDataDirs =
+ new ArrayList<StorageLocation>(dataDirs);
+ addStorageLocations(datanode, nsInfo, tmpDataDirs, startOpt, false, true);
+
+ Collection<File> bpDataDirs = new ArrayList<File>();
+ String bpid = nsInfo.getBlockPoolID();
+ for (StorageLocation dir : dataDirs) {
+ File dnRoot = dir.getFile();
+ File bpRoot = BlockPoolSliceStorage.getBpRoot(bpid, new File(dnRoot,
+ STORAGE_DIR_CURRENT));
+ bpDataDirs.add(bpRoot);
}
- LOG.info("Data-node version: " + HdfsConstants.DATANODE_LAYOUT_VERSION
- + " and name-node layout version: " + nsInfo.getLayoutVersion());
-
- // 1. For each data directory calculate its state and
- // check whether all is consistent before transitioning.
- // Format and recover.
- this.storageDirs = new ArrayList<StorageDirectory>(dataDirs.size());
- ArrayList<StorageState> dataDirStates = new ArrayList<StorageState>(dataDirs.size());
+ // mkdir for the list of BlockPoolStorage
+ makeBlockPoolDataDir(bpDataDirs, null);
+ BlockPoolSliceStorage bpStorage = this.bpStorageMap.get(bpid);
+ if (bpStorage == null) {
+ bpStorage = new BlockPoolSliceStorage(
+ nsInfo.getNamespaceID(), bpid, nsInfo.getCTime(),
+ nsInfo.getClusterID());
+ }
+
+ bpStorage.recoverTransitionRead(datanode, nsInfo, bpDataDirs, startOpt);
+ addBlockPoolStorage(bpid, bpStorage);
+ }
+
+ /**
+ * Add a list of volumes to be managed by this DataStorage. Each empty
+ * volume is formatted; the rest are recovered from previous transitions
+ * if required.
+ *
+ * If isInitialize is false, only the directories that have finished the
+ * doTransition() process will be added into DataStorage.
+ *
+ * @param datanode the reference to DataNode.
+ * @param nsInfo namespace information
+ * @param dataDirs array of data storage directories
+ * @param startOpt startup option
+ * @param isInitialize true if this call is part of DataNode startup.
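+ * @param ignoreExistingDirs if true, return silently instead of throwing
+ * when none of the given directories can be added (e.g. all of them are
+ * already in use).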
+ * @throws IOException
+ */
+ private synchronized void addStorageLocations(DataNode datanode,
+ NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
+ StartupOption startOpt, boolean isInitialize, boolean ignoreExistingDirs)
+ throws IOException {
+ Set<String> existingStorageDirs = new HashSet<String>();
+ for (int i = 0; i < getNumStorageDirs(); i++) {
+ existingStorageDirs.add(getStorageDir(i).getRoot().getAbsolutePath());
+ }
+
+ // 1. For each data directory calculate its state and check whether all is
+ // consistent before transitioning. Format and recover.
+ ArrayList<StorageState> dataDirStates =
+ new ArrayList<StorageState>(dataDirs.size());
+ List<StorageDirectory> addedStorageDirectories =
+ new ArrayList<StorageDirectory>();
for(Iterator<StorageLocation> it = dataDirs.iterator(); it.hasNext();) {
File dataDir = it.next().getFile();
+ if (existingStorageDirs.contains(dataDir.getAbsolutePath())) {
+ LOG.info("Storage directory " + dataDir + " has already been used.");
+ it.remove();
+ continue;
+ }
StorageDirectory sd = new StorageDirectory(dataDir);
StorageState curState;
try {
curState = sd.analyzeStorage(startOpt, this);
// sd is locked but not opened
- switch(curState) {
+ switch (curState) {
case NORMAL:
break;
case NON_EXISTENT:
@@ -217,7 +274,8 @@ public class DataStorage extends Storage
it.remove();
continue;
case NOT_FORMATTED: // format
- LOG.info("Storage directory " + dataDir + " is not formatted");
+ LOG.info("Storage directory " + dataDir + " is not formatted for "
+ + nsInfo.getBlockPoolID());
LOG.info("Formatting ...");
format(sd, nsInfo, datanode.getDatanodeUuid());
break;
@@ -231,33 +289,82 @@ public class DataStorage extends Storage
//continue with other good dirs
continue;
}
- // add to the storage list
- addStorageDir(sd);
+ if (isInitialize) {
+ addStorageDir(sd);
+ }
+ addedStorageDirectories.add(sd);
dataDirStates.add(curState);
}
- if (dataDirs.size() == 0 || dataDirStates.size() == 0) // none of the data dirs exist
+ if (dataDirs.size() == 0 || dataDirStates.size() == 0) {
+ // none of the data dirs exist
+ if (ignoreExistingDirs) {
+ return;
+ }
throw new IOException(
"All specified directories are not accessible or do not exist.");
+ }
// 2. Do transitions
// Each storage directory is treated individually.
- // During startup some of them can upgrade or rollback
- // while others could be uptodate for the regular startup.
- try {
- for (int idx = 0; idx < getNumStorageDirs(); idx++) {
- doTransition(datanode, getStorageDir(idx), nsInfo, startOpt);
- createStorageID(getStorageDir(idx));
+ // During startup some of them can upgrade or rollback
+ // while others could be up-to-date for the regular startup.
+ for (Iterator<StorageDirectory> it = addedStorageDirectories.iterator();
+ it.hasNext(); ) {
+ StorageDirectory sd = it.next();
+ try {
+ doTransition(datanode, sd, nsInfo, startOpt);
+ createStorageID(sd);
+ } catch (IOException e) {
+ if (!isInitialize) {
+ sd.unlock();
+ it.remove();
+ continue;
+ }
+ unlockAll();
+ throw e;
}
- } catch (IOException e) {
- unlockAll();
- throw e;
}
- // 3. Update all storages. Some of them might have just been formatted.
- this.writeAll();
+ // 3. Update all successfully loaded storages. Some of them might have just
+ // been formatted.
+ this.writeAll(addedStorageDirectories);
+
+ // 4. Make newly loaded storage directories visible for service.
+ if (!isInitialize) {
+ this.storageDirs.addAll(addedStorageDirectories);
+ }
+ }
+
+ /**
+ * Analyze storage directories.
+ * Recover from previous transitions if required.
+ * Perform fs state transition if necessary depending on the namespace info.
+ * Read storage info.
+ * <br>
+ * This method should be synchronized between multiple DN threads. Only the
+ * first DN thread does DN level storage dir recoverTransitionRead.
+ *
+ * @param nsInfo namespace information
+ * @param dataDirs array of data storage directories
+ * @param startOpt startup option
+ * @throws IOException
+ */
+ synchronized void recoverTransitionRead(DataNode datanode,
+ NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
+ StartupOption startOpt)
+ throws IOException {
+ if (initialized) {
+ // DN storage has been initialized, no need to do anything
+ return;
+ }
+ LOG.info("DataNode version: " + HdfsConstants.DATANODE_LAYOUT_VERSION
+ + " and NameNode layout version: " + nsInfo.getLayoutVersion());
+
+ this.storageDirs = new ArrayList<StorageDirectory>(dataDirs.size());
+ addStorageLocations(datanode, nsInfo, dataDirs, startOpt, true, false);
- // 4. mark DN storage is initialized
+ // mark DN storage is initialized
this.initialized = true;
}
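Taken together, the private overload is driven by two flags, and the diff above shows exactly two call sites:

    caller                            isInitialize   ignoreExistingDirs
    recoverTransitionRead()           true           false
    addStorageLocations() (public)    false          true

At startup (isInitialize=true) each directory is added to storageDirs as soon as it is analyzed, and a doTransition() failure unlocks everything and aborts. For a hot add (isInitialize=false) a failing directory is unlocked and dropped individually, and the surviving directories are appended to storageDirs only after all of them have completed doTransition(), so partially initialized volumes never become visible to the service.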
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataStorage.java?rev=1616615&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataStorage.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataStorage.java Thu Aug 7 22:46:51 2014
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
+import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestDataStorage {
+ private final static String DEFAULT_BPID = "bp-0";
+ private final static String CLUSTER_ID = "cluster0";
+ private final static String BUILD_VERSION = "2.0";
+ private final static String SOFTWARE_VERSION = "2.0";
+ private final static long CTIME = 1;
+ private final static File TEST_DIR =
+ new File(System.getProperty("test.build.data") + "/dstest");
+ private final static StartupOption START_OPT = StartupOption.REGULAR;
+
+ private DataNode mockDN = Mockito.mock(DataNode.class);
+ private NamespaceInfo nsInfo;
+ private DataStorage storage;
+
+ @Before
+ public void setUp() throws IOException {
+ storage = new DataStorage();
+ nsInfo = new NamespaceInfo(0, CLUSTER_ID, DEFAULT_BPID, CTIME,
+ BUILD_VERSION, SOFTWARE_VERSION);
+ FileUtil.fullyDelete(TEST_DIR);
+ assertTrue("Failed to make test dir.", TEST_DIR.mkdirs());
+ }
+
+ @After
+ public void tearDown() throws IOException {
+ FileUtil.fullyDelete(TEST_DIR);
+ }
+
+ private static List<StorageLocation> createStorageLocations(int numLocs)
+ throws IOException {
+ return createStorageLocations(numLocs, false);
+ }
+
+ /**
+ * Create a list of StorageLocations.
+ * If asFile is set to true, create each StorageLocation as a regular file;
+ * otherwise create a directory for each location.
+ * @param numLocs the total number of StorageLocations to be created.
+ * @param asFile set to true to create as file.
+ * @return a list of StorageLocations.
+ */
+ private static List<StorageLocation> createStorageLocations(
+ int numLocs, boolean asFile) throws IOException {
+ List<StorageLocation> locations = new ArrayList<StorageLocation>();
+ for (int i = 0; i < numLocs; i++) {
+ String uri = TEST_DIR + "/data" + i;
+ File file = new File(uri);
+ if (asFile) {
+ file.getParentFile().mkdirs();
+ file.createNewFile();
+ } else {
+ file.mkdirs();
+ }
+ StorageLocation loc = StorageLocation.parse(uri);
+ locations.add(loc);
+ }
+ return locations;
+ }
+
+ private static List<NamespaceInfo> createNamespaceInfos(int num) {
+ List<NamespaceInfo> nsInfos = new ArrayList<NamespaceInfo>();
+ for (int i = 0; i < num; i++) {
+ String bpid = "bp-" + i;
+ nsInfos.add(new NamespaceInfo(0, CLUSTER_ID, bpid, CTIME, BUILD_VERSION,
+ SOFTWARE_VERSION));
+ }
+ return nsInfos;
+ }
+
+ /** Check whether the path is a valid DataNode data directory. */
+ private static void checkDir(File dataDir) {
+ Storage.StorageDirectory sd = new Storage.StorageDirectory(dataDir);
+ assertTrue(sd.getRoot().isDirectory());
+ assertTrue(sd.getCurrentDir().isDirectory());
+ assertTrue(sd.getVersionFile().isFile());
+ }
+
+ /** Check whether the root is a valid BlockPoolSlice storage. */
+ private static void checkDir(File root, String bpid) {
+ Storage.StorageDirectory sd = new Storage.StorageDirectory(root);
+ File bpRoot = new File(sd.getCurrentDir(), bpid);
+ Storage.StorageDirectory bpSd = new Storage.StorageDirectory(bpRoot);
+ assertTrue(bpSd.getRoot().isDirectory());
+ assertTrue(bpSd.getCurrentDir().isDirectory());
+ assertTrue(bpSd.getVersionFile().isFile());
+ }
+
+ @Test
+ public void testAddStorageDirectories() throws IOException,
+ URISyntaxException {
+ final int numLocations = 3;
+ final int numNamespace = 3;
+ List<StorageLocation> locations = createStorageLocations(numLocations);
+
+ // Add volumes for multiple namespaces.
+ List<NamespaceInfo> namespaceInfos = createNamespaceInfos(numNamespace);
+ for (NamespaceInfo ni : namespaceInfos) {
+ storage.addStorageLocations(mockDN, ni, locations, START_OPT);
+ for (StorageLocation sl : locations) {
+ checkDir(sl.getFile());
+ checkDir(sl.getFile(), ni.getBlockPoolID());
+ }
+ }
+
+ assertEquals(numLocations, storage.getNumStorageDirs());
+
+ locations = createStorageLocations(numLocations);
+ try {
+ storage.addStorageLocations(mockDN, namespaceInfos.get(0),
+ locations, START_OPT);
+ fail("Expected to throw IOException: adding active directories.");
+ } catch (IOException e) {
+ GenericTestUtils.assertExceptionContains(
+ "All specified directories are not accessible or do not exist.", e);
+ }
+ // The number of active storage dirs has not changed, since the
+ // directories it tried to add are already in service.
+ assertEquals(numLocations, storage.getNumStorageDirs());
+
+ // Add more directories.
+ locations = createStorageLocations(6);
+ storage.addStorageLocations(mockDN, nsInfo, locations, START_OPT);
+ assertEquals(6, storage.getNumStorageDirs());
+ }
+
+ @Test
+ public void testRecoverTransitionReadFailure() throws IOException {
+ final int numLocations = 3;
+ List<StorageLocation> locations =
+ createStorageLocations(numLocations, true);
+
+ try {
+ storage.recoverTransitionRead(mockDN, nsInfo, locations, START_OPT);
+ fail("An IOException should throw: all StorageLocations are NON_EXISTENT");
+ } catch (IOException e) {
+ GenericTestUtils.assertExceptionContains(
+ "All specified directories are not accessible or do not exist.", e);
+ }
+ assertEquals(0, storage.getNumStorageDirs());
+ }
+
+ /**
+ * This test enforces the behavior that if there is an exception from
+ * doTransition() during DN startup, the storage directories that have
+ * already been processed are still visible, i.e., in
+ * DataStorage.storageDirs().
+ */
+ @Test
+ public void testRecoverTransitionReadDoTransitionFailure()
+ throws IOException {
+ final int numLocations = 3;
+ List<StorageLocation> locations = createStorageLocations(numLocations);
+ String bpid = nsInfo.getBlockPoolID();
+ // Prepare volumes
+ storage.recoverTransitionRead(mockDN, bpid, nsInfo, locations, START_OPT);
+
+ // Reset DataStorage
+ storage.unlockAll();
+ storage = new DataStorage();
+ // Trigger an exception from doTransition().
+ nsInfo.clusterID = "cluster1";
+ try {
+ storage.recoverTransitionRead(mockDN, bpid, nsInfo, locations, START_OPT);
+ fail("Expect to throw an exception from doTransition()");
+ } catch (IOException e) {
+ GenericTestUtils.assertExceptionContains("Incompatible clusterIDs", e);
+ }
+ assertEquals(numLocations, storage.getNumStorageDirs());
+ }
+}
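The new TestDataStorage suite runs as a plain JUnit test; with the standard Maven Surefire setup used by Hadoop trunk, something like "mvn test -Dtest=TestDataStorage" from hadoop-hdfs-project/hadoop-hdfs should exercise it (command assumed, not part of this commit).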