You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2016/02/24 21:03:18 UTC

[36/50] [abbrv] hadoop git commit: HDFS-8578. On upgrade, Datanode should process all storage/data dirs in parallel. Contributed by vinayakumarb and szetszwo

HDFS-8578. On upgrade, Datanode should process all storage/data dirs in parallel.  Contributed by vinayakumarb and szetszwo


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/66289a3b
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/66289a3b
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/66289a3b

Branch: refs/heads/HDFS-7240
Commit: 66289a3bf403f307844ea0b6ceed35b603d12c0b
Parents: a2fdfff
Author: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Authored: Mon Feb 22 15:01:15 2016 -0800
Committer: Tsz-Wo Nicholas Sze <sz...@hortonworks.com>
Committed: Mon Feb 22 15:01:15 2016 -0800

----------------------------------------------------------------------
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt     |   3 +
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java   |   3 +
 .../server/datanode/BlockPoolSliceStorage.java  |  56 ++++---
 .../hdfs/server/datanode/DataStorage.java       | 146 +++++++++++++++----
 4 files changed, 160 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/66289a3b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index c39bcee..256df39 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -2840,6 +2840,9 @@ Release 2.7.3 - UNRELEASED
     HDFS-4946. Allow preferLocalNode in BlockPlacementPolicyDefault to be
     configurable (James Kinley and Nathan Roberts via kihwal)
 
+    HDFS-8578. On upgrade, Datanode should process all storage/data dirs in
+    parallel.  (vinayakumarb and szetszwo via szetszwo)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/66289a3b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 39e4fe4..a055e2b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -836,6 +836,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
     "dfs.datanode.slow.io.warning.threshold.ms";
   public static final long DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_DEFAULT = 300;
 
+  // Number of parallel threads to load multiple datanode volumes
+  public static final String DFS_DATANODE_PARALLEL_VOLUME_LOAD_THREADS_NUM_KEY =
+      "dfs.datanode.parallel.volumes.load.threads.num";
   public static final String DFS_DATANODE_BLOCK_ID_LAYOUT_UPGRADE_THREADS_KEY =
       "dfs.datanode.block.id.layout.upgrade.threads";
   public static final int DFS_DATANODE_BLOCK_ID_LAYOUT_UPGRADE_THREADS = 12;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/66289a3b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
index acf10f1..90a4669 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceStorage.java
@@ -27,6 +27,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -145,7 +146,8 @@ public class BlockPoolSliceStorage extends Storage {
    * @throws IOException
    */
   private StorageDirectory loadStorageDirectory(NamespaceInfo nsInfo,
-      File dataDir, StartupOption startOpt, Configuration conf)
+      File dataDir, StartupOption startOpt,
+      List<Callable<StorageDirectory>> callables, Configuration conf)
           throws IOException {
     StorageDirectory sd = new StorageDirectory(dataDir, null, true);
     try {
@@ -172,19 +174,17 @@ public class BlockPoolSliceStorage extends Storage {
       // Each storage directory is treated individually.
       // During startup some of them can upgrade or roll back
       // while others could be up-to-date for the regular startup.
-      if (doTransition(sd, nsInfo, startOpt, conf)) {
-        return sd;
-      }
+      if (!doTransition(sd, nsInfo, startOpt, callables, conf)) {
 
-      if (getCTime() != nsInfo.getCTime()) {
-        throw new IOException("Datanode CTime (=" + getCTime()
-            + ") is not equal to namenode CTime (=" + nsInfo.getCTime() + ")");
+        // 3. Check CTime and update successfully loaded storage.
+        if (getCTime() != nsInfo.getCTime()) {
+          throw new IOException("Datanode CTime (=" + getCTime()
+              + ") is not equal to namenode CTime (=" + nsInfo.getCTime() + ")");
+        }
+        setServiceLayoutVersion(getServiceLayoutVersion());
+        writeProperties(sd);
       }
 
-      // 3. Update successfully loaded storage.
-      setServiceLayoutVersion(getServiceLayoutVersion());
-      writeProperties(sd);
-
       return sd;
     } catch (IOException ioe) {
       sd.unlock();
@@ -208,7 +208,8 @@ public class BlockPoolSliceStorage extends Storage {
    */
   List<StorageDirectory> loadBpStorageDirectories(NamespaceInfo nsInfo,
       Collection<File> dataDirs, StartupOption startOpt,
-      Configuration conf) throws IOException {
+      List<Callable<StorageDirectory>> callables, Configuration conf)
+          throws IOException {
     List<StorageDirectory> succeedDirs = Lists.newArrayList();
     try {
       for (File dataDir : dataDirs) {
@@ -218,7 +219,7 @@ public class BlockPoolSliceStorage extends Storage {
                   "attempt to load an used block storage: " + dataDir);
         }
         final StorageDirectory sd = loadStorageDirectory(
-            nsInfo, dataDir, startOpt, conf);
+            nsInfo, dataDir, startOpt, callables, conf);
         succeedDirs.add(sd);
       }
     } catch (IOException e) {
@@ -242,11 +243,12 @@ public class BlockPoolSliceStorage extends Storage {
    * @throws IOException on error
    */
   List<StorageDirectory> recoverTransitionRead(NamespaceInfo nsInfo,
-      Collection<File> dataDirs, StartupOption startOpt, Configuration conf)
+      Collection<File> dataDirs, StartupOption startOpt,
+      List<Callable<StorageDirectory>> callables, Configuration conf)
           throws IOException {
     LOG.info("Analyzing storage directories for bpid " + nsInfo.getBlockPoolID());
     final List<StorageDirectory> loaded = loadBpStorageDirectories(
-        nsInfo, dataDirs, startOpt, conf);
+        nsInfo, dataDirs, startOpt, callables, conf);
     for (StorageDirectory sd : loaded) {
       addStorageDir(sd);
     }
@@ -353,7 +355,8 @@ public class BlockPoolSliceStorage extends Storage {
    * @return true if the new properties has been written.
    */
   private boolean doTransition(StorageDirectory sd, NamespaceInfo nsInfo,
-      StartupOption startOpt, Configuration conf) throws IOException {
+      StartupOption startOpt, List<Callable<StorageDirectory>> callables,
+      Configuration conf) throws IOException {
     if (startOpt == StartupOption.ROLLBACK && sd.getPreviousDir().exists()) {
       Preconditions.checkState(!getTrashRootDir(sd).exists(),
           sd.getPreviousDir() + " and " + getTrashRootDir(sd) + " should not " +
@@ -395,7 +398,7 @@ public class BlockPoolSliceStorage extends Storage {
     }
     if (this.layoutVersion > HdfsServerConstants.DATANODE_LAYOUT_VERSION
         || this.cTime < nsInfo.getCTime()) {
-      doUpgrade(sd, nsInfo, conf); // upgrade
+      doUpgrade(sd, nsInfo, callables, conf); // upgrade
       return true;
     }
     // layoutVersion == LAYOUT_VERSION && this.cTime > nsInfo.cTime
@@ -425,7 +428,9 @@ public class BlockPoolSliceStorage extends Storage {
    * @throws IOException on error
    */
   private void doUpgrade(final StorageDirectory bpSd,
-      final NamespaceInfo nsInfo, final Configuration conf) throws IOException {
+      final NamespaceInfo nsInfo,
+      final List<Callable<StorageDirectory>> callables,
+      final Configuration conf) throws IOException {
     // Upgrading is applicable only to release with federation or after
     if (!DataNodeLayoutVersion.supports(
         LayoutVersion.Feature.FEDERATION, layoutVersion)) {
@@ -463,10 +468,21 @@ public class BlockPoolSliceStorage extends Storage {
     rename(bpCurDir, bpTmpDir);
     
     final String name = "block pool " + blockpoolID + " at " + bpSd.getRoot();
-    doUgrade(name, bpSd, nsInfo, bpPrevDir, bpTmpDir, bpCurDir, oldLV, conf);
+    if (callables == null) {
+      doUpgrade(name, bpSd, nsInfo, bpPrevDir, bpTmpDir, bpCurDir, oldLV, conf);
+    } else {
+      callables.add(new Callable<StorageDirectory>() {
+        @Override
+        public StorageDirectory call() throws Exception {
+          doUpgrade(name, bpSd, nsInfo, bpPrevDir, bpTmpDir, bpCurDir, oldLV,
+              conf);
+          return bpSd;
+        }
+      });
+    }
   }
 
-  private void doUgrade(String name, final StorageDirectory bpSd,
+  private void doUpgrade(String name, final StorageDirectory bpSd,
       NamespaceInfo nsInfo, final File bpPrevDir, final File bpTmpDir,
       final File bpCurDir, final int oldLV, Configuration conf)
           throws IOException {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/66289a3b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
index 7903194..6697054 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
@@ -37,10 +37,12 @@ import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 
+import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
@@ -51,6 +53,7 @@ import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtilClient;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
@@ -260,8 +263,8 @@ public class DataStorage extends Storage {
   }
 
   private StorageDirectory loadStorageDirectory(DataNode datanode,
-      NamespaceInfo nsInfo, File dataDir, StartupOption startOpt)
-          throws IOException {
+      NamespaceInfo nsInfo, File dataDir, StartupOption startOpt,
+      List<Callable<StorageDirectory>> callables) throws IOException {
     StorageDirectory sd = new StorageDirectory(dataDir, null, false);
     try {
       StorageState curState = sd.analyzeStorage(startOpt, this);
@@ -287,13 +290,12 @@ public class DataStorage extends Storage {
       // Each storage directory is treated individually.
       // During startup some of them can upgrade or roll back
       // while others could be up-to-date for the regular startup.
-      if (doTransition(sd, nsInfo, startOpt, datanode.getConf())) {
-        return sd;
-      }
+      if (!doTransition(sd, nsInfo, startOpt, callables, datanode.getConf())) {
 
-      // 3. Update successfully loaded storage.
-      setServiceLayoutVersion(getServiceLayoutVersion());
-      writeProperties(sd);
+        // 3. Update successfully loaded storage.
+        setServiceLayoutVersion(getServiceLayoutVersion());
+        writeProperties(sd);
+      }
 
       return sd;
     } catch (IOException ioe) {
@@ -325,7 +327,7 @@ public class DataStorage extends Storage {
     }
 
     StorageDirectory sd = loadStorageDirectory(
-        datanode, nsInfos.get(0), volume, StartupOption.HOTSWAP);
+        datanode, nsInfos.get(0), volume, StartupOption.HOTSWAP, null);
     VolumeBuilder builder =
         new VolumeBuilder(this, sd);
     for (NamespaceInfo nsInfo : nsInfos) {
@@ -336,12 +338,35 @@ public class DataStorage extends Storage {
 
       final BlockPoolSliceStorage bpStorage = getBlockPoolSliceStorage(nsInfo);
       final List<StorageDirectory> dirs = bpStorage.loadBpStorageDirectories(
-          nsInfo, bpDataDirs, StartupOption.HOTSWAP, datanode.getConf());
+          nsInfo, bpDataDirs, StartupOption.HOTSWAP, null, datanode.getConf());
       builder.addBpStorageDirectories(nsInfo.getBlockPoolID(), dirs);
     }
     return builder;
   }
 
+  static int getParallelVolumeLoadThreadsNum(int dataDirs, Configuration conf) {
+    final String key
+        = DFSConfigKeys.DFS_DATANODE_PARALLEL_VOLUME_LOAD_THREADS_NUM_KEY;
+    final int n = conf.getInt(key, dataDirs); // dataDirs is the default
+    if (n < 1) { // also trips if key unset and dataDirs == 0
+      throw new HadoopIllegalArgumentException(key + " = " + n + " < 1");
+    }
+    final int min = Math.min(n, dataDirs); // cap at one thread per dir
+    LOG.info("Using " + min + " threads to upgrade data directories ("
+        + key + "=" + n + ", dataDirs=" + dataDirs + ")");
+    return min;
+  }
+
+  static class UpgradeTask { // a data dir paired with its upgrade future
+    private final StorageLocation dataDir; // used in failure log messages
+    private final Future<StorageDirectory> future; // from executor.submit
+
+    UpgradeTask(StorageLocation dataDir, Future<StorageDirectory> future) {
+      this.dataDir = dataDir;
+      this.future = future;
+    }
+  }
+
   /**
    * Add a list of volumes to be managed by DataStorage. If the volume is empty,
    * format it, otherwise recover it from previous transitions if required.
@@ -356,32 +381,62 @@ public class DataStorage extends Storage {
   synchronized List<StorageDirectory> addStorageLocations(DataNode datanode,
       NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
       StartupOption startOpt) throws IOException {
-    final List<StorageLocation> successLocations = loadDataStorage(
-        datanode, nsInfo, dataDirs, startOpt);
-    return loadBlockPoolSliceStorage(
-        datanode, nsInfo, successLocations, startOpt);
+    final int numThreads = getParallelVolumeLoadThreadsNum(
+        dataDirs.size(), datanode.getConf());
+    final ExecutorService executor = Executors.newFixedThreadPool(numThreads);
+    try {
+      final List<StorageLocation> successLocations = loadDataStorage(
+          datanode, nsInfo, dataDirs, startOpt, executor);
+      return loadBlockPoolSliceStorage(
+          datanode, nsInfo, successLocations, startOpt, executor);
+    } finally {
+      executor.shutdown();
+    }
   }
 
   private List<StorageLocation> loadDataStorage(DataNode datanode,
       NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
-      StartupOption startOpt) throws IOException {
+      StartupOption startOpt, ExecutorService executor) throws IOException {
     final List<StorageLocation> success = Lists.newArrayList();
+    final List<UpgradeTask> tasks = Lists.newArrayList();
     for (StorageLocation dataDir : dataDirs) {
       File root = dataDir.getFile();
       if (!containsStorageDir(root)) {
         try {
           // It first ensures the datanode level format is completed.
+          final List<Callable<StorageDirectory>> callables
+              = Lists.newArrayList();
           final StorageDirectory sd = loadStorageDirectory(
-              datanode, nsInfo, root, startOpt);
-          addStorageDir(sd);
+              datanode, nsInfo, root, startOpt, callables);
+          if (callables.isEmpty()) {
+            addStorageDir(sd);
+            success.add(dataDir);
+          } else {
+            for(Callable<StorageDirectory> c : callables) {
+              tasks.add(new UpgradeTask(dataDir, executor.submit(c)));
+            }
+          }
         } catch (IOException e) {
           LOG.warn("Failed to add storage directory " + dataDir, e);
-          continue;
         }
       } else {
         LOG.info("Storage directory " + dataDir + " has already been used.");
+        success.add(dataDir);
+      }
+    }
+
+    if (!tasks.isEmpty()) {
+      LOG.info("loadDataStorage: " + tasks.size() + " upgrade tasks");
+      for(UpgradeTask t : tasks) {
+        try {
+          addStorageDir(t.future.get());
+          success.add(t.dataDir);
+        } catch (ExecutionException e) {
+          LOG.warn("Failed to upgrade storage directory " + t.dataDir, e);
+        } catch (InterruptedException e) {
+          throw DFSUtilClient.toInterruptedIOException("Task interrupted", e);
+        }
       }
-      success.add(dataDir);
     }
 
     return success;
@@ -389,10 +444,11 @@ public class DataStorage extends Storage {
 
   private List<StorageDirectory> loadBlockPoolSliceStorage(DataNode datanode,
       NamespaceInfo nsInfo, Collection<StorageLocation> dataDirs,
-      StartupOption startOpt) throws IOException {
+      StartupOption startOpt, ExecutorService executor) throws IOException {
     final String bpid = nsInfo.getBlockPoolID();
     final BlockPoolSliceStorage bpStorage = getBlockPoolSliceStorage(nsInfo);
     final List<StorageDirectory> success = Lists.newArrayList();
+    final List<UpgradeTask> tasks = Lists.newArrayList();
     for (StorageLocation dataDir : dataDirs) {
       final File curDir = new File(dataDir.getFile(), STORAGE_DIR_CURRENT);
       List<File> bpDataDirs = new ArrayList<File>();
@@ -400,10 +456,17 @@ public class DataStorage extends Storage {
       try {
         makeBlockPoolDataDir(bpDataDirs, null);
 
+        final List<Callable<StorageDirectory>> callables = Lists.newArrayList();
         final List<StorageDirectory> dirs = bpStorage.recoverTransitionRead(
-            nsInfo, bpDataDirs, startOpt, datanode.getConf());
-        for(StorageDirectory sd : dirs) {
-          success.add(sd);
+            nsInfo, bpDataDirs, startOpt, callables, datanode.getConf());
+        if (callables.isEmpty()) {
+          for(StorageDirectory sd : dirs) {
+            success.add(sd);
+          }
+        } else {
+          for(Callable<StorageDirectory> c : callables) {
+            tasks.add(new UpgradeTask(dataDir, executor.submit(c)));
+          }
         }
       } catch (IOException e) {
         LOG.warn("Failed to add storage directory " + dataDir
@@ -411,6 +474,20 @@ public class DataStorage extends Storage {
       }
     }
 
+    if (!tasks.isEmpty()) {
+      LOG.info("loadBlockPoolSliceStorage: " + tasks.size() + " upgrade tasks");
+      for(UpgradeTask t : tasks) {
+        try {
+          success.add(t.future.get());
+        } catch (ExecutionException e) {
+          LOG.warn("Failed to upgrade storage directory " + t.dataDir
+              + " for block pool " + bpid, e);
+        } catch (InterruptedException e) {
+          throw DFSUtilClient.toInterruptedIOException("Task interrupted", e);
+        }
+      }
+    }
+
     return success;
   }
 
@@ -655,7 +732,8 @@ public class DataStorage extends Storage {
    * @return true if the new properties has been written.
    */
   private boolean doTransition(StorageDirectory sd, NamespaceInfo nsInfo,
-      StartupOption startOpt, Configuration conf) throws IOException {
+      StartupOption startOpt, List<Callable<StorageDirectory>> callables,
+      Configuration conf) throws IOException {
     if (startOpt == StartupOption.ROLLBACK) {
       doRollback(sd, nsInfo); // rollback if applicable
     }
@@ -697,7 +775,7 @@ public class DataStorage extends Storage {
         // simply update the properties.
         upgradeProperties(sd);
       } else {
-        doUpgradePreFederation(sd, nsInfo, conf);
+        doUpgradePreFederation(sd, nsInfo, callables, conf);
       }
       return true; // doUgrade already has written properties
     }
@@ -734,7 +812,9 @@ public class DataStorage extends Storage {
    * @param sd  storage directory
    */
   void doUpgradePreFederation(final StorageDirectory sd,
-      final NamespaceInfo nsInfo, final Configuration conf) throws IOException {
+      final NamespaceInfo nsInfo,
+      final List<Callable<StorageDirectory>> callables,
+      final Configuration conf) throws IOException {
     final int oldLV = getLayoutVersion();
     LOG.info("Upgrading storage directory " + sd.getRoot()
              + ".\n   old LV = " + oldLV
@@ -767,10 +847,20 @@ public class DataStorage extends Storage {
     bpStorage.format(curDir, nsInfo);
 
     final File toDir = new File(curBpDir, STORAGE_DIR_CURRENT);
-    doUgrade(sd, nsInfo, prevDir, tmpDir, bbwDir, toDir, oldLV, conf);
+    if (callables == null) {
+      doUpgrade(sd, nsInfo, prevDir, tmpDir, bbwDir, toDir, oldLV, conf);
+    } else {
+      callables.add(new Callable<StorageDirectory>() {
+        @Override
+        public StorageDirectory call() throws Exception {
+          doUpgrade(sd, nsInfo, prevDir, tmpDir, bbwDir, toDir, oldLV, conf);
+          return sd;
+        }
+      });
+    }
   }
 
-  private void doUgrade(final StorageDirectory sd,
+  private void doUpgrade(final StorageDirectory sd,
       final NamespaceInfo nsInfo, final File prevDir,
       final File tmpDir, final File bbwDir, final File toDir, final int oldLV,
       Configuration conf) throws IOException {