You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by at...@apache.org on 2014/08/08 01:01:37 UTC

svn commit: r1616624 - in /hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/server/datanode/ src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/ src/main/java/org/apache/hadoop/hdfs/se...

Author: atm
Date: Thu Aug  7 23:01:36 2014
New Revision: 1616624

URL: http://svn.apache.org/r1616624
Log:
HDFS-6740. Make FSDataset support adding data volumes dynamically. Contributed by Lei Xu.

Added:
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
Modified:
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
    hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1616624&r1=1616623&r2=1616624&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Thu Aug  7 23:01:36 2014
@@ -115,6 +115,9 @@ Release 2.6.0 - UNRELEASED
     HDFS-6728. Dynamically add new volumes to DataStorage, formatted if
     necessary. (Lei Xu vi atm)
 
+    HDFS-6740. Make FSDataset support adding data volumes dynamically. (Lei
+    Xu via atm)
+
   OPTIMIZATIONS
 
     HDFS-6690. Deduplicate xattr names in memory. (wang)

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java?rev=1616624&r1=1616623&r2=1616624&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java Thu Aug  7 23:01:36 2014
@@ -78,7 +78,7 @@ public class StorageLocation {
    * @return A StorageLocation object if successfully parsed, null otherwise.
    *         Does not throw any exceptions.
    */
-  static StorageLocation parse(String rawLocation)
+  public static StorageLocation parse(String rawLocation)
       throws IOException, SecurityException {
     Matcher matcher = regex.matcher(rawLocation);
     StorageType storageType = StorageType.DEFAULT;

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java?rev=1616624&r1=1616623&r2=1616624&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java Thu Aug  7 23:01:36 2014
@@ -22,6 +22,7 @@ import java.io.File;
 import java.io.FileDescriptor;
 import java.io.IOException;
 import java.io.InputStream;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
@@ -39,6 +40,7 @@ import org.apache.hadoop.hdfs.server.dat
 import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
 import org.apache.hadoop.hdfs.server.datanode.Replica;
 import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
+import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetFactory;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
@@ -91,6 +93,10 @@ public interface FsDatasetSpi<V extends 
   /** @return a list of volumes. */
   public List<V> getVolumes();
 
+  /** Add an array of StorageLocation to FsDataset. */
+  public void addVolumes(Collection<StorageLocation> volumes)
+      throws IOException;
+
   /** @return a storage with the given storage ID */
   public DatanodeStorage getStorage(final String storageUuid);
 

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java?rev=1616624&r1=1616623&r2=1616624&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java Thu Aug  7 23:01:36 2014
@@ -61,6 +61,7 @@ class FsDatasetAsyncDiskService {
   private static final long THREADS_KEEP_ALIVE_SECONDS = 60; 
   
   private final DataNode datanode;
+  private final ThreadGroup threadGroup;
   private Map<File, ThreadPoolExecutor> executors
       = new HashMap<File, ThreadPoolExecutor>();
   
@@ -70,42 +71,52 @@ class FsDatasetAsyncDiskService {
    * 
    * The AsyncDiskServices uses one ThreadPool per volume to do the async
    * disk operations.
-   * 
-   * @param volumes The roots of the data volumes.
    */
-  FsDatasetAsyncDiskService(DataNode datanode, File[] volumes) {
+  FsDatasetAsyncDiskService(DataNode datanode) {
     this.datanode = datanode;
+    this.threadGroup = new ThreadGroup(getClass().getSimpleName());
+  }
+
+  private void addExecutorForVolume(final File volume) {
+    ThreadFactory threadFactory = new ThreadFactory() {
+      int counter = 0;
+
+      @Override
+      public Thread newThread(Runnable r) {
+        int thisIndex;
+        synchronized (this) {
+          thisIndex = counter++;
+        }
+        Thread t = new Thread(threadGroup, r);
+        t.setName("Async disk worker #" + thisIndex +
+            " for volume " + volume);
+        return t;
+      }
+    };
 
-    final ThreadGroup threadGroup = new ThreadGroup(getClass().getSimpleName());
-    // Create one ThreadPool per volume
-    for (int v = 0 ; v < volumes.length; v++) {
-      final File vol = volumes[v];
-      ThreadFactory threadFactory = new ThreadFactory() {
-          int counter = 0;
-
-          @Override
-          public Thread newThread(Runnable r) {
-            int thisIndex;
-            synchronized (this) {
-              thisIndex = counter++;
-            }
-            Thread t = new Thread(threadGroup, r);
-            t.setName("Async disk worker #" + thisIndex +
-                      " for volume " + vol);
-            return t;
-          }
-        };
-
-      ThreadPoolExecutor executor = new ThreadPoolExecutor(
-          CORE_THREADS_PER_VOLUME, MAXIMUM_THREADS_PER_VOLUME, 
-          THREADS_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS, 
-          new LinkedBlockingQueue<Runnable>(), threadFactory);
-
-      // This can reduce the number of running threads
-      executor.allowCoreThreadTimeOut(true);
-      executors.put(vol, executor);
+    ThreadPoolExecutor executor = new ThreadPoolExecutor(
+        CORE_THREADS_PER_VOLUME, MAXIMUM_THREADS_PER_VOLUME,
+        THREADS_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
+        new LinkedBlockingQueue<Runnable>(), threadFactory);
+
+    // This can reduce the number of running threads
+    executor.allowCoreThreadTimeOut(true);
+    executors.put(volume, executor);
+  }
+
+  /**
+   * Starts AsyncDiskService for a new volume
+   * @param volume the root of the new data volume.
+   */
+  synchronized void addVolume(File volume) {
+    if (executors == null) {
+      throw new RuntimeException("AsyncDiskService is already shutdown");
     }
-    
+    ThreadPoolExecutor executor = executors.get(volume);
+    if (executor != null) {
+      throw new RuntimeException("Volume " + volume + " is already existed.");
+    }
+    addExecutorForVolume(volume);
   }
   
   synchronized long countPendingDeletions() {

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java?rev=1616624&r1=1616623&r2=1616624&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java Thu Aug  7 23:01:36 2014
@@ -202,6 +202,7 @@ class FsDatasetImpl implements FsDataset
   final Map<String, DatanodeStorage> storageMap;
   final FsDatasetAsyncDiskService asyncDiskService;
   final FsDatasetCache cacheManager;
+  private final Configuration conf;
   private final int validVolsRequired;
 
   final ReplicaMap volumeMap;
@@ -216,6 +217,7 @@ class FsDatasetImpl implements FsDataset
       ) throws IOException {
     this.datanode = datanode;
     this.dataStorage = storage;
+    this.conf = conf;
     // The number of volumes required for operation is the total number 
     // of volumes minus the number of failed volumes we can tolerate.
     final int volFailuresTolerated =
@@ -242,38 +244,76 @@ class FsDatasetImpl implements FsDataset
     }
 
     storageMap = new HashMap<String, DatanodeStorage>();
-    final List<FsVolumeImpl> volArray = new ArrayList<FsVolumeImpl>(
-        storage.getNumStorageDirs());
-    for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
-      Storage.StorageDirectory sd = storage.getStorageDir(idx);
-      final File dir = sd.getCurrentDir();
-      final StorageType storageType = getStorageTypeFromLocations(dataLocations, sd.getRoot());
-      volArray.add(new FsVolumeImpl(this, sd.getStorageUuid(), dir, conf,
-          storageType));
-      LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
-      storageMap.put(sd.getStorageUuid(),
-          new DatanodeStorage(sd.getStorageUuid(), DatanodeStorage.State.NORMAL, storageType));
-    }
     volumeMap = new ReplicaMap(this);
-
     @SuppressWarnings("unchecked")
     final VolumeChoosingPolicy<FsVolumeImpl> blockChooserImpl =
         ReflectionUtils.newInstance(conf.getClass(
             DFSConfigKeys.DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY,
             RoundRobinVolumeChoosingPolicy.class,
             VolumeChoosingPolicy.class), conf);
-    volumes = new FsVolumeList(volArray, volsFailed, blockChooserImpl);
-    volumes.initializeReplicaMaps(volumeMap);
+    volumes = new FsVolumeList(volsFailed, blockChooserImpl);
+    asyncDiskService = new FsDatasetAsyncDiskService(datanode);
 
-    File[] roots = new File[storage.getNumStorageDirs()];
     for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
-      roots[idx] = storage.getStorageDir(idx).getCurrentDir();
+      addVolume(dataLocations, storage.getStorageDir(idx));
     }
-    asyncDiskService = new FsDatasetAsyncDiskService(datanode, roots);
+
     cacheManager = new FsDatasetCache(this);
     registerMBean(datanode.getDatanodeUuid());
   }
 
+  private void addVolume(Collection<StorageLocation> dataLocations,
+      Storage.StorageDirectory sd) throws IOException {
+    final File dir = sd.getCurrentDir();
+    final StorageType storageType =
+        getStorageTypeFromLocations(dataLocations, sd.getRoot());
+
+    // If IOException raises from FsVolumeImpl() or getVolumeMap(), there is
+    // nothing needed to be rolled back to make various data structures, e.g.,
+    // storageMap and asyncDiskService, consistent.
+    FsVolumeImpl fsVolume = new FsVolumeImpl(
+        this, sd.getStorageUuid(), dir, this.conf, storageType);
+    fsVolume.getVolumeMap(volumeMap);
+
+    volumes.addVolume(fsVolume);
+    storageMap.put(sd.getStorageUuid(),
+        new DatanodeStorage(sd.getStorageUuid(),
+                            DatanodeStorage.State.NORMAL,
+                            storageType));
+    asyncDiskService.addVolume(sd.getCurrentDir());
+
+    LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
+  }
+
+  /**
+   * Add an array of StorageLocation to FsDataset.
+   *
+   * @pre dataStorage must have these volumes.
+   * @param volumes
+   * @throws IOException
+   */
+  @Override
+  public synchronized void addVolumes(Collection<StorageLocation> volumes)
+      throws IOException {
+    final Collection<StorageLocation> dataLocations =
+        DataNode.getStorageLocations(this.conf);
+    Map<String, Storage.StorageDirectory> allStorageDirs =
+        new HashMap<String, Storage.StorageDirectory>();
+    for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) {
+      Storage.StorageDirectory sd = dataStorage.getStorageDir(idx);
+      allStorageDirs.put(sd.getRoot().getAbsolutePath(), sd);
+    }
+
+    for (StorageLocation vol : volumes) {
+      String key = vol.getFile().getAbsolutePath();
+      if (!allStorageDirs.containsKey(key)) {
+        LOG.warn("Attempt to add an invalid volume: " + vol.getFile());
+      } else {
+        addVolume(dataLocations, allStorageDirs.get(key));
+      }
+    }
+  }
+
   private StorageType getStorageTypeFromLocations(
       Collection<StorageLocation> dataLocations, File dir) {
     for (StorageLocation dataLocation : dataLocations) {

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java?rev=1616624&r1=1616623&r2=1616624&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java Thu Aug  7 23:01:36 2014
@@ -40,9 +40,8 @@ class FsVolumeList {
   private final VolumeChoosingPolicy<FsVolumeImpl> blockChooser;
   private volatile int numFailedVolumes;
 
-  FsVolumeList(List<FsVolumeImpl> volumes, int failedVols,
+  FsVolumeList(int failedVols,
       VolumeChoosingPolicy<FsVolumeImpl> blockChooser) {
-    this.volumes = Collections.unmodifiableList(volumes);
     this.blockChooser = blockChooser;
     this.numFailedVolumes = failedVols;
   }
@@ -101,12 +100,6 @@ class FsVolumeList {
     }
     return remaining;
   }
-    
-  void initializeReplicaMaps(ReplicaMap globalReplicaMap) throws IOException {
-    for (FsVolumeImpl v : volumes) {
-      v.getVolumeMap(globalReplicaMap);
-    }
-  }
   
   void getAllVolumesMap(final String bpid, final ReplicaMap volumeMap) throws IOException {
     long totalStartTime = Time.monotonicNow();
@@ -205,6 +198,19 @@ class FsVolumeList {
     return volumes.toString();
   }
 
+  /**
+   * Dynamically add new volumes to the existing volumes that this DN manages.
+   * @param newVolume the instance of new FsVolumeImpl.
+   */
+  synchronized void addVolume(FsVolumeImpl newVolume) {
+    // Make a copy of volumes to add new volumes.
+    final List<FsVolumeImpl> volumeList = volumes == null ?
+        new ArrayList<FsVolumeImpl>() :
+        new ArrayList<FsVolumeImpl>(volumes);
+    volumeList.add(newVolume);
+    volumes = Collections.unmodifiableList(volumeList);
+    FsDatasetImpl.LOG.info("Added new volume: " + newVolume.toString());
+  }
 
   void addBlockPool(final String bpid, final Configuration conf) throws IOException {
     long totalStartTime = Time.monotonicNow();

Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=1616624&r1=1616623&r2=1616624&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Thu Aug  7 23:01:36 2014
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
@@ -1083,6 +1084,11 @@ public class SimulatedFSDataset implemen
   }
 
   @Override
+  public void addVolumes(Collection<StorageLocation> volumes) {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
   public DatanodeStorage getStorage(final String storageUuid) {
     return storageUuid.equals(storage.getStorageUuid()) ?
         storage.dnStorage :

Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java?rev=1616624&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java Thu Aug  7 23:01:36 2014
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.datanode.DNConf;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataStorage;
+import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.util.StringUtils;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.when;
+
+public class TestFsDatasetImpl {
+  private static final String BASE_DIR =
+      System.getProperty("test.build.dir") + "/fsdatasetimpl";
+  private static final int NUM_INIT_VOLUMES = 2;
+
+  private DataStorage storage;
+  private FsDatasetImpl dataset;
+
+  private static void createStorageDirs(DataStorage storage, Configuration conf,
+      int numDirs) throws IOException {
+    List<Storage.StorageDirectory> dirs =
+        new ArrayList<Storage.StorageDirectory>();
+    List<String> dirStrings = new ArrayList<String>();
+    for (int i = 0; i < numDirs; i++) {
+      String loc = BASE_DIR + "/data" + i;
+      dirStrings.add(loc);
+      dirs.add(new Storage.StorageDirectory(new File(loc)));
+      when(storage.getStorageDir(i)).thenReturn(dirs.get(i));
+    }
+
+    String dataDir = StringUtils.join(",", dirStrings);
+    conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dataDir);
+    when(storage.getNumStorageDirs()).thenReturn(numDirs);
+  }
+
+  @Before
+  public void setUp() throws IOException {
+    final DataNode datanode = Mockito.mock(DataNode.class);
+    storage = Mockito.mock(DataStorage.class);
+    Configuration conf = new Configuration();
+    final DNConf dnConf = new DNConf(conf);
+
+    when(datanode.getConf()).thenReturn(conf);
+    when(datanode.getDnConf()).thenReturn(dnConf);
+
+    createStorageDirs(storage, conf, NUM_INIT_VOLUMES);
+    dataset = new FsDatasetImpl(datanode, storage, conf);
+
+    assertEquals(NUM_INIT_VOLUMES, dataset.getVolumes().size());
+    assertEquals(0, dataset.getNumFailedVolumes());
+  }
+
+  @Test
+  public void testAddVolumes() throws IOException {
+    final int numNewVolumes = 3;
+    final int numExistingVolumes = dataset.getVolumes().size();
+    final int totalVolumes = numNewVolumes + numExistingVolumes;
+    List<StorageLocation> newLocations = new ArrayList<StorageLocation>();
+    for (int i = 0; i < numNewVolumes; i++) {
+      String path = BASE_DIR + "/newData" + i;
+      newLocations.add(StorageLocation.parse(path));
+      when(storage.getStorageDir(numExistingVolumes + i))
+          .thenReturn(new Storage.StorageDirectory(new File(path)));
+    }
+    when(storage.getNumStorageDirs()).thenReturn(totalVolumes);
+
+    dataset.addVolumes(newLocations);
+    assertEquals(totalVolumes, dataset.getVolumes().size());
+    for (int i = 0; i < numNewVolumes; i++) {
+      assertEquals(newLocations.get(i).getFile().getPath(),
+          dataset.getVolumes().get(numExistingVolumes + i).getBasePath());
+    }
+  }
+}