Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2016/12/15 09:05:09 UTC
[41/50] [abbrv] hadoop git commit: HDFS-10800: [SPS]: Daemon thread in Namenode to find blocks placed in storage other than what the policy specifies. Contributed by Uma Maheswara Rao G
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/3bb1c040
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/3bb1c040
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/3bb1c040
Branch: refs/heads/HDFS-10285
Commit: 3bb1c04093fc09ee4d8163bebbdae78804159d0d
Parents: 9ec39c6
Author: Uma Maheswara Rao G <um...@intel.com>
Authored: Fri Sep 23 13:41:29 2016 -0700
Committer: Rakesh Radhakrishnan <ra...@apache.org>
Committed: Thu Dec 15 14:24:12 2016 +0530
----------------------------------------------------------------------
.../java/org/apache/hadoop/hdfs/DFSUtil.java | 41 ++
.../server/blockmanagement/BlockManager.java | 20 +
.../blockmanagement/DatanodeDescriptor.java | 38 ++
.../server/blockmanagement/DatanodeManager.java | 7 +
.../datanode/StoragePolicySatisfyWorker.java | 29 +-
.../namenode/BlockStorageMovementNeeded.java | 53 +++
.../server/namenode/StoragePolicySatisfier.java | 397 +++++++++++++++++++
.../protocol/BlockStorageMovementCommand.java | 11 +-
.../TestStoragePolicySatisfyWorker.java | 24 +-
.../namenode/TestStoragePolicySatisfier.java | 209 ++++++++++
10 files changed, 797 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
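For context, the end-to-end flow this patch enables looks roughly like the following sketch, assuming a running MiniDFSCluster and using only APIs touched in this change (variable names are illustrative):

  DistributedFileSystem dfs = cluster.getFileSystem();
  // Updates only the Namespace; no physical movement happens yet.
  dfs.setStoragePolicy(new Path("/file1"), "COLD");
  FSNamesystem ns = cluster.getNamesystem();
  INode inode = ns.getFSDirectory().getINode("/file1");
  // Queues the inode for the new StoragePolicySatisfier daemon, which
  // computes the mismatched blocks and assigns moves to a coordinator DN.
  ns.getBlockManager().satisfyStoragePolicy(inode.getId());
  cluster.triggerHeartbeats(); // DN-side command wiring is still a TODO here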
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3bb1c040/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
index 23166e2..dc6f0d1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
@@ -48,6 +48,7 @@ import java.util.Collection;
import java.util.Comparator;
import java.util.Date;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -68,6 +69,7 @@ import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProviderCryptoExtension;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
@@ -1583,4 +1585,43 @@ public class DFSUtil {
.createKeyProviderCryptoExtension(keyProvider);
return cryptoProvider;
}
+
+ /**
+ * Remove the overlap between the expected types and the existing types.
+ *
+ * @param expected
+ * - Expected storage types list.
+ * @param existing
+ * - Existing storage types list.
+ * @param ignoreNonMovable
+ * ignore non-movable storage types by removing them from both
+ * expected and existing storage type list to prevent non-movable
+ * storage from being moved.
+ * @return true if the existing types or the expected types list is empty
+ * after removing the overlap.
+ */
+ public static boolean removeOverlapBetweenStorageTypes(
+ List<StorageType> expected,
+ List<StorageType> existing, boolean ignoreNonMovable) {
+ for (Iterator<StorageType> i = existing.iterator(); i.hasNext();) {
+ final StorageType t = i.next();
+ if (expected.remove(t)) {
+ i.remove();
+ }
+ }
+ if (ignoreNonMovable) {
+ removeNonMovable(existing);
+ removeNonMovable(expected);
+ }
+ return expected.isEmpty() || existing.isEmpty();
+ }
+
+ private static void removeNonMovable(List<StorageType> types) {
+ for (Iterator<StorageType> i = types.iterator(); i.hasNext();) {
+ final StorageType t = i.next();
+ if (!t.isMovable()) {
+ i.remove();
+ }
+ }
+ }
}
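To make the semantics of the new helper concrete, a small hedged example (the values are illustrative; both lists must be mutable because the method modifies them in place):

  List<StorageType> expected = new LinkedList<>(Arrays.asList(
      StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE));
  List<StorageType> existing = new LinkedList<>(Arrays.asList(
      StorageType.DISK, StorageType.ARCHIVE, StorageType.DISK));
  // One ARCHIVE replica already matches, so it is removed from both lists.
  boolean satisfied =
      DFSUtil.removeOverlapBetweenStorageTypes(expected, existing, true);
  // Now expected == [ARCHIVE, ARCHIVE], existing == [DISK, DISK], and
  // satisfied == false: two replicas still need to move DISK -> ARCHIVE.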
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3bb1c040/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index e60703b..4a920e7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -85,6 +85,8 @@ import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
import org.apache.hadoop.hdfs.server.namenode.INodesInPath;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.Namesystem;
+import org.apache.hadoop.hdfs.server.namenode.BlockStorageMovementNeeded;
+import org.apache.hadoop.hdfs.server.namenode.StoragePolicySatisfier;
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
@@ -366,6 +368,11 @@ public class BlockManager implements BlockStatsMXBean {
private final BlockIdManager blockIdManager;
+ /** For satisfying block storage policies */
+ private final StoragePolicySatisfier sps;
+ private final BlockStorageMovementNeeded storageMovementNeeded =
+ new BlockStorageMovementNeeded();
+
/** Minimum live replicas needed for the datanode to be transitioned
* from ENTERING_MAINTENANCE to IN_MAINTENANCE.
*/
@@ -401,6 +408,7 @@ public class BlockManager implements BlockStatsMXBean {
DFSConfigKeys.DFS_NAMENODE_RECONSTRUCTION_PENDING_TIMEOUT_SEC_DEFAULT)
* 1000L);
+ sps = new StoragePolicySatisfier(namesystem, storageMovementNeeded, this);
blockTokenSecretManager = createBlockTokenSecretManager(conf);
this.maxCorruptFilesReturned = conf.getInt(
@@ -617,9 +625,11 @@ public class BlockManager implements BlockStatsMXBean {
this.blockReportThread.start();
mxBeanName = MBeans.register("NameNode", "BlockStats", this);
bmSafeMode.activate(blockTotal);
+ sps.start();
}
public void close() {
+ sps.stop();
bmSafeMode.close();
try {
redundancyThread.interrupt();
@@ -4707,4 +4717,14 @@ public class BlockManager implements BlockStatsMXBean {
}
return i;
}
+
+ /**
+ * Add the file block collection id for which storage movement is needed
+ * for its blocks.
+ *
+ * @param id
+ * - file block collection id.
+ */
+ public void satisfyStoragePolicy(long id) {
+ storageMovementNeeded.add(id);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3bb1c040/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
index 320c680..294e2c0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
@@ -41,6 +41,8 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
+import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
+import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
@@ -203,6 +205,10 @@ public class DatanodeDescriptor extends DatanodeInfo {
private final LightWeightHashSet<Block> invalidateBlocks =
new LightWeightHashSet<>();
+ /** A queue of blocks whose storage placements this datanode should move. */
+ private final Queue<List<BlockMovingInfo>> storageMovementBlocks =
+ new LinkedList<>();
+
/* Variables for maintaining number of blocks scheduled to be written to
* this storage. This count is approximate and might be slightly bigger
* in case of errors (e.g. datanode does not report if an error occurs
@@ -928,5 +934,37 @@ public class DatanodeDescriptor extends DatanodeInfo {
public boolean isRegistered() {
return isAlive() && !forceRegistration;
}
+
+ /**
+ * Add the block infos whose storage locations need to be moved.
+ *
+ * @param storageMismatchedBlocks
+ * - storage mismatched block infos
+ */
+ public void addBlocksToMoveStorage(
+ List<BlockMovingInfo> storageMismatchedBlocks) {
+ storageMovementBlocks.offer(storageMismatchedBlocks);
+ }
+
+ /**
+ * @return block infos whose storage locations need to be moved.
+ */
+ public List<BlockMovingInfo> getBlocksToMoveStorages() {
+ return storageMovementBlocks.poll();
+ }
+
+ // TODO: we will remove this method once the DN side handling is integrated.
+ // We can then convert the test to check real block movements instead of
+ // this data structure.
+ @VisibleForTesting
+ public List<BlockMovingInfo> getStorageMovementPendingItems() {
+ List<BlockMovingInfo> flatList = new ArrayList<>();
+ Iterator<List<BlockMovingInfo>> iterator = storageMovementBlocks
+ .iterator();
+ while (iterator.hasNext()) {
+ List<BlockMovingInfo> next = iterator.next();
+ flatList.addAll(next);
+ }
+ return flatList;
+ }
}
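The queue added here is a simple handoff point between the Namenode-side satisfier and the heartbeat path; a hedged sketch of the intended usage (coordinatorNode and blockMovingInfos assumed in scope):

  // Satisfier side: attach a batch of moves to the chosen coordinator DN.
  coordinatorNode.addBlocksToMoveStorage(blockMovingInfos);
  // Heartbeat side: poll one batch per heartbeat; null when nothing pending.
  List<BlockMovingInfo> batch = coordinatorNode.getBlocksToMoveStorages();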
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3bb1c040/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
index cc64a04..49b78e1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hdfs.server.protocol.*;
import org.apache.hadoop.hdfs.server.protocol.BlockECReconstructionCommand.BlockECReconstructionInfo;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringStripedBlock;
+import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.*;
import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException;
@@ -1632,6 +1633,12 @@ public class DatanodeManager {
nodeinfo.setBalancerBandwidth(0);
}
+ List<BlockMovingInfo> blocksToMoveStorages = nodeinfo
+ .getBlocksToMoveStorages();
+ if (blocksToMoveStorages != null) {
+ // TODO: create BlockStorageMovementCommand and add here in response.
+ }
+
if (!cmds.isEmpty()) {
return cmds.toArray(new DatanodeCommand[cmds.size()]);
}
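The TODO above might eventually be filled in along the following lines; this is purely speculative, since the command still needs the block-moving data added to it, and DNA_BLOCK_STORAGE_MOVEMENT is a hypothetical action code that this patch does not define:

  if (blocksToMoveStorages != null) {
    // Hypothetical: a richer constructor carrying blocksToMoveStorages and
    // the action constant below do not exist yet in this patch.
    cmds.add(new BlockStorageMovementCommand(DNA_BLOCK_STORAGE_MOVEMENT));
  }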
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3bb1c040/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
index 6df4e81..fa408f6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
@@ -125,7 +126,7 @@ public class StoragePolicySatisfyWorker {
return moverThreadPool;
}
- public void processBlockMovingTasks(long trackID,
+ public void processBlockMovingTasks(long trackID, String blockPoolID,
List<BlockMovingInfo> blockMovingInfos) {
Future<Void> moveCallable = null;
for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
@@ -133,13 +134,11 @@ public class StoragePolicySatisfyWorker {
.getSources().length == blkMovingInfo.getTargets().length;
for (int i = 0; i < blkMovingInfo.getSources().length; i++) {
- BlockMovingTask blockMovingTask =
- new BlockMovingTask(blkMovingInfo.getBlock(),
- blkMovingInfo.getSources()[i],
- blkMovingInfo.getTargets()[i],
+ BlockMovingTask blockMovingTask = new BlockMovingTask(
+ blkMovingInfo.getBlock(), blockPoolID,
+ blkMovingInfo.getSources()[i], blkMovingInfo.getTargets()[i],
blkMovingInfo.getTargetStorageTypes()[i]);
- moveCallable = moverExecutorCompletionService
- .submit(blockMovingTask);
+ moveCallable = moverExecutorCompletionService.submit(blockMovingTask);
moverTaskFutures.add(moveCallable);
}
}
@@ -163,14 +162,16 @@ public class StoragePolicySatisfyWorker {
* given target.
*/
private class BlockMovingTask implements Callable<Void> {
- private final ExtendedBlock block;
+ private final Block block;
private final DatanodeInfo source;
private final DatanodeInfo target;
private final StorageType targetStorageType;
+ private String blockPoolID;
- BlockMovingTask(ExtendedBlock block, DatanodeInfo source,
+ BlockMovingTask(Block block, String blockPoolID, DatanodeInfo source,
DatanodeInfo target, StorageType targetStorageType) {
this.block = block;
+ this.blockPoolID = blockPoolID;
this.source = source;
this.target = target;
this.targetStorageType = targetStorageType;
@@ -201,12 +202,12 @@ public class StoragePolicySatisfyWorker {
OutputStream unbufOut = sock.getOutputStream();
InputStream unbufIn = sock.getInputStream();
-
+ ExtendedBlock extendedBlock = new ExtendedBlock(blockPoolID, block);
Token<BlockTokenIdentifier> accessToken = datanode.getBlockAccessToken(
- block, EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE));
+ extendedBlock, EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE));
DataEncryptionKeyFactory keyFactory = datanode
- .getDataEncryptionKeyFactoryForBlock(block);
+ .getDataEncryptionKeyFactoryForBlock(extendedBlock);
IOStreamPair saslStreams = datanode.getSaslClient().socketSend(sock,
unbufOut, unbufIn, keyFactory, accessToken, target);
unbufOut = saslStreams.out;
@@ -215,10 +216,10 @@ public class StoragePolicySatisfyWorker {
new BufferedOutputStream(unbufOut, ioFileBufferSize));
in = new DataInputStream(
new BufferedInputStream(unbufIn, ioFileBufferSize));
- sendRequest(out, block, accessToken, source, targetStorageType);
+ sendRequest(out, extendedBlock, accessToken, source, targetStorageType);
receiveResponse(in);
- LOG.debug(
+ LOG.info(
"Successfully moved block:{} from src:{} to destin:{} for"
+ " satisfying storageType:{}",
block, source, target, targetStorageType);
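The refactoring above lets the Namenode ship a bare Block plus the block pool id instead of a full ExtendedBlock; the worker now rebuilds the ExtendedBlock only at transfer time. A hedged illustration (the ids are made up):

  Block blk = new Block(1073741825L, 1024L, 1001L); // id, length, genstamp
  String bpid = "BP-1234567890-127.0.0.1-1450000000000"; // hypothetical pool id
  ExtendedBlock extendedBlock = new ExtendedBlock(bpid, blk);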
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3bb1c040/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java
new file mode 100644
index 0000000..c916672
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementNeeded.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.util.LinkedList;
+import java.util.Queue;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * A class to track the block collection IDs for which physical storage
+ * movement is needed, as determined from the Namespace and the
+ * StorageReports from DNs.
+ */
+@InterfaceAudience.Private
+public class BlockStorageMovementNeeded {
+ private final Queue<Long> storageMovementNeeded = new LinkedList<Long>();
+
+ /**
+ * Add the block collection id to the tracking list, so that the storage
+ * movement of its blocks can be checked and performed if necessary.
+ *
+ * @param blockCollectionID
+ * - block collection id, i.e. the inode id.
+ */
+ public synchronized void add(Long blockCollectionID) {
+ storageMovementNeeded.add(blockCollectionID);
+ }
+
+ /**
+ * Gets the next block collection id for which a storage movement check is
+ * necessary, so that the movement can be made if required.
+ *
+ * @return block collection ID, or null if the queue is empty
+ */
+ public synchronized Long get() {
+ return storageMovementNeeded.poll();
+ }
+}
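The class is a thin synchronized FIFO; a quick usage sketch (the inode id is illustrative):

  BlockStorageMovementNeeded needed = new BlockStorageMovementNeeded();
  needed.add(16386L);        // enqueue the inode id of a file to satisfy
  Long next = needed.get();  // -> 16386
  Long none = needed.get();  // -> null: get() polls and never blocks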
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3bb1c040/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
new file mode 100644
index 0000000..b5aed37
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java
@@ -0,0 +1,397 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
+import org.apache.hadoop.hdfs.server.balancer.Matcher;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockCollection;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
+import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
+import org.apache.hadoop.hdfs.server.protocol.StorageReport;
+import org.apache.hadoop.util.Daemon;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Setting a storage policy on a file after it has been written only updates
+ * the new storage policy type in the Namespace; physical block storage
+ * movement does not happen until the user runs the "Mover Tool" explicitly
+ * on such files. The StoragePolicySatisfier daemon thread addresses the case
+ * where users want HDFS itself to physically move the blocks instead of
+ * running the Mover tool explicitly. Simply calling the client API
+ * satisfyStoragePolicy on a file/dir automatically triggers the movement of
+ * its physical storage locations in an asynchronous manner. The Namenode
+ * picks the file blocks whose storage needs to change, builds a mapping from
+ * each source block location to the expected storage type and target
+ * location, and then this class prepares commands to send to the Datanodes
+ * for processing the physical block movements.
+ */
+@InterfaceAudience.Private
+public class StoragePolicySatisfier implements Runnable {
+ public static final Logger LOG =
+ LoggerFactory.getLogger(StoragePolicySatisfier.class);
+ private Daemon storagePolicySatisfierThread;
+ private final Namesystem namesystem;
+ private final BlockManager blockManager;
+ private final BlockStorageMovementNeeded storageMovementNeeded;
+
+ public StoragePolicySatisfier(final Namesystem namesystem,
+ final BlockStorageMovementNeeded storageMovementNeeded,
+ final BlockManager blkManager) {
+ this.namesystem = namesystem;
+ this.storageMovementNeeded = storageMovementNeeded;
+ this.blockManager = blkManager;
+ }
+
+ /**
+ * Start the storage policy satisfier daemon thread.
+ */
+ public void start() {
+ storagePolicySatisfierThread = new Daemon(this);
+ storagePolicySatisfierThread.setName("StoragePolicySatisfier");
+ storagePolicySatisfierThread.start();
+ }
+
+ /**
+ * Stop the storage policy satisfier daemon thread.
+ */
+ public void stop() {
+ if (storagePolicySatisfierThread == null) {
+ return;
+ }
+ storagePolicySatisfierThread.interrupt();
+ try {
+ storagePolicySatisfierThread.join(3000);
+ } catch (InterruptedException ie) {
+ // Interrupted while waiting for the thread to exit; proceed with shutdown.
+ }
+ }
+
+ @Override
+ public void run() {
+ while (namesystem.isRunning()) {
+ try {
+ Long blockCollectionID = storageMovementNeeded.get();
+ if (blockCollectionID != null) {
+ computeAndAssignStorageMismatchedBlocksToDNs(blockCollectionID);
+ }
+ // TODO: we can make this configurable later, i.e. how frequently we
+ // want to check for pending block movements.
+ Thread.sleep(3000);
+ } catch (Throwable t) {
+ if (!namesystem.isRunning()) {
+ LOG.info("Stopping StoragePolicySatisfier.");
+ if (!(t instanceof InterruptedException)) {
+ LOG.info("StoragePolicySatisfier received an exception"
+ + " while shutting down.", t);
+ }
+ break;
+ }
+ LOG.error("StoragePolicySatisfier thread received runtime exception. "
+ + "Stopping Storage policy satisfier work", t);
+ // TODO: Just break for now. Once we implement dynamic start/stop
+ // option, we can add conditions here when to break/terminate.
+ break;
+ }
+ }
+ }
+
+ private void computeAndAssignStorageMismatchedBlocksToDNs(
+ long blockCollectionID) {
+ BlockCollection blockCollection =
+ namesystem.getBlockCollection(blockCollectionID);
+ if (blockCollection == null) {
+ return;
+ }
+ byte existingStoragePolicyID = blockCollection.getStoragePolicyID();
+ BlockStoragePolicy existingStoragePolicy =
+ blockManager.getStoragePolicy(existingStoragePolicyID);
+ if (!blockCollection.getLastBlock().isComplete()) {
+ // Postpone: the file is currently under construction.
+ // TODO: should we add it back to the queue, or leave the retry to the user?
+ return;
+ }
+
+ // First datanode will be chosen as the co-ordinator node for storage
+ // movements. Later this can be optimized if needed.
+ DatanodeDescriptor coordinatorNode = null;
+ BlockInfo[] blocks = blockCollection.getBlocks();
+ List<BlockMovingInfo> blockMovingInfos = new ArrayList<BlockMovingInfo>();
+ for (int i = 0; i < blocks.length; i++) {
+ BlockInfo blockInfo = blocks[i];
+ List<StorageType> expectedStorageTypes =
+ existingStoragePolicy.chooseStorageTypes(blockInfo.getReplication());
+ DatanodeStorageInfo[] storages = blockManager.getStorages(blockInfo);
+ StorageType[] storageTypes = new StorageType[storages.length];
+ for (int j = 0; j < storages.length; j++) {
+ DatanodeStorageInfo datanodeStorageInfo = storages[j];
+ StorageType storageType = datanodeStorageInfo.getStorageType();
+ storageTypes[j] = storageType;
+ }
+ List<StorageType> existing =
+ new LinkedList<StorageType>(Arrays.asList(storageTypes));
+ if (!DFSUtil.removeOverlapBetweenStorageTypes(expectedStorageTypes,
+ existing, true)) {
+ List<StorageTypeNodePair> sourceWithStorageMap =
+ new ArrayList<StorageTypeNodePair>();
+ List<DatanodeStorageInfo> existingBlockStorages =
+ new ArrayList<DatanodeStorageInfo>(Arrays.asList(storages));
+ for (StorageType existingType : existing) {
+ Iterator<DatanodeStorageInfo> iterator =
+ existingBlockStorages.iterator();
+ while (iterator.hasNext()) {
+ DatanodeStorageInfo datanodeStorageInfo = iterator.next();
+ StorageType storageType = datanodeStorageInfo.getStorageType();
+ if (storageType == existingType) {
+ iterator.remove();
+ sourceWithStorageMap.add(new StorageTypeNodePair(storageType,
+ datanodeStorageInfo.getDatanodeDescriptor()));
+ break;
+ }
+ }
+ }
+
+ StorageTypeNodeMap locsForExpectedStorageTypes =
+ findTargetsForExpectedStorageTypes(expectedStorageTypes);
+
+ BlockMovingInfo blockMovingInfo =
+ findSourceAndTargetToMove(blockInfo, existing, sourceWithStorageMap,
+ expectedStorageTypes, locsForExpectedStorageTypes);
+ if (coordinatorNode == null) {
+ // For now, first datanode will be chosen as the co-ordinator. Later
+ // this can be optimized if needed.
+ coordinatorNode =
+ (DatanodeDescriptor) blockMovingInfo.getSources()[0];
+ }
+ blockMovingInfos.add(blockMovingInfo);
+ }
+ }
+
+ if (blockMovingInfos.size() < 1) {
+ // TODO: Major: handle this case. We need to implement retry logic: if
+ // some files never get a chance at storage movement, we can retry a
+ // limited number of times and then exit.
+ return;
+ }
+ coordinatorNode.addBlocksToMoveStorage(blockMovingInfos);
+ }
+
+ /**
+ * Find a good target node for each source node whose block storage was
+ * misplaced.
+ *
+ * @param blockInfo
+ * - Block
+ * @param existing
+ * - Existing storage types of block
+ * @param sourceWithStorageList
+ * - Source Datanode with storages list
+ * @param expected
+ * - Expecting storages to move
+ * @param locsForExpectedStorageTypes
+ * - Available DNs for expected storage types
+ * @return list of block source and target node pair
+ */
+ private BlockMovingInfo findSourceAndTargetToMove(BlockInfo blockInfo,
+ List<StorageType> existing,
+ List<StorageTypeNodePair> sourceWithStorageList,
+ List<StorageType> expected,
+ StorageTypeNodeMap locsForExpectedStorageTypes) {
+ List<DatanodeInfo> sourceNodes = new ArrayList<>();
+ List<StorageType> sourceStorageTypes = new ArrayList<>();
+ List<DatanodeInfo> targetNodes = new ArrayList<>();
+ List<StorageType> targetStorageTypes = new ArrayList<>();
+ List<DatanodeDescriptor> chosenNodes = new ArrayList<>();
+ for (int i = 0; i < sourceWithStorageList.size(); i++) {
+ StorageTypeNodePair existingTypeNodePair = sourceWithStorageList.get(i);
+ StorageTypeNodePair chosenTarget =
+ chooseTargetTypeInSameNode(existingTypeNodePair.dn, expected,
+ locsForExpectedStorageTypes, chosenNodes);
+
+ if (chosenTarget == null && blockManager.getDatanodeManager()
+ .getNetworkTopology().isNodeGroupAware()) {
+ chosenTarget = chooseTarget(blockInfo, existingTypeNodePair.dn,
+ expected, Matcher.SAME_NODE_GROUP, locsForExpectedStorageTypes,
+ chosenNodes);
+ }
+
+ // Then, match nodes on the same rack
+ if (chosenTarget == null) {
+ chosenTarget =
+ chooseTarget(blockInfo, existingTypeNodePair.dn, expected,
+ Matcher.SAME_RACK, locsForExpectedStorageTypes, chosenNodes);
+ }
+
+ if (chosenTarget == null) {
+ chosenTarget =
+ chooseTarget(blockInfo, existingTypeNodePair.dn, expected,
+ Matcher.ANY_OTHER, locsForExpectedStorageTypes, chosenNodes);
+ }
+ if (null != chosenTarget) {
+ sourceNodes.add(existingTypeNodePair.dn);
+ sourceStorageTypes.add(existingTypeNodePair.storageType);
+ targetNodes.add(chosenTarget.dn);
+ targetStorageTypes.add(chosenTarget.storageType);
+ chosenNodes.add(chosenTarget.dn);
+ // TODO: We can increment scheduled block count for this node?
+ } else {
+ // TODO: failed to choose target nodes, so just retry for now. Should we
+ // proceed without these targets? Then what should the final result be?
+ // One option: pack an empty target, meaning a target node could not be
+ // chosen, so the result from the DN should always be RETRY_REQUIRED.
+ // Log: unable to choose a target node for the source datanode.
+ sourceNodes.add(existingTypeNodePair.dn);
+ sourceStorageTypes.add(existingTypeNodePair.storageType);
+ targetNodes.add(null);
+ targetStorageTypes.add(null);
+ }
+ }
+ BlockMovingInfo blkMovingInfo = new BlockMovingInfo(blockInfo,
+ sourceNodes.toArray(new DatanodeInfo[sourceNodes.size()]),
+ targetNodes.toArray(new DatanodeInfo[targetNodes.size()]),
+ sourceStorageTypes.toArray(new StorageType[sourceStorageTypes.size()]),
+ targetStorageTypes.toArray(new StorageType[targetStorageTypes.size()]));
+ return blkMovingInfo;
+ }
+
+ /**
+ * Choose a target storage within the same Datanode if possible.
+ *
+ * @param locsForExpectedStorageTypes
+ * - available DNs for the expected storage types
+ * @param chosenNodes
+ * - nodes already chosen as targets
+ */
+ private StorageTypeNodePair chooseTargetTypeInSameNode(
+ DatanodeDescriptor source, List<StorageType> targetTypes,
+ StorageTypeNodeMap locsForExpectedStorageTypes,
+ List<DatanodeDescriptor> chosenNodes) {
+ for (StorageType t : targetTypes) {
+ DatanodeStorageInfo chooseStorage4Block =
+ source.chooseStorage4Block(t, 0);
+ if (chooseStorage4Block != null) {
+ return new StorageTypeNodePair(t, source);
+ }
+ }
+ return null;
+ }
+
+ private StorageTypeNodePair chooseTarget(Block block,
+ DatanodeDescriptor source, List<StorageType> targetTypes, Matcher matcher,
+ StorageTypeNodeMap locsForExpectedStorageTypes,
+ List<DatanodeDescriptor> chosenNodes) {
+ for (StorageType t : targetTypes) {
+ List<DatanodeDescriptor> nodesWithStorages =
+ locsForExpectedStorageTypes.getNodesWithStorages(t);
+ Collections.shuffle(nodesWithStorages);
+ for (DatanodeDescriptor target : nodesWithStorages) {
+ if (!chosenNodes.contains(target) && matcher.match(
+ blockManager.getDatanodeManager().getNetworkTopology(), source,
+ target)) {
+ if (null != target.chooseStorage4Block(t, block.getNumBytes())) {
+ return new StorageTypeNodePair(t, target);
+ }
+ }
+ }
+ }
+ return null;
+ }
+
+ private static class StorageTypeNodePair {
+ public StorageType storageType = null;
+ public DatanodeDescriptor dn = null;
+
+ public StorageTypeNodePair(StorageType storageType, DatanodeDescriptor dn) {
+ this.storageType = storageType;
+ this.dn = dn;
+ }
+ }
+
+ private StorageTypeNodeMap findTargetsForExpectedStorageTypes(
+ List<StorageType> expected) {
+ StorageTypeNodeMap targetMap = new StorageTypeNodeMap();
+ List<DatanodeDescriptor> reports = blockManager.getDatanodeManager()
+ .getDatanodeListForReport(DatanodeReportType.LIVE);
+ for (DatanodeDescriptor dn : reports) {
+ StorageReport[] storageReports = dn.getStorageReports();
+ for (StorageReport storageReport : storageReports) {
+ StorageType t = storageReport.getStorage().getStorageType();
+ if (expected.contains(t)) {
+ final long maxRemaining = getMaxRemaining(dn.getStorageReports(), t);
+ if (maxRemaining > 0L) {
+ targetMap.add(t, dn);
+ }
+ }
+ }
+ }
+ return targetMap;
+ }
+
+ private static long getMaxRemaining(StorageReport[] storageReports,
+ StorageType t) {
+ long max = 0L;
+ for (StorageReport r : storageReports) {
+ if (r.getStorage().getStorageType() == t) {
+ if (r.getRemaining() > max) {
+ max = r.getRemaining();
+ }
+ }
+ }
+ return max;
+ }
+
+ private static class StorageTypeNodeMap {
+ private final EnumMap<StorageType, List<DatanodeDescriptor>> typeNodeMap =
+ new EnumMap<StorageType, List<DatanodeDescriptor>>(StorageType.class);
+
+ private void add(StorageType t, DatanodeDescriptor dn) {
+ List<DatanodeDescriptor> nodesWithStorages = getNodesWithStorages(t);
+ LinkedList<DatanodeDescriptor> value = null;
+ if (nodesWithStorages == null) {
+ value = new LinkedList<DatanodeDescriptor>();
+ value.add(dn);
+ typeNodeMap.put(t, value);
+ } else {
+ nodesWithStorages.add(dn);
+ }
+ }
+
+ /**
+ * @param type
+ * - Storage type
+ * @return datanodes which have the given storage type
+ */
+ private List<DatanodeDescriptor> getNodesWithStorages(StorageType type) {
+ return typeNodeMap.get(type);
+ }
+ }
+}
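Putting the pieces together, a minimal sketch of the satisfier lifecycle as wired up by BlockManager above (namesystem, blockManager and inodeId assumed in scope):

  BlockStorageMovementNeeded needed = new BlockStorageMovementNeeded();
  StoragePolicySatisfier sps =
      new StoragePolicySatisfier(namesystem, needed, blockManager);
  sps.start();          // spawns the "StoragePolicySatisfier" daemon thread
  needed.add(inodeId);  // what BlockManager#satisfyStoragePolicy does
  // The run() loop polls every 3 seconds, computes mismatched blocks and
  // queues BlockMovingInfos on a coordinator datanode.
  sps.stop();           // interrupts the daemon and joins for up to 3 seconds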
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3bb1c040/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockStorageMovementCommand.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockStorageMovementCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockStorageMovementCommand.java
index 42ba265..c1ab800 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockStorageMovementCommand.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockStorageMovementCommand.java
@@ -20,8 +20,8 @@ package org.apache.hadoop.hdfs.server.protocol;
import java.util.Arrays;
import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
/**
* A BlockStorageMovementCommand is an instruction to a DataNode to move the
@@ -35,7 +35,6 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
* NameNode about the movement status.
*/
public class BlockStorageMovementCommand extends DatanodeCommand {
-
// TODO: constructor needs to be refined based on the block movement data
// structure.
BlockStorageMovementCommand(int action) {
@@ -46,13 +45,13 @@ public class BlockStorageMovementCommand extends DatanodeCommand {
* Stores block to storage info that can be used for block movement.
*/
public static class BlockMovingInfo {
- private ExtendedBlock blk;
+ private Block blk;
private DatanodeInfo[] sourceNodes;
private StorageType[] sourceStorageTypes;
private DatanodeInfo[] targetNodes;
private StorageType[] targetStorageTypes;
- public BlockMovingInfo(ExtendedBlock block,
+ public BlockMovingInfo(Block block,
DatanodeInfo[] sourceDnInfos, DatanodeInfo[] targetDnInfos,
StorageType[] srcStorageTypes, StorageType[] targetStorageTypes) {
this.blk = block;
@@ -62,11 +61,11 @@ public class BlockStorageMovementCommand extends DatanodeCommand {
this.targetStorageTypes = targetStorageTypes;
}
- public void addBlock(ExtendedBlock block) {
+ public void addBlock(Block block) {
this.blk = block;
}
- public ExtendedBlock getBlock() {
+ public Block getBlock() {
return this.blk;
}
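With the signature change, a BlockMovingInfo is now built from a plain Block; a hedged example for a single-replica DISK-to-ARCHIVE move (srcDn and targetDn are assumed DatanodeInfo instances):

  Block blk = new Block(1073741825L, 1024L, 1001L); // hypothetical
  BlockMovingInfo info = new BlockMovingInfo(blk,
      new DatanodeInfo[] {srcDn}, new DatanodeInfo[] {targetDn},
      new StorageType[] {StorageType.DISK},      // source storage type
      new StorageType[] {StorageType.ARCHIVE});  // target storage type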
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3bb1c040/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
index c722306..d803f1a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
@@ -30,8 +30,8 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
@@ -71,14 +71,14 @@ public class TestStoragePolicySatisfyWorker {
public void testMoveSingleBlockToAnotherDatanode() throws Exception {
final Configuration conf = new HdfsConfiguration();
initConf(conf);
- final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
- .numDataNodes(4)
- .storageTypes(
- new StorageType[][] {{StorageType.DISK, StorageType.ARCHIVE},
- {StorageType.DISK, StorageType.ARCHIVE},
- {StorageType.DISK, StorageType.ARCHIVE},
- {StorageType.DISK, StorageType.ARCHIVE}})
- .build();
+ final MiniDFSCluster cluster =
+ new MiniDFSCluster.Builder(conf).numDataNodes(4)
+ .storageTypes(
+ new StorageType[][]{{StorageType.DISK, StorageType.ARCHIVE},
+ {StorageType.DISK, StorageType.ARCHIVE},
+ {StorageType.ARCHIVE, StorageType.ARCHIVE},
+ {StorageType.ARCHIVE, StorageType.ARCHIVE}})
+ .build();
try {
cluster.waitActive();
final DistributedFileSystem dfs = cluster.getFileSystem();
@@ -108,12 +108,12 @@ public class TestStoragePolicySatisfyWorker {
src);
List<BlockMovingInfo> blockMovingInfos = new ArrayList<>();
BlockMovingInfo blockMovingInfo = prepareBlockMovingInfo(
- lb.getBlock(), lb.getLocations()[0], targetDnInfo,
+ lb.getBlock().getLocalBlock(), lb.getLocations()[0], targetDnInfo,
lb.getStorageTypes()[0], StorageType.ARCHIVE);
blockMovingInfos.add(blockMovingInfo);
INode inode = cluster.getNamesystem().getFSDirectory().getINode(file);
worker.processBlockMovingTasks(inode.getId(),
- blockMovingInfos);
+ cluster.getNamesystem().getBlockPoolId(), blockMovingInfos);
cluster.triggerHeartbeats();
// Wait till NameNode notified about the block location details
@@ -150,7 +150,7 @@ public class TestStoragePolicySatisfyWorker {
}, 100, timeout);
}
- BlockMovingInfo prepareBlockMovingInfo(ExtendedBlock block,
+ BlockMovingInfo prepareBlockMovingInfo(Block block,
DatanodeInfo src, DatanodeInfo destin, StorageType storageType,
StorageType targetStorageType) {
return new BlockMovingInfo(block, new DatanodeInfo[] {src},
http://git-wip-us.apache.org/repos/asf/hadoop/blob/3bb1c040/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
new file mode 100644
index 0000000..b61814d
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfier.java
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.Before;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Supplier;
+
+/**
+ * Tests that the StoragePolicySatisfier daemon is able to identify the blocks
+ * to be moved and find suggested target locations for them.
+ */
+public class TestStoragePolicySatisfier {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestStoragePolicySatisfier.class);
+ private final Configuration config = new HdfsConfiguration();
+ private StorageType[][] allDiskTypes =
+ new StorageType[][]{{StorageType.DISK, StorageType.DISK},
+ {StorageType.DISK, StorageType.DISK},
+ {StorageType.DISK, StorageType.DISK}};
+ private MiniDFSCluster hdfsCluster = null;
+ final private int numOfDatanodes = 3;
+ final private int storagesPerDatanode = 2;
+ final private long capacity = 2 * 256 * 1024 * 1024;
+ final private String file = "/testMoveWhenStoragePolicyNotSatisfying";
+ private DistributedFileSystem distributedFS = null;
+
+ @Before
+ public void setUp() throws IOException {
+ config.setLong("dfs.block.size", 1024);
+ hdfsCluster = startCluster(config, allDiskTypes, numOfDatanodes,
+ storagesPerDatanode, capacity);
+ distributedFS = hdfsCluster.getFileSystem();
+ writeContent(distributedFS, file);
+ }
+
+ @Test(timeout = 300000)
+ public void testWhenStoragePolicySetToCOLD()
+ throws Exception {
+
+ try {
+ // Change policy to COLD
+ distributedFS.setStoragePolicy(new Path(file), "COLD");
+ Set<DatanodeDescriptor> previousNodes =
+ hdfsCluster.getNameNode().getNamesystem().getBlockManager()
+ .getDatanodeManager().getDatanodes();
+ FSNamesystem namesystem = hdfsCluster.getNamesystem();
+ INode inode = namesystem.getFSDirectory().getINode(file);
+
+ StorageType[][] newtypes =
+ new StorageType[][]{{StorageType.ARCHIVE, StorageType.ARCHIVE},
+ {StorageType.ARCHIVE, StorageType.ARCHIVE},
+ {StorageType.ARCHIVE, StorageType.ARCHIVE}};
+ startAdditionalDNs(config, 3, numOfDatanodes, newtypes,
+ storagesPerDatanode, capacity, hdfsCluster);
+
+ namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
+
+ hdfsCluster.triggerHeartbeats();
+ // Wait till the Namenode is notified of the block location details
+ waitExpectedStorageType(StorageType.ARCHIVE, distributedFS, previousNodes,
+ 6, 30000);
+ } finally {
+ hdfsCluster.shutdown();
+ }
+ }
+
+ @Test(timeout = 300000)
+ public void testWhenStoragePolicySetToALLSSD()
+ throws Exception {
+ try {
+ // Change policy to ALL_SSD
+ distributedFS.setStoragePolicy(new Path(file), "ALL_SSD");
+ Set<DatanodeDescriptor> previousNodes =
+ hdfsCluster.getNameNode().getNamesystem().getBlockManager()
+ .getDatanodeManager().getDatanodes();
+ FSNamesystem namesystem = hdfsCluster.getNamesystem();
+ INode inode = namesystem.getFSDirectory().getINode(file);
+
+ StorageType[][] newtypes =
+ new StorageType[][]{{StorageType.SSD, StorageType.DISK},
+ {StorageType.SSD, StorageType.DISK},
+ {StorageType.SSD, StorageType.DISK}};
+
+ // Add SSD based datanodes, making sure SSD storage is available in the
+ // cluster.
+ startAdditionalDNs(config, 3, numOfDatanodes, newtypes,
+ storagesPerDatanode, capacity, hdfsCluster);
+ namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
+ hdfsCluster.triggerHeartbeats();
+ // Wait till the StoragePolicySatisfier identifies the blocks to be moved
+ // to SSD storage
+ waitExpectedStorageType(StorageType.SSD, distributedFS, previousNodes, 6,
+ 30000);
+ } finally {
+ hdfsCluster.shutdown();
+ }
+ }
+
+ private void writeContent(final DistributedFileSystem dfs,
+ final String fileName) throws IOException {
+ // write to DISK
+ final FSDataOutputStream out = dfs.create(new Path(fileName));
+ for (int i = 0; i < 1000; i++) {
+ out.writeChars("t");
+ }
+ out.close();
+ }
+
+ private void startAdditionalDNs(final Configuration conf,
+ int newNodesRequired, int existingNodesNum, StorageType[][] newTypes,
+ int storagesPerDatanode, long capacity, final MiniDFSCluster cluster)
+ throws IOException {
+ long[][] capacities;
+ existingNodesNum += newNodesRequired;
+ capacities = new long[newNodesRequired][storagesPerDatanode];
+ for (int i = 0; i < newNodesRequired; i++) {
+ for (int j = 0; j < storagesPerDatanode; j++) {
+ capacities[i][j] = capacity;
+ }
+ }
+
+ cluster.startDataNodes(conf, newNodesRequired, newTypes, true, null, null,
+ null, capacities, null, false, false, false, null);
+ cluster.triggerHeartbeats();
+ }
+
+ private MiniDFSCluster startCluster(final Configuration conf,
+ StorageType[][] storageTypes, int numberOfDatanodes, int storagesPerDn,
+ long nodeCapacity) throws IOException {
+ long[][] capacities = new long[numberOfDatanodes][storagesPerDn];
+ for (int i = 0; i < numberOfDatanodes; i++) {
+ for (int j = 0; j < storagesPerDn; j++) {
+ capacities[i][j] = nodeCapacity;
+ }
+ }
+ final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(numberOfDatanodes).storagesPerDatanode(storagesPerDn)
+ .storageTypes(storageTypes).storageCapacities(capacities).build();
+ cluster.waitActive();
+ return cluster;
+ }
+
+ // TODO: this assertion can be changed to end to end based assertion later
+ // when DN side processing work integrated to this work.
+ private void waitExpectedStorageType(final StorageType expectedStorageType,
+ final DistributedFileSystem dfs,
+ final Set<DatanodeDescriptor> previousNodes, int expectedArchiveCount,
+ int timeout) throws Exception {
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ Iterator<DatanodeDescriptor> iterator = previousNodes.iterator();
+ int archiveCount = 0;
+ while (iterator.hasNext()) {
+ DatanodeDescriptor dn = iterator.next();
+ List<BlockMovingInfo> pendingItemsToMove =
+ dn.getStorageMovementPendingItems();
+ for (BlockMovingInfo blkInfoToMoveStorage : pendingItemsToMove) {
+ StorageType[] targetStorageTypes =
+ blkInfoToMoveStorage.getTargetStorageTypes();
+ for (StorageType storageType : targetStorageTypes) {
+ if (storageType == expectedStorageType) {
+ archiveCount++;
+ }
+ }
+ }
+ }
+ LOG.info(
+ expectedStorageType + " replica count, expected={} and actual={}",
+ expectedArchiveCount, archiveCount);
+ return expectedArchiveCount == archiveCount;
+ }
+ }, 100, timeout);
+ }
+}