You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by at...@apache.org on 2014/08/08 01:01:37 UTC
svn commit: r1616624 - in
/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/org/apache/hadoop/hdfs/server/datanode/
src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/
src/main/java/org/apache/hadoop/hdfs/se...
Author: atm
Date: Thu Aug 7 23:01:36 2014
New Revision: 1616624
URL: http://svn.apache.org/r1616624
Log:
HDFS-6740. Make FSDataset support adding data volumes dynamically. Contributed by Lei Xu.
Added:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
Modified:
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1616624&r1=1616623&r2=1616624&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Thu Aug 7 23:01:36 2014
@@ -115,6 +115,9 @@ Release 2.6.0 - UNRELEASED
HDFS-6728. Dynamically add new volumes to DataStorage, formatted if
necessary. (Lei Xu vi atm)
+ HDFS-6740. Make FSDataset support adding data volumes dynamically. (Lei
+ Xu via atm)
+
OPTIMIZATIONS
HDFS-6690. Deduplicate xattr names in memory. (wang)
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java?rev=1616624&r1=1616623&r2=1616624&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StorageLocation.java Thu Aug 7 23:01:36 2014
@@ -78,7 +78,7 @@ public class StorageLocation {
* @return A StorageLocation object if successfully parsed, null otherwise.
* Does not throw any exceptions.
*/
- static StorageLocation parse(String rawLocation)
+ public static StorageLocation parse(String rawLocation)
throws IOException, SecurityException {
Matcher matcher = regex.matcher(rawLocation);
StorageType storageType = StorageType.DEFAULT;
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java?rev=1616624&r1=1616623&r2=1616624&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java Thu Aug 7 23:01:36 2014
@@ -22,6 +22,7 @@ import java.io.File;
import java.io.FileDescriptor;
import java.io.IOException;
import java.io.InputStream;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -39,6 +40,7 @@ import org.apache.hadoop.hdfs.server.dat
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
import org.apache.hadoop.hdfs.server.datanode.Replica;
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
+import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetFactory;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
@@ -91,6 +93,10 @@ public interface FsDatasetSpi<V extends
/** @return a list of volumes. */
public List<V> getVolumes();
+ /** Add a collection of StorageLocation volumes to FsDataset. */
+ public void addVolumes(Collection<StorageLocation> volumes)
+ throws IOException;
+
/** @return a storage with the given storage ID */
public DatanodeStorage getStorage(final String storageUuid);
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java?rev=1616624&r1=1616623&r2=1616624&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetAsyncDiskService.java Thu Aug 7 23:01:36 2014
@@ -61,6 +61,7 @@ class FsDatasetAsyncDiskService {
private static final long THREADS_KEEP_ALIVE_SECONDS = 60;
private final DataNode datanode;
+ private final ThreadGroup threadGroup;
private Map<File, ThreadPoolExecutor> executors
= new HashMap<File, ThreadPoolExecutor>();
@@ -70,42 +71,52 @@ class FsDatasetAsyncDiskService {
*
* The AsyncDiskServices uses one ThreadPool per volume to do the async
* disk operations.
- *
- * @param volumes The roots of the data volumes.
*/
- FsDatasetAsyncDiskService(DataNode datanode, File[] volumes) {
+ FsDatasetAsyncDiskService(DataNode datanode) {
this.datanode = datanode;
+ this.threadGroup = new ThreadGroup(getClass().getSimpleName());
+ }
+
+ private void addExecutorForVolume(final File volume) {
+ ThreadFactory threadFactory = new ThreadFactory() {
+ int counter = 0;
+
+ @Override
+ public Thread newThread(Runnable r) {
+ int thisIndex;
+ synchronized (this) {
+ thisIndex = counter++;
+ }
+ Thread t = new Thread(threadGroup, r);
+ t.setName("Async disk worker #" + thisIndex +
+ " for volume " + volume);
+ return t;
+ }
+ };
- final ThreadGroup threadGroup = new ThreadGroup(getClass().getSimpleName());
- // Create one ThreadPool per volume
- for (int v = 0 ; v < volumes.length; v++) {
- final File vol = volumes[v];
- ThreadFactory threadFactory = new ThreadFactory() {
- int counter = 0;
-
- @Override
- public Thread newThread(Runnable r) {
- int thisIndex;
- synchronized (this) {
- thisIndex = counter++;
- }
- Thread t = new Thread(threadGroup, r);
- t.setName("Async disk worker #" + thisIndex +
- " for volume " + vol);
- return t;
- }
- };
-
- ThreadPoolExecutor executor = new ThreadPoolExecutor(
- CORE_THREADS_PER_VOLUME, MAXIMUM_THREADS_PER_VOLUME,
- THREADS_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
- new LinkedBlockingQueue<Runnable>(), threadFactory);
-
- // This can reduce the number of running threads
- executor.allowCoreThreadTimeOut(true);
- executors.put(vol, executor);
+ ThreadPoolExecutor executor = new ThreadPoolExecutor(
+ CORE_THREADS_PER_VOLUME, MAXIMUM_THREADS_PER_VOLUME,
+ THREADS_KEEP_ALIVE_SECONDS, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(), threadFactory);
+
+ // This can reduce the number of running threads
+ executor.allowCoreThreadTimeOut(true);
+ executors.put(volume, executor);
+ }
+
+ /**
+ * Starts AsyncDiskService for a new volume
+ * @param volume the root of the new data volume.
+ */
+ synchronized void addVolume(File volume) {
+ if (executors == null) {
+ throw new RuntimeException("AsyncDiskService is already shutdown");
}
-
+ ThreadPoolExecutor executor = executors.get(volume);
+ if (executor != null) {
+ throw new RuntimeException("Volume " + volume + " is already existed.");
+ }
+ addExecutorForVolume(volume);
}
synchronized long countPendingDeletions() {
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java?rev=1616624&r1=1616623&r2=1616624&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java Thu Aug 7 23:01:36 2014
@@ -202,6 +202,7 @@ class FsDatasetImpl implements FsDataset
final Map<String, DatanodeStorage> storageMap;
final FsDatasetAsyncDiskService asyncDiskService;
final FsDatasetCache cacheManager;
+ private final Configuration conf;
private final int validVolsRequired;
final ReplicaMap volumeMap;
@@ -216,6 +217,7 @@ class FsDatasetImpl implements FsDataset
) throws IOException {
this.datanode = datanode;
this.dataStorage = storage;
+ this.conf = conf;
// The number of volumes required for operation is the total number
// of volumes minus the number of failed volumes we can tolerate.
final int volFailuresTolerated =
@@ -242,38 +244,76 @@ class FsDatasetImpl implements FsDataset
}
storageMap = new HashMap<String, DatanodeStorage>();
- final List<FsVolumeImpl> volArray = new ArrayList<FsVolumeImpl>(
- storage.getNumStorageDirs());
- for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
- Storage.StorageDirectory sd = storage.getStorageDir(idx);
- final File dir = sd.getCurrentDir();
- final StorageType storageType = getStorageTypeFromLocations(dataLocations, sd.getRoot());
- volArray.add(new FsVolumeImpl(this, sd.getStorageUuid(), dir, conf,
- storageType));
- LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
- storageMap.put(sd.getStorageUuid(),
- new DatanodeStorage(sd.getStorageUuid(), DatanodeStorage.State.NORMAL, storageType));
- }
volumeMap = new ReplicaMap(this);
-
@SuppressWarnings("unchecked")
final VolumeChoosingPolicy<FsVolumeImpl> blockChooserImpl =
ReflectionUtils.newInstance(conf.getClass(
DFSConfigKeys.DFS_DATANODE_FSDATASET_VOLUME_CHOOSING_POLICY_KEY,
RoundRobinVolumeChoosingPolicy.class,
VolumeChoosingPolicy.class), conf);
- volumes = new FsVolumeList(volArray, volsFailed, blockChooserImpl);
- volumes.initializeReplicaMaps(volumeMap);
+ volumes = new FsVolumeList(volsFailed, blockChooserImpl);
+ asyncDiskService = new FsDatasetAsyncDiskService(datanode);
- File[] roots = new File[storage.getNumStorageDirs()];
for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
- roots[idx] = storage.getStorageDir(idx).getCurrentDir();
+ addVolume(dataLocations, storage.getStorageDir(idx));
}
- asyncDiskService = new FsDatasetAsyncDiskService(datanode, roots);
+
cacheManager = new FsDatasetCache(this);
registerMBean(datanode.getDatanodeUuid());
}
+ private void addVolume(Collection<StorageLocation> dataLocations,
+ Storage.StorageDirectory sd) throws IOException {
+ final File dir = sd.getCurrentDir();
+ final StorageType storageType =
+ getStorageTypeFromLocations(dataLocations, sd.getRoot());
+
+ // If FsVolumeImpl() or getVolumeMap() throws an IOException, nothing
+ // needs to be rolled back: the other data structures, e.g., storageMap
+ // and asyncDiskService, have not been touched yet and stay consistent.
+ FsVolumeImpl fsVolume = new FsVolumeImpl(
+ this, sd.getStorageUuid(), dir, this.conf, storageType);
+ fsVolume.getVolumeMap(volumeMap);
+
+ volumes.addVolume(fsVolume);
+ storageMap.put(sd.getStorageUuid(),
+ new DatanodeStorage(sd.getStorageUuid(),
+ DatanodeStorage.State.NORMAL,
+ storageType));
+ asyncDiskService.addVolume(sd.getCurrentDir());
+
+ LOG.info("Added volume - " + dir + ", StorageType: " + storageType);
+ }
+
+ /**
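+ * Add a collection of StorageLocation volumes to FsDataset.
+ *
+ * @pre dataStorage must have these volumes.
+ * @param volumes the locations to add; any location without a matching storage directory in dataStorage is logged and skipped.
+ * @throws IOException if adding one of the volumes fails.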
+ */
+ @Override
+ public synchronized void addVolumes(Collection<StorageLocation> volumes)
+ throws IOException {
+ final Collection<StorageLocation> dataLocations =
+ DataNode.getStorageLocations(this.conf);
+ Map<String, Storage.StorageDirectory> allStorageDirs =
+ new HashMap<String, Storage.StorageDirectory>();
+ for (int idx = 0; idx < dataStorage.getNumStorageDirs(); idx++) {
+ Storage.StorageDirectory sd = dataStorage.getStorageDir(idx);
+ allStorageDirs.put(sd.getRoot().getAbsolutePath(), sd);
+ }
+
+ for (StorageLocation vol : volumes) {
+ String key = vol.getFile().getAbsolutePath();
+ if (!allStorageDirs.containsKey(key)) {
+ LOG.warn("Attempt to add an invalid volume: " + vol.getFile());
+ } else {
+ addVolume(dataLocations, allStorageDirs.get(key));
+ }
+ }
+ }
+
private StorageType getStorageTypeFromLocations(
Collection<StorageLocation> dataLocations, File dir) {
for (StorageLocation dataLocation : dataLocations) {
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java?rev=1616624&r1=1616623&r2=1616624&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java Thu Aug 7 23:01:36 2014
@@ -40,9 +40,8 @@ class FsVolumeList {
private final VolumeChoosingPolicy<FsVolumeImpl> blockChooser;
private volatile int numFailedVolumes;
- FsVolumeList(List<FsVolumeImpl> volumes, int failedVols,
+ FsVolumeList(int failedVols,
VolumeChoosingPolicy<FsVolumeImpl> blockChooser) {
- this.volumes = Collections.unmodifiableList(volumes);
this.blockChooser = blockChooser;
this.numFailedVolumes = failedVols;
}
@@ -101,12 +100,6 @@ class FsVolumeList {
}
return remaining;
}
-
- void initializeReplicaMaps(ReplicaMap globalReplicaMap) throws IOException {
- for (FsVolumeImpl v : volumes) {
- v.getVolumeMap(globalReplicaMap);
- }
- }
void getAllVolumesMap(final String bpid, final ReplicaMap volumeMap) throws IOException {
long totalStartTime = Time.monotonicNow();
@@ -205,6 +198,19 @@ class FsVolumeList {
return volumes.toString();
}
+ /**
+ * Dynamically add a new volume to the existing volumes that this DN manages.
+ * @param newVolume the instance of new FsVolumeImpl.
+ */
+ synchronized void addVolume(FsVolumeImpl newVolume) {
+ // Make a copy of volumes to add new volumes.
+ final List<FsVolumeImpl> volumeList = volumes == null ?
+ new ArrayList<FsVolumeImpl>() :
+ new ArrayList<FsVolumeImpl>(volumes);
+ volumeList.add(newVolume);
+ volumes = Collections.unmodifiableList(volumeList);
+ FsDatasetImpl.LOG.info("Added new volume: " + newVolume.toString());
+ }
void addBlockPool(final String bpid, final Configuration conf) throws IOException {
long totalStartTime = Time.monotonicNow();
Modified: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=1616624&r1=1616623&r2=1616624&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (original)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Thu Aug 7 23:01:36 2014
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
@@ -1083,6 +1084,11 @@ public class SimulatedFSDataset implemen
}
@Override
+ public void addVolumes(Collection<StorageLocation> volumes) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
public DatanodeStorage getStorage(final String storageUuid) {
return storageUuid.equals(storage.getStorageUuid()) ?
storage.dnStorage :
Added: hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java?rev=1616624&view=auto
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java (added)
+++ hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/TestFsDatasetImpl.java Thu Aug 7 23:01:36 2014
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.datanode.DNConf;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.hdfs.server.datanode.DataStorage;
+import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
+import org.apache.hadoop.util.StringUtils;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.when;
+
+public class TestFsDatasetImpl {
+ private static final String BASE_DIR =
+ System.getProperty("test.build.dir") + "/fsdatasetimpl";
+ private static final int NUM_INIT_VOLUMES = 2;
+
+ private DataStorage storage;
+ private FsDatasetImpl dataset;
+
+ private static void createStorageDirs(DataStorage storage, Configuration conf,
+ int numDirs) throws IOException {
+ List<Storage.StorageDirectory> dirs =
+ new ArrayList<Storage.StorageDirectory>();
+ List<String> dirStrings = new ArrayList<String>();
+ for (int i = 0; i < numDirs; i++) {
+ String loc = BASE_DIR + "/data" + i;
+ dirStrings.add(loc);
+ dirs.add(new Storage.StorageDirectory(new File(loc)));
+ when(storage.getStorageDir(i)).thenReturn(dirs.get(i));
+ }
+
+ String dataDir = StringUtils.join(",", dirStrings);
+ conf.set(DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY, dataDir);
+ when(storage.getNumStorageDirs()).thenReturn(numDirs);
+ }
+
+ @Before
+ public void setUp() throws IOException {
+ final DataNode datanode = Mockito.mock(DataNode.class);
+ storage = Mockito.mock(DataStorage.class);
+ Configuration conf = new Configuration();
+ final DNConf dnConf = new DNConf(conf);
+
+ when(datanode.getConf()).thenReturn(conf);
+ when(datanode.getDnConf()).thenReturn(dnConf);
+
+ createStorageDirs(storage, conf, NUM_INIT_VOLUMES);
+ dataset = new FsDatasetImpl(datanode, storage, conf);
+
+ assertEquals(NUM_INIT_VOLUMES, dataset.getVolumes().size());
+ assertEquals(0, dataset.getNumFailedVolumes());
+ }
+
+ @Test
+ public void testAddVolumes() throws IOException {
+ final int numNewVolumes = 3;
+ final int numExistingVolumes = dataset.getVolumes().size();
+ final int totalVolumes = numNewVolumes + numExistingVolumes;
+ List<StorageLocation> newLocations = new ArrayList<StorageLocation>();
+ for (int i = 0; i < numNewVolumes; i++) {
+ String path = BASE_DIR + "/newData" + i;
+ newLocations.add(StorageLocation.parse(path));
+ when(storage.getStorageDir(numExistingVolumes + i))
+ .thenReturn(new Storage.StorageDirectory(new File(path)));
+ }
+ when(storage.getNumStorageDirs()).thenReturn(totalVolumes);
+
+ dataset.addVolumes(newLocations);
+ assertEquals(totalVolumes, dataset.getVolumes().size());
+ for (int i = 0; i < numNewVolumes; i++) {
+ assertEquals(newLocations.get(i).getFile().getPath(),
+ dataset.getVolumes().get(numExistingVolumes + i).getBasePath());
+ }
+ }
+}