Posted to hdfs-commits@hadoop.apache.org by at...@apache.org on 2014/08/08 00:53:01 UTC

svn commit: r1616620 - in /hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/server/datanode/ src/test/java/org/apache/hadoop/hdfs/server/datanode/

Author: atm
Date: Thu Aug  7 22:53:00 2014
New Revision: 1616620

URL: http://svn.apache.org/r1616620
Log:
HDFS-6728. Dynamically add new volumes to DataStorage, formatted if necessary. Contributed by Lei Xu.

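In short, the patch factors the per-volume format/recover logic out of
recoverTransitionRead() into addStorageLocations(), so volumes can also be
handed to a DataStorage after startup. A minimal usage sketch, modeled on
the TestDataStorage test added below; the addNewVolume() helper, its
arguments, and the path are illustrative, and the caller must sit in the
org.apache.hadoop.hdfs.server.datanode package since addStorageLocations()
is package-private:

    void addNewVolume(DataNode datanode, DataStorage storage,
        NamespaceInfo nsInfo) throws IOException {
      List<StorageLocation> volumes = new ArrayList<StorageLocation>();
      volumes.add(StorageLocation.parse("/data/new-disk"));
      // Formats /data/new-disk if it is empty, otherwise recovers it, then
      // registers it under the block pool identified by nsInfo.
      storage.addStorageLocations(datanode, nsInfo, volumes,
          StartupOption.REGULAR);
    }
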
Added:
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataStorage.java
Modified:
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1616620&r1=1616619&r2=1616620&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Thu Aug  7 22:53:00 2014
@@ -112,6 +112,9 @@ Release 2.6.0 - UNRELEASED
     HDFS-6781. Separate HDFS commands from CommandsManual.apt.vm. (Akira
     Ajisaka via Arpit Agarwal)
 
+    HDFS-6728. Dynamically add new volumes to DataStorage, formatted if
+    necessary. (Lei Xu via atm)
+
   OPTIMIZATIONS
 
     HDFS-6690. Deduplicate xattr names in memory. (wang)

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java?rev=1616620&r1=1616619&r2=1616620&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java Thu Aug  7 22:53:00 2014
@@ -36,8 +36,10 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Properties;
+import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -106,13 +108,22 @@ public class BlockPoolSliceStorage exten
   void recoverTransitionRead(DataNode datanode, NamespaceInfo nsInfo,
       Collection<File> dataDirs, StartupOption startOpt) throws IOException {
     LOG.info("Analyzing storage directories for bpid " + nsInfo.getBlockPoolID());
+    Set<String> existingStorageDirs = new HashSet<String>();
+    for (int i = 0; i < getNumStorageDirs(); i++) {
+      existingStorageDirs.add(getStorageDir(i).getRoot().getAbsolutePath());
+    }
+
     // 1. For each BP data directory analyze the state and
     // check whether all is consistent before transitioning.
-    this.storageDirs = new ArrayList<StorageDirectory>(dataDirs.size());
     ArrayList<StorageState> dataDirStates = new ArrayList<StorageState>(
         dataDirs.size());
     for (Iterator<File> it = dataDirs.iterator(); it.hasNext();) {
       File dataDir = it.next();
+      if (existingStorageDirs.contains(dataDir.getAbsolutePath())) {
+        LOG.info("Storage directory " + dataDir + " has already been used.");
+        it.remove();
+        continue;
+      }
       StorageDirectory sd = new StorageDirectory(dataDir, null, true);
       StorageState curState;
       try {

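Why the new guard: recoverTransitionRead() is no longer a one-shot call.
Every DataStorage.addStorageLocations() invocation (next file) re-enters it
with the full block-pool directory list, so directories that are already
loaded must be filtered out instead of being analyzed and locked a second
time. The same idiom, as a standalone sketch (a hypothetical helper, not
part of the patch):

    // Remove from dataDirs every directory the given Storage already
    // manages, keyed on absolute path as the patch does.
    static void dropAlreadyManaged(Storage storage,
        Collection<File> dataDirs) {
      Set<String> existing = new HashSet<String>();
      for (int i = 0; i < storage.getNumStorageDirs(); i++) {
        existing.add(storage.getStorageDir(i).getRoot().getAbsolutePath());
      }
      for (Iterator<File> it = dataDirs.iterator(); it.hasNext();) {
        if (existing.contains(it.next().getAbsolutePath())) {
          it.remove();
        }
      }
    }
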
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java?rev=1616620&r1=1616619&r2=1616620&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java Thu Aug  7 22:53:00 2014
@@ -149,43 +149,99 @@ public class DataStorage extends Storage
   }
   
   /**
-   * Analyze storage directories.
-   * Recover from previous transitions if required. 
-   * Perform fs state transition if necessary depending on the namespace info.
-   * Read storage info.
-   * <br>
-   * This method should be synchronized between multiple DN threads.  Only the 
-   * first DN thread does DN level storage dir recoverTransitionRead.
-   * 
+   * Write the storage properties to every directory in {@code dirs}.
+   * @see org.apache.hadoop.hdfs.server.common.Storage#writeAll()
+   */
+  private void writeAll(Collection<StorageDirectory> dirs) throws IOException {
+    this.layoutVersion = getServiceLayoutVersion();
+    for (StorageDirectory dir : dirs) {
+      writeProperties(dir);
+    }
+  }
+
+  /**
+   * Add a list of volumes to be managed by DataStorage. If a volume is empty,
+   * format it; otherwise recover it from previous transitions if required.
+   *
+   * @param datanode the reference to DataNode.
    * @param nsInfo namespace information
    * @param dataDirs array of data storage directories
    * @param startOpt startup option
    * @throws IOException
    */
-  synchronized void recoverTransitionRead(DataNode datanode,
+  synchronized void addStorageLocations(DataNode datanode,
       NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
       StartupOption startOpt)
       throws IOException {
-    if (initialized) {
-      // DN storage has been initialized, no need to do anything
-      return;
+    // Similar to recoverTransitionRead, first ensure that the
+    // datanode-level format is completed.
+    List<StorageLocation> tmpDataDirs =
+        new ArrayList<StorageLocation>(dataDirs);
+    addStorageLocations(datanode, nsInfo, tmpDataDirs, startOpt, false, true);
+
+    Collection<File> bpDataDirs = new ArrayList<File>();
+    String bpid = nsInfo.getBlockPoolID();
+    for (StorageLocation dir : dataDirs) {
+      File dnRoot = dir.getFile();
+      File bpRoot = BlockPoolSliceStorage.getBpRoot(bpid, new File(dnRoot,
+          STORAGE_DIR_CURRENT));
+      bpDataDirs.add(bpRoot);
     }
-    LOG.info("Data-node version: " + HdfsConstants.DATANODE_LAYOUT_VERSION
-        + " and name-node layout version: " + nsInfo.getLayoutVersion());
-    
-    // 1. For each data directory calculate its state and 
-    // check whether all is consistent before transitioning.
-    // Format and recover.
-    this.storageDirs = new ArrayList<StorageDirectory>(dataDirs.size());
-    ArrayList<StorageState> dataDirStates = new ArrayList<StorageState>(dataDirs.size());
+    // mkdir for the list of BlockPoolStorage
+    makeBlockPoolDataDir(bpDataDirs, null);
+    BlockPoolSliceStorage bpStorage = this.bpStorageMap.get(bpid);
+    if (bpStorage == null) {
+      bpStorage = new BlockPoolSliceStorage(
+          nsInfo.getNamespaceID(), bpid, nsInfo.getCTime(),
+          nsInfo.getClusterID());
+    }
+
+    bpStorage.recoverTransitionRead(datanode, nsInfo, bpDataDirs, startOpt);
+    addBlockPoolStorage(bpid, bpStorage);
+  }
+
+  /**
+   * Add a list of volumes to be managed by this DataStorage. If a volume is
+   * empty, it formats the volume; otherwise it recovers the volume from
+   * previous transitions if required.
+   *
+   * If isInitialize is false, only the directories that have successfully
+   * completed doTransition() are added to DataStorage.
+   *
+   * @param datanode the reference to DataNode.
+   * @param nsInfo namespace information
+   * @param dataDirs array of data storage directories
+   * @param startOpt startup option
+   * @param isInitialize whether it is called when DataNode starts up.
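+   * @param ignoreExistingDirs if true, return silently when every given
+   *          directory was skipped or inaccessible, instead of throwing.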
+   * @throws IOException
+   */
+  private synchronized void addStorageLocations(DataNode datanode,
+      NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
+      StartupOption startOpt, boolean isInitialize, boolean ignoreExistingDirs)
+      throws IOException {
+    Set<String> existingStorageDirs = new HashSet<String>();
+    for (int i = 0; i < getNumStorageDirs(); i++) {
+      existingStorageDirs.add(getStorageDir(i).getRoot().getAbsolutePath());
+    }
+
+    // 1. For each data directory calculate its state and check whether all is
+    // consistent before transitioning. Format and recover.
+    ArrayList<StorageState> dataDirStates =
+        new ArrayList<StorageState>(dataDirs.size());
+    List<StorageDirectory> addedStorageDirectories =
+        new ArrayList<StorageDirectory>();
     for(Iterator<StorageLocation> it = dataDirs.iterator(); it.hasNext();) {
       File dataDir = it.next().getFile();
+      if (existingStorageDirs.contains(dataDir.getAbsolutePath())) {
+        LOG.info("Storage directory " + dataDir + " has already been used.");
+        it.remove();
+        continue;
+      }
       StorageDirectory sd = new StorageDirectory(dataDir);
       StorageState curState;
       try {
         curState = sd.analyzeStorage(startOpt, this);
         // sd is locked but not opened
-        switch(curState) {
+        switch (curState) {
         case NORMAL:
           break;
         case NON_EXISTENT:
@@ -194,7 +250,8 @@ public class DataStorage extends Storage
           it.remove();
           continue;
         case NOT_FORMATTED: // format
-          LOG.info("Storage directory " + dataDir + " is not formatted");
+          LOG.info("Storage directory " + dataDir + " is not formatted for "
+            + nsInfo.getBlockPoolID());
           LOG.info("Formatting ...");
           format(sd, nsInfo, datanode.getDatanodeUuid());
           break;
@@ -208,33 +265,82 @@ public class DataStorage extends Storage
         //continue with other good dirs
         continue;
       }
-      // add to the storage list
-      addStorageDir(sd);
+      if (isInitialize) {
+        addStorageDir(sd);
+      }
+      addedStorageDirectories.add(sd);
       dataDirStates.add(curState);
     }
 
-    if (dataDirs.size() == 0 || dataDirStates.size() == 0)  // none of the data dirs exist
+    if (dataDirs.size() == 0 || dataDirStates.size() == 0) {
+      // none of the data dirs exist
+      if (ignoreExistingDirs) {
+        return;
+      }
       throw new IOException(
           "All specified directories are not accessible or do not exist.");
+    }
 
     // 2. Do transitions
     // Each storage directory is treated individually.
-    // During startup some of them can upgrade or rollback 
-    // while others could be uptodate for the regular startup.
-    try {
-      for (int idx = 0; idx < getNumStorageDirs(); idx++) {
-        doTransition(datanode, getStorageDir(idx), nsInfo, startOpt);
-        createStorageID(getStorageDir(idx));
+    // During startup some of them can upgrade or rollback
+    // while others could be up-to-date for the regular startup.
+    for (Iterator<StorageDirectory> it = addedStorageDirectories.iterator();
+        it.hasNext(); ) {
+      StorageDirectory sd = it.next();
+      try {
+        doTransition(datanode, sd, nsInfo, startOpt);
+        createStorageID(sd);
+      } catch (IOException e) {
+        if (!isInitialize) {
+          sd.unlock();
+          it.remove();
+          continue;
+        }
+        unlockAll();
+        throw e;
       }
-    } catch (IOException e) {
-      unlockAll();
-      throw e;
     }
 
-    // 3. Update all storages. Some of them might have just been formatted.
-    this.writeAll();
+    // 3. Update all successfully loaded storages. Some of them might have just
+    // been formatted.
+    this.writeAll(addedStorageDirectories);
+
+    // 4. Make newly loaded storage directories visible for service.
+    if (!isInitialize) {
+      this.storageDirs.addAll(addedStorageDirectories);
+    }
+  }
+
+  /**
+   * Analyze storage directories.
+   * Recover from previous transitions if required.
+   * Perform fs state transition if necessary depending on the namespace info.
+   * Read storage info.
+   * <br>
+   * This method should be synchronized between multiple DN threads.  Only the
+   * first DN thread does DN level storage dir recoverTransitionRead.
+   *
+   * @param nsInfo namespace information
+   * @param dataDirs array of data storage directories
+   * @param startOpt startup option
+   * @throws IOException
+   */
+  synchronized void recoverTransitionRead(DataNode datanode,
+      NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
+      StartupOption startOpt)
+      throws IOException {
+    if (initialized) {
+      // DN storage has been initialized, no need to do anything
+      return;
+    }
+    LOG.info("DataNode version: " + HdfsConstants.DATANODE_LAYOUT_VERSION
+        + " and NameNode layout version: " + nsInfo.getLayoutVersion());
+
+    this.storageDirs = new ArrayList<StorageDirectory>(dataDirs.size());
+    addStorageLocations(datanode, nsInfo, dataDirs, startOpt, true, false);
     
-    // 4. mark DN storage is initialized
+    // mark DN storage as initialized
     this.initialized = true;
   }
 

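Note the asymmetry in step 2 above: during startup (isInitialize == true) a
doTransition() failure unlocks all directories and aborts, while on a hot
add the failing volume is unlocked and dropped, and the survivors still
become visible in step 4. A caller-side sketch of the two modes
(hypothetical driver code; dn, nsInfo, startupDirs and newDirs are
illustrative):

    try {
      // Startup path: any doTransition() failure is fatal and unlocks
      // every directory.
      storage.recoverTransitionRead(dn, nsInfo, startupDirs,
          StartupOption.REGULAR);
    } catch (IOException e) {
      throw e;  // DataNode initialization fails as a whole
    }
    // Hot-add path: a volume that fails doTransition() is skipped; the
    // remaining volumes are written out and added to storageDirs.
    storage.addStorageLocations(dn, nsInfo, newDirs, StartupOption.REGULAR);
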
Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataStorage.java?rev=1616620&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataStorage.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataStorage.java Thu Aug  7 22:53:00 2014
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.server.datanode;
+
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
+import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestDataStorage {
+  private final static String DEFAULT_BPID = "bp-0";
+  private final static String CLUSTER_ID = "cluster0";
+  private final static String BUILD_VERSION = "2.0";
+  private final static String SOFTWARE_VERSION = "2.0";
+  private final static long CTIME = 1;
+  private final static File TEST_DIR =
+      new File(System.getProperty("test.build.data") + "/dstest");
+  private final static StartupOption START_OPT = StartupOption.REGULAR;
+
+  private DataNode mockDN = Mockito.mock(DataNode.class);
+  private NamespaceInfo nsInfo;
+  private DataStorage storage;
+
+  @Before
+  public void setUp() throws IOException {
+    storage = new DataStorage();
+    nsInfo = new NamespaceInfo(0, CLUSTER_ID, DEFAULT_BPID, CTIME,
+        BUILD_VERSION, SOFTWARE_VERSION);
+    FileUtil.fullyDelete(TEST_DIR);
+    assertTrue("Failed to make test dir.", TEST_DIR.mkdirs());
+  }
+
+  @After
+  public void tearDown() throws IOException {
+    FileUtil.fullyDelete(TEST_DIR);
+  }
+
+  private static List<StorageLocation> createStorageLocations(int numLocs)
+      throws IOException {
+    return createStorageLocations(numLocs, false);
+  }
+
+  /**
+   * Create a list of StorageLocations.
+   * If asFile is set to true, create each StorageLocation as a regular file;
+   * otherwise create a directory for each location.
+   * @param numLocs the total number of StorageLocations to be created.
+   * @param asFile set to true to create as file.
+   * @return a list of StorageLocations.
+   */
+  private static List<StorageLocation> createStorageLocations(
+      int numLocs, boolean asFile) throws IOException {
+    List<StorageLocation> locations = new ArrayList<StorageLocation>();
+    for (int i = 0; i < numLocs; i++) {
+      String uri = TEST_DIR + "/data" + i;
+      File file = new File(uri);
+      if (asFile) {
+        file.getParentFile().mkdirs();
+        file.createNewFile();
+      } else {
+        file.mkdirs();
+      }
+      StorageLocation loc = StorageLocation.parse(uri);
+      locations.add(loc);
+    }
+    return locations;
+  }
+
+  private static List<NamespaceInfo> createNamespaceInfos(int num) {
+    List<NamespaceInfo> nsInfos = new ArrayList<NamespaceInfo>();
+    for (int i = 0; i < num; i++) {
+      String bpid = "bp-" + i;
+      nsInfos.add(new NamespaceInfo(0, CLUSTER_ID, bpid, CTIME, BUILD_VERSION,
+          SOFTWARE_VERSION));
+    }
+    return nsInfos;
+  }
+
+  /** Check whether the path is a valid DataNode data directory. */
+  private static void checkDir(File dataDir) {
+    Storage.StorageDirectory sd = new Storage.StorageDirectory(dataDir);
+    assertTrue(sd.getRoot().isDirectory());
+    assertTrue(sd.getCurrentDir().isDirectory());
+    assertTrue(sd.getVersionFile().isFile());
+  }
+
+  /** Check whether the root is a valid BlockPoolSlice storage. */
+  private static void checkDir(File root, String bpid) {
+    Storage.StorageDirectory sd = new Storage.StorageDirectory(root);
+    File bpRoot = new File(sd.getCurrentDir(), bpid);
+    Storage.StorageDirectory bpSd = new Storage.StorageDirectory(bpRoot);
+    assertTrue(bpSd.getRoot().isDirectory());
+    assertTrue(bpSd.getCurrentDir().isDirectory());
+    assertTrue(bpSd.getVersionFile().isFile());
+  }
+
+  @Test
+  public void testAddStorageDirectories() throws IOException,
+      URISyntaxException {
+    final int numLocations = 3;
+    final int numNamespace = 3;
+    List<StorageLocation> locations = createStorageLocations(numLocations);
+
+    // Add volumes for multiple namespaces.
+    List<NamespaceInfo> namespaceInfos = createNamespaceInfos(numNamespace);
+    for (NamespaceInfo ni : namespaceInfos) {
+      storage.addStorageLocations(mockDN, ni, locations, START_OPT);
+      for (StorageLocation sl : locations) {
+        checkDir(sl.getFile());
+        checkDir(sl.getFile(), ni.getBlockPoolID());
+      }
+    }
+
+    assertEquals(numLocations, storage.getNumStorageDirs());
+
+    locations = createStorageLocations(numLocations);
+    try {
+      storage.addStorageLocations(mockDN, namespaceInfos.get(0),
+          locations, START_OPT);
+      fail("Expected to throw IOException: adding active directories.");
+    } catch (IOException e) {
+      GenericTestUtils.assertExceptionContains(
+          "All specified directories are not accessible or do not exist.", e);
+    }
+    // The number of active storage dirs has not changed, since the storage
+    // dirs it tried to add are already in service.
+    assertEquals(numLocations, storage.getNumStorageDirs());
+
+    // Add more directories.
+    locations = createStorageLocations(6);
+    storage.addStorageLocations(mockDN, nsInfo, locations, START_OPT);
+    assertEquals(6, storage.getNumStorageDirs());
+  }
+
+  @Test
+  public void testRecoverTransitionReadFailure() throws IOException {
+    final int numLocations = 3;
+    List<StorageLocation> locations =
+        createStorageLocations(numLocations, true);
+
+    try {
+      storage.recoverTransitionRead(mockDN, nsInfo, locations, START_OPT);
+      fail("An IOException should be thrown: all StorageLocations are NON_EXISTENT");
+    } catch (IOException e) {
+      GenericTestUtils.assertExceptionContains(
+          "All specified directories are not accessible or do not exist.", e);
+    }
+    assertEquals(0, storage.getNumStorageDirs());
+  }
+
+  /**
+   * This test enforces the behavior that if there is an exception from
+   * doTransition() during DN startup, the storage directories that have
+   * already been processed are still visible, i.e., in
+   * DataStorage.storageDirs().
+   */
+  @Test
+  public void testRecoverTransitionReadDoTransitionFailure()
+      throws IOException {
+    final int numLocations = 3;
+    List<StorageLocation> locations = createStorageLocations(numLocations);
+    String bpid = nsInfo.getBlockPoolID();
+    // Prepare volumes
+    storage.recoverTransitionRead(mockDN, bpid, nsInfo, locations, START_OPT);
+
+    // Reset DataStorage
+    storage.unlockAll();
+    storage = new DataStorage();
+    // Trigger an exception from doTransition().
+    nsInfo.clusterID = "cluster1";
+    try {
+      storage.recoverTransitionRead(mockDN, bpid, nsInfo, locations, START_OPT);
+      fail("Expect to throw an exception from doTransition()");
+    } catch (IOException e) {
+      GenericTestUtils.assertExceptionContains("Incompatible clusterIDs", e);
+    }
+    assertEquals(numLocations, storage.getNumStorageDirs());
+  }
+}
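
To run just the new test, a standard Maven Surefire invocation from the
hadoop-hdfs-project/hadoop-hdfs directory should work (assuming a normal
development build environment):

    mvn test -Dtest=TestDataStorage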