You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ra...@apache.org on 2016/12/15 09:05:11 UTC
[43/50] [abbrv] hadoop git commit: HDFS-10884: [SPS]: Add block
movement tracker to track the completion of block movement future tasks at
DN. Contributed by Rakesh R
HDFS-10884: [SPS]: Add block movement tracker to track the completion of block movement future tasks at DN. Contributed by Rakesh R
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/22426183
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/22426183
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/22426183
Branch: refs/heads/HDFS-10285
Commit: 224261837625cc14728ad73e6ca707ca91e7ccf6
Parents: a706199
Author: Uma Maheswara Rao G <um...@intel.com>
Authored: Tue Oct 25 00:40:45 2016 -0700
Committer: Rakesh Radhakrishnan <ra...@apache.org>
Committed: Thu Dec 15 14:24:19 2016 +0530
----------------------------------------------------------------------
.../datanode/BlockStorageMovementTracker.java | 146 ++++++++++++++
.../datanode/StoragePolicySatisfyWorker.java | 178 +++++++++++++----
.../protocol/BlockStorageMovementCommand.java | 12 +-
.../TestStoragePolicySatisfyWorker.java | 190 +++++++++++++------
4 files changed, 427 insertions(+), 99 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/22426183/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
new file mode 100644
index 0000000..d31f075
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockStorageMovementTracker.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlocksMovementsCompletionHandler;
+import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlockMovementResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tracks the completion of the block movement future tasks that were
+ * submitted to the mover {@link CompletionService} and groups their results
+ * by trackID. Once every future registered under a trackID has completed, the
+ * results collected for that trackID are handed over to the
+ * {@link BlocksMovementsCompletionHandler}.
+ *
+ * <p>
+ * Both maps are shared between the thread executing {@link #run()} and the
+ * threads calling {@link #addBlock(long, Future)}, so every access to them is
+ * made while holding the monitor of {@code moverTaskFutures}.
+ *
+ * <p>
+ * NOTE(review): a trackID is treated as complete as soon as its list of
+ * registered futures becomes empty. If futures of one trackID are registered
+ * one by one while earlier ones already finish, the handler may be invoked
+ * more than once for that trackID - confirm this is acceptable to callers.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class BlockStorageMovementTracker implements Runnable {
+  private static final Logger LOG = LoggerFactory
+      .getLogger(BlockStorageMovementTracker.class);
+  // Upper bound, in milliseconds, of a wait on the moverTaskFutures monitor.
+  private static final long WAIT_INTERVAL_MS = 2000;
+  private static final long NANOS_PER_MS = 1000000L;
+
+  private final CompletionService<BlockMovementResult> moverCompletionService;
+  private final BlocksMovementsCompletionHandler blksMovementscompletionHandler;
+
+  // Keeps the information - trackID vs its list of blocks
+  // Guarded by the monitor of moverTaskFutures.
+  private final Map<Long, List<Future<BlockMovementResult>>> moverTaskFutures;
+  // trackID vs the results collected so far for it.
+  // Guarded by the monitor of moverTaskFutures.
+  private final Map<Long, List<BlockMovementResult>> movementResults;
+
+  /**
+   * BlockStorageMovementTracker constructor.
+   *
+   * @param moverCompletionService
+   *          completion service.
+   * @param handler
+   *          blocks movements completion handler
+   */
+  public BlockStorageMovementTracker(
+      CompletionService<BlockMovementResult> moverCompletionService,
+      BlocksMovementsCompletionHandler handler) {
+    this.moverCompletionService = moverCompletionService;
+    this.moverTaskFutures = new HashMap<>();
+    this.blksMovementscompletionHandler = handler;
+    this.movementResults = new HashMap<>();
+  }
+
+  /**
+   * Takes completed futures from the completion service one at a time,
+   * records their results and notifies the handler whenever all movements of
+   * a trackID are done. The loop ends when the executing thread is
+   * interrupted.
+   */
+  @Override
+  public void run() {
+    while (!Thread.currentThread().isInterrupted()) {
+      try {
+        synchronized (moverTaskFutures) {
+          // The emptiness check and the wait share one critical section, so
+          // a notification sent by addBlock() cannot slip in between them.
+          if (moverTaskFutures.isEmpty()) {
+            // Waiting for mover tasks.
+            moverTaskFutures.wait(WAIT_INTERVAL_MS);
+          }
+        }
+        Future<BlockMovementResult> future = moverCompletionService.take();
+        BlockMovementResult result = future.get();
+        LOG.debug("Completed block movement. {}", result);
+        List<BlockMovementResult> completedResults = recordCompletion(future,
+            result);
+        if (completedResults != null) {
+          // handle completed blocks movements per trackId. Invoked outside
+          // of the monitor so that addBlock() callers are not blocked.
+          blksMovementscompletionHandler.handle(completedResults);
+        }
+      } catch (InterruptedException e) {
+        LOG.info("Block storage movement tracker is interrupted, exiting.");
+        // Restore the interrupt status; the loop condition ends the thread.
+        Thread.currentThread().interrupt();
+      } catch (ExecutionException e) {
+        // TODO: Do we need failure retries and implement the same if required.
+        LOG.error("Exception while moving block replica to target storage type",
+            e);
+      }
+    }
+  }
+
+  /**
+   * Records the result of a completed future under its trackID.
+   *
+   * @param future
+   *          the completed future taken from the completion service
+   * @param result
+   *          the result obtained from the future
+   * @return all the results of the trackID when this was its last pending
+   *         future, otherwise null
+   * @throws InterruptedException
+   *           if interrupted while waiting for the future to be registered
+   */
+  private List<BlockMovementResult> recordCompletion(
+      Future<BlockMovementResult> future, BlockMovementResult result)
+      throws InterruptedException {
+    long trackId = result.getTrackId();
+    synchronized (moverTaskFutures) {
+      List<Future<BlockMovementResult>> blocksMoving = awaitRegistration(
+          trackId, future);
+      if (blocksMoving == null) {
+        LOG.warn("Ignoring the result of an untracked block movement. {}",
+            result);
+        return null;
+      }
+      blocksMoving.remove(future);
+      List<BlockMovementResult> resultPerTrackIdList =
+          addMovementResultToTrackIdList(result);
+      if (!blocksMoving.isEmpty()) {
+        return null;
+      }
+      // Completed all the scheduled blocks movement under this 'trackId'.
+      moverTaskFutures.remove(trackId);
+      movementResults.remove(trackId);
+      return resultPerTrackIdList;
+    }
+  }
+
+  /**
+   * Returns the list of pending futures that contains the given future. A
+   * task can finish before the submitting thread had the chance to call
+   * {@link #addBlock(long, Future)}, therefore this waits, for at most
+   * {@code WAIT_INTERVAL_MS}, until the future shows up in the tracking map.
+   * The caller must hold the monitor of {@code moverTaskFutures}.
+   *
+   * @return the list holding the future, or null if it was not registered
+   *         within the wait interval
+   */
+  private List<Future<BlockMovementResult>> awaitRegistration(long trackId,
+      Future<BlockMovementResult> future) throws InterruptedException {
+    final long startNanos = System.nanoTime();
+    long remainingMs = WAIT_INTERVAL_MS;
+    List<Future<BlockMovementResult>> blocksMoving = moverTaskFutures
+        .get(trackId);
+    while ((blocksMoving == null || !blocksMoving.contains(future))
+        && remainingMs > 0) {
+      moverTaskFutures.wait(remainingMs);
+      remainingMs = WAIT_INTERVAL_MS
+          - (System.nanoTime() - startNanos) / NANOS_PER_MS;
+      blocksMoving = moverTaskFutures.get(trackId);
+    }
+    if (blocksMoving != null && blocksMoving.contains(future)) {
+      return blocksMoving;
+    }
+    return null;
+  }
+
+  /**
+   * Appends the result to the list kept for its trackID, creating the list
+   * for the first result. The caller must hold the monitor of
+   * {@code moverTaskFutures}.
+   */
+  private List<BlockMovementResult> addMovementResultToTrackIdList(
+      BlockMovementResult result) {
+    long trackId = result.getTrackId();
+    List<BlockMovementResult> perTrackIdList = movementResults.get(trackId);
+    if (perTrackIdList == null) {
+      perTrackIdList = new ArrayList<>();
+      movementResults.put(trackId, perTrackIdList);
+    }
+    perTrackIdList.add(result);
+    return perTrackIdList;
+  }
+
+  /**
+   * Add future task to the tracking list to check the completion status of the
+   * block movement.
+   *
+   * @param trackID
+   *          tracking Id
+   * @param futureTask
+   *          future task used for moving the respective block
+   */
+  void addBlock(long trackID, Future<BlockMovementResult> futureTask) {
+    synchronized (moverTaskFutures) {
+      List<Future<BlockMovementResult>> futures = moverTaskFutures
+          .get(trackID);
+      // null for the first task
+      if (futures == null) {
+        futures = new ArrayList<>();
+        moverTaskFutures.put(trackID, futures);
+      }
+      futures.add(futureTask);
+      // Notify waiting tracker thread about the newly added tasks.
+      moverTaskFutures.notifyAll();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/22426183/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
index 2c99963..604fb4a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/StoragePolicySatisfyWorker.java
@@ -33,7 +33,6 @@ import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
@@ -65,6 +64,8 @@ import org.apache.hadoop.util.Daemon;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* StoragePolicySatisfyWorker handles the storage policy satisfier commands.
* These commands would be issued from NameNode as part of Datanode's heart beat
@@ -82,8 +83,10 @@ public class StoragePolicySatisfyWorker {
private final int moverThreads;
private final ExecutorService moveExecutor;
- private final CompletionService<Void> moverExecutorCompletionService;
- private final List<Future<Void>> moverTaskFutures;
+ private final CompletionService<BlockMovementResult> moverCompletionService;
+ private final BlocksMovementsCompletionHandler handler;
+ private final BlockStorageMovementTracker movementTracker;
+ private Daemon movementTrackerThread;
public StoragePolicySatisfyWorker(Configuration conf, DataNode datanode) {
this.datanode = datanode;
@@ -92,9 +95,13 @@ public class StoragePolicySatisfyWorker {
moverThreads = conf.getInt(DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY,
DFSConfigKeys.DFS_MOVER_MOVERTHREADS_DEFAULT);
moveExecutor = initializeBlockMoverThreadPool(moverThreads);
- moverExecutorCompletionService = new ExecutorCompletionService<>(
- moveExecutor);
- moverTaskFutures = new ArrayList<>();
+ moverCompletionService = new ExecutorCompletionService<>(moveExecutor);
+ handler = new BlocksMovementsCompletionHandler();
+ movementTracker = new BlockStorageMovementTracker(moverCompletionService,
+ handler);
+ movementTrackerThread = new Daemon(movementTracker);
+ movementTrackerThread.setName("BlockStorageMovementTracker");
+ movementTrackerThread.start();
// TODO: Needs to manage the number of concurrent moves per DataNode.
}
@@ -133,10 +140,6 @@ public class StoragePolicySatisfyWorker {
* separate thread. Each task will move the block replica to the target node &
* wait for the completion.
*
- * TODO: Presently this function is a blocking call, this has to be refined by
- * moving the tracking logic to another tracker thread. HDFS-10884 jira
- * addresses the same.
- *
* @param trackID
* unique tracking identifier
* @param blockPoolID
@@ -146,68 +149,64 @@ public class StoragePolicySatisfyWorker {
*/
public void processBlockMovingTasks(long trackID, String blockPoolID,
Collection<BlockMovingInfo> blockMovingInfos) {
- Future<Void> moveCallable = null;
for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
assert blkMovingInfo
.getSources().length == blkMovingInfo.getTargets().length;
for (int i = 0; i < blkMovingInfo.getSources().length; i++) {
BlockMovingTask blockMovingTask = new BlockMovingTask(
- blkMovingInfo.getBlock(), blockPoolID,
+ trackID, blockPoolID, blkMovingInfo.getBlock(),
blkMovingInfo.getSources()[i], blkMovingInfo.getTargets()[i],
+ blkMovingInfo.getSourceStorageTypes()[i],
blkMovingInfo.getTargetStorageTypes()[i]);
- moveCallable = moverExecutorCompletionService.submit(blockMovingTask);
- moverTaskFutures.add(moveCallable);
- }
- }
-
- for (int i = 0; i < moverTaskFutures.size(); i++) {
- try {
- moveCallable = moverExecutorCompletionService.take();
- moveCallable.get();
- } catch (InterruptedException | ExecutionException e) {
- // TODO: Failure retries and report back the error to NameNode.
- LOG.error("Exception while moving block replica to target storage type",
- e);
+ Future<BlockMovementResult> moveCallable = moverCompletionService
+ .submit(blockMovingTask);
+ movementTracker.addBlock(trackID, moveCallable);
}
}
}
/**
* This class encapsulates the process of moving the block replica to the
- * given target.
+ * given target and wait for the response.
*/
- private class BlockMovingTask implements Callable<Void> {
+ private class BlockMovingTask implements Callable<BlockMovementResult> {
+ private final long trackID;
+ private final String blockPoolID;
private final Block block;
private final DatanodeInfo source;
private final DatanodeInfo target;
+ private final StorageType srcStorageType;
private final StorageType targetStorageType;
- private String blockPoolID;
- BlockMovingTask(Block block, String blockPoolID, DatanodeInfo source,
- DatanodeInfo target, StorageType targetStorageType) {
- this.block = block;
+ BlockMovingTask(long trackID, String blockPoolID, Block block,
+ DatanodeInfo source, DatanodeInfo target,
+ StorageType srcStorageType, StorageType targetStorageType) {
+ this.trackID = trackID;
this.blockPoolID = blockPoolID;
+ this.block = block;
this.source = source;
this.target = target;
+ this.srcStorageType = srcStorageType;
this.targetStorageType = targetStorageType;
}
@Override
- public Void call() {
- moveBlock();
- return null;
+ public BlockMovementResult call() {
+ BlockMovementStatus status = moveBlock();
+ return new BlockMovementResult(trackID, block.getBlockId(), target,
+ status);
}
- private void moveBlock() {
- LOG.info("Start moving block {}", block);
-
- LOG.debug("Start moving block:{} from src:{} to destin:{} to satisfy "
- + "storageType:{}", block, source, target, targetStorageType);
+ private BlockMovementStatus moveBlock() {
+ LOG.info("Start moving block:{} from src:{} to destin:{} to satisfy "
+ + "storageType, sourceStoragetype:{} and destinStoragetype:{}",
+ block, source, target, srcStorageType, targetStorageType);
Socket sock = null;
DataOutputStream out = null;
DataInputStream in = null;
try {
+ ExtendedBlock extendedBlock = new ExtendedBlock(blockPoolID, block);
DNConf dnConf = datanode.getDnConf();
String dnAddr = target.getXferAddr(dnConf.getConnectToDnViaHostname());
sock = datanode.newSocket();
@@ -218,7 +217,6 @@ public class StoragePolicySatisfyWorker {
OutputStream unbufOut = sock.getOutputStream();
InputStream unbufIn = sock.getInputStream();
- ExtendedBlock extendedBlock = new ExtendedBlock(blockPoolID, block);
Token<BlockTokenIdentifier> accessToken = datanode.getBlockAccessToken(
extendedBlock, EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE));
@@ -239,12 +237,14 @@ public class StoragePolicySatisfyWorker {
"Successfully moved block:{} from src:{} to destin:{} for"
+ " satisfying storageType:{}",
block, source, target, targetStorageType);
+ return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_SUCCESS;
} catch (IOException e) {
// TODO: handle failure retries
LOG.warn(
"Failed to move block:{} from src:{} to destin:{} to satisfy "
+ "storageType:{}",
block, source, target, targetStorageType, e);
+ return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE;
} finally {
IOUtils.closeStream(out);
IOUtils.closeStream(in);
@@ -272,4 +272,102 @@ public class StoragePolicySatisfyWorker {
DataTransferProtoUtil.checkBlockOpStatus(response, logInfo);
}
}
+
+ /**
+  * Status of a block movement attempt, each mapped to an integer code.
+  */
+ enum BlockMovementStatus {
+   /** Success. */
+   DN_BLK_STORAGE_MOVEMENT_SUCCESS(0),
+   /**
+    * Failure due to generation time stamp mismatches or network errors
+    * or no available space.
+    */
+   DN_BLK_STORAGE_MOVEMENT_FAILURE(-1);
+
+   // TODO: need to support different type of failures. Failure due to network
+   // errors, block pinned, no space available etc.
+
+   /** Integer code associated with this status. */
+   private final int statusCode;
+
+   BlockMovementStatus(int statusCode) {
+     this.statusCode = statusCode;
+   }
+
+   /**
+    * @return the status code.
+    */
+   int getStatusCode() {
+     return statusCode;
+   }
+ }
+
+ /**
+  * Immutable outcome of one block movement task: the trackId the task
+  * belongs to, the id of the moved block, the target datanode and whether
+  * the movement was successful or failed due to errors.
+  */
+ static class BlockMovementResult {
+   private final long trackId;
+   private final long blockId;
+   private final DatanodeInfo target;
+   private final BlockMovementStatus status;
+
+   public BlockMovementResult(long trackId, long blockId,
+       DatanodeInfo target, BlockMovementStatus status) {
+     this.trackId = trackId;
+     this.blockId = blockId;
+     this.target = target;
+     this.status = status;
+   }
+
+   /** @return the tracking id this movement was scheduled under. */
+   long getTrackId() {
+     return trackId;
+   }
+
+   /** @return the id of the block that was moved. */
+   long getBlockId() {
+     return blockId;
+   }
+
+   /** @return the status of the movement. */
+   BlockMovementStatus getStatus() {
+     return status;
+   }
+
+   @Override
+   public String toString() {
+     return "Block movement result(\n " + "track id: " + trackId
+         + " block id: " + blockId + " target node: " + target
+         + " movement status: " + status + ")";
+   }
+ }
+
+ /**
+  * Blocks movements completion handler, which is used to collect details of
+  * the completed list of block movements and notify the namenode about the
+  * success or failures.
+  *
+  * <p>
+  * Results are appended by the block storage movement tracker thread while
+  * other threads read them, so both methods synchronize on this handler and
+  * readers only ever receive a snapshot of the collected results.
+  */
+ static class BlocksMovementsCompletionHandler {
+   // Guarded by 'this'.
+   private final List<BlockMovementResult> completedBlocks = new ArrayList<>();
+
+   /**
+    * Collect all the block movement results and notify namenode.
+    *
+    * @param results
+    *          result of all the block movements per trackId
+    */
+   synchronized void handle(List<BlockMovementResult> results) {
+     completedBlocks.addAll(results);
+     // TODO: notify namenode about the success/failures.
+   }
+
+   /**
+    * @return a copy of the results collected so far. The copy is not
+    *         affected by results that are handled afterwards, which makes
+    *         it safe to iterate while block movements are still completing.
+    */
+   @VisibleForTesting
+   synchronized List<BlockMovementResult> getCompletedBlocks() {
+     return new ArrayList<>(completedBlocks);
+   }
+ }
+
+ /**
+ * @return the blocks movements completion handler that was created by this
+ * worker and passed to its block storage movement tracker.
+ */
+ @VisibleForTesting
+ BlocksMovementsCompletionHandler getBlocksMovementsCompletionHandler() {
+ return handler;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/22426183/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockStorageMovementCommand.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockStorageMovementCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockStorageMovementCommand.java
index 7c97f1a..5dcf4e7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockStorageMovementCommand.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockStorageMovementCommand.java
@@ -35,10 +35,10 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
* service. After the block movement this DataNode sends response back to the
* NameNode about the movement status.
*
- * The coordinator datanode will use 'trackId' identifier to coordinate the block
- * movement of the given set of blocks. TrackId is a unique identifier that
- * represents a group of blocks. Namenode will generate this unique value and
- * send it to the coordinator datanode along with the
+ * The coordinator datanode will use 'trackId' identifier to coordinate the
+ * block movement of the given set of blocks. TrackId is a unique identifier
+ * that represents a group of blocks. Namenode will generate this unique value
+ * and send it to the coordinator datanode along with the
* BlockStorageMovementCommand. Datanode will monitor the completion of the
* block movements that grouped under this trackId and notifies Namenode about
* the completion status.
@@ -153,11 +153,11 @@ public class BlockStorageMovementCommand extends DatanodeCommand {
return new StringBuilder().append("BlockMovingInfo(\n ")
.append("Moving block: ").append(blk).append(" From: ")
.append(Arrays.asList(sourceNodes)).append(" To: [")
- .append(Arrays.asList(targetNodes)).append(")\n")
+ .append(Arrays.asList(targetNodes)).append("\n ")
.append(" sourceStorageTypes: ")
.append(Arrays.toString(sourceStorageTypes))
.append(" targetStorageTypes: ")
- .append(Arrays.toString(targetStorageTypes)).toString();
+ .append(Arrays.toString(targetStorageTypes)).append(")").toString();
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/22426183/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
index d803f1a..ea3eec3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStoragePolicySatisfyWorker.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.datanode;
import java.io.IOException;
+import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
@@ -33,10 +34,15 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlockMovementResult;
+import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlockMovementStatus;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
import org.apache.hadoop.test.GenericTestUtils;
+import org.junit.After;
import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -51,8 +57,9 @@ public class TestStoragePolicySatisfyWorker {
private static final Logger LOG = LoggerFactory
.getLogger(TestStoragePolicySatisfyWorker.class);
-
private static final int DEFAULT_BLOCK_SIZE = 100;
+ private MiniDFSCluster cluster = null;
+ private final Configuration conf = new HdfsConfiguration();
private static void initConf(Configuration conf) {
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, DEFAULT_BLOCK_SIZE);
@@ -63,64 +70,141 @@ public class TestStoragePolicySatisfyWorker {
conf.setLong(DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 2000L);
}
+ /**
+ * Applies the test settings of initConf to the shared configuration before
+ * each test case.
+ */
+ @Before
+ public void setUp() throws IOException {
+ initConf(conf);
+ }
+
+ /**
+  * Shuts down the mini cluster after each test case, if one was started.
+  */
+ @After
+ public void teardown() throws IOException {
+   if (cluster == null) {
+     return;
+   }
+   cluster.shutdown();
+ }
+
/**
* Tests to verify that the block replica is moving to ARCHIVE storage type to
* fulfill the storage policy requirement.
*/
@Test(timeout = 120000)
public void testMoveSingleBlockToAnotherDatanode() throws Exception {
- final Configuration conf = new HdfsConfiguration();
- initConf(conf);
- final MiniDFSCluster cluster =
- new MiniDFSCluster.Builder(conf).numDataNodes(4)
- .storageTypes(
- new StorageType[][]{{StorageType.DISK, StorageType.ARCHIVE},
- {StorageType.DISK, StorageType.ARCHIVE},
- {StorageType.ARCHIVE, StorageType.ARCHIVE},
- {StorageType.ARCHIVE, StorageType.ARCHIVE}})
- .build();
- try {
- cluster.waitActive();
- final DistributedFileSystem dfs = cluster.getFileSystem();
- final String file = "/testMoveSingleBlockToAnotherDatanode";
- // write to DISK
- final FSDataOutputStream out = dfs.create(new Path(file), (short) 2);
- out.writeChars("testMoveSingleBlockToAnotherDatanode");
- out.close();
-
- // verify before movement
- LocatedBlock lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
- StorageType[] storageTypes = lb.getStorageTypes();
- for (StorageType storageType : storageTypes) {
- Assert.assertTrue(StorageType.DISK == storageType);
- }
- // move to ARCHIVE
- dfs.setStoragePolicy(new Path(file), "COLD");
-
- lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
- DataNode src = cluster.getDataNodes().get(3);
- DatanodeInfo targetDnInfo = DFSTestUtil
- .getLocalDatanodeInfo(src.getXferPort());
-
- // TODO: Need to revisit this when NN is implemented to be able to send
- // block moving commands.
- StoragePolicySatisfyWorker worker = new StoragePolicySatisfyWorker(conf,
- src);
- List<BlockMovingInfo> blockMovingInfos = new ArrayList<>();
- BlockMovingInfo blockMovingInfo = prepareBlockMovingInfo(
- lb.getBlock().getLocalBlock(), lb.getLocations()[0], targetDnInfo,
- lb.getStorageTypes()[0], StorageType.ARCHIVE);
- blockMovingInfos.add(blockMovingInfo);
- INode inode = cluster.getNamesystem().getFSDirectory().getINode(file);
- worker.processBlockMovingTasks(inode.getId(),
- cluster.getNamesystem().getBlockPoolId(), blockMovingInfos);
- cluster.triggerHeartbeats();
-
- // Wait till NameNode notified about the block location details
- waitForLocatedBlockWithArchiveStorageType(dfs, file, 1, 30000);
- } finally {
- cluster.shutdown();
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(4)
+ .storageTypes(
+ new StorageType[][] {{StorageType.DISK, StorageType.ARCHIVE},
+ {StorageType.DISK, StorageType.ARCHIVE},
+ {StorageType.ARCHIVE, StorageType.ARCHIVE},
+ {StorageType.ARCHIVE, StorageType.ARCHIVE}})
+ .build();
+ cluster.waitActive();
+ final DistributedFileSystem dfs = cluster.getFileSystem();
+ final String file = "/testMoveSingleBlockToAnotherDatanode";
+ // write to DISK
+ final FSDataOutputStream out = dfs.create(new Path(file), (short) 2);
+ out.writeChars("testMoveSingleBlockToAnotherDatanode");
+ out.close();
+
+ // verify before movement
+ LocatedBlock lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
+ StorageType[] storageTypes = lb.getStorageTypes();
+ for (StorageType storageType : storageTypes) {
+ Assert.assertTrue(StorageType.DISK == storageType);
+ }
+ // move to ARCHIVE
+ dfs.setStoragePolicy(new Path(file), "COLD");
+
+ FSNamesystem namesystem = cluster.getNamesystem();
+ INode inode = namesystem.getFSDirectory().getINode(file);
+ namesystem.getBlockManager().satisfyStoragePolicy(inode.getId());
+
+ cluster.triggerHeartbeats();
+
+ // Wait till NameNode notified about the block location details
+ waitForLocatedBlockWithArchiveStorageType(dfs, file, 2, 30000);
+ }
+
+ /**
+ * Test to verify that the satisfy worker reports a block movement failure
+ * when the specified target datanode doesn't have enough space to
+ * accommodate the moving block.
+ */
+ @Test(timeout = 120000)
+ public void testMoveWithNoSpaceAvailable() throws Exception {
+ final long capacity = 150;
+ final String rack0 = "/rack0";
+ final String rack1 = "/rack1";
+ // The third datanode is given only half of the capacity of the others.
+ long[] capacities = new long[] {capacity, capacity, capacity / 2};
+ String[] hosts = {"host0", "host1", "host2"};
+ String[] racks = {rack0, rack1, rack0};
+ int numOfDatanodes = capacities.length;
+
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(numOfDatanodes)
+ .hosts(hosts).racks(racks).simulatedCapacities(capacities)
+ .storageTypes(
+ new StorageType[][] {{StorageType.DISK, StorageType.ARCHIVE},
+ {StorageType.DISK, StorageType.ARCHIVE},
+ {StorageType.ARCHIVE, StorageType.ARCHIVE}})
+ .build();
+
+ cluster.waitActive();
+ InetSocketAddress[] favoredNodes = new InetSocketAddress[3];
+ for (int i = 0; i < favoredNodes.length; i++) {
+ // DFSClient will attempt reverse lookup. In case it resolves
+ // "127.0.0.1" to "localhost", we manually specify the hostname.
+ favoredNodes[i] = cluster.getDataNodes().get(i).getXferAddress();
+ }
+ final DistributedFileSystem dfs = cluster.getFileSystem();
+ final String file = "/testMoveWithNoSpaceAvailable";
+ DFSTestUtil.createFile(dfs, new Path(file), false, 1024, 100,
+ DEFAULT_BLOCK_SIZE, (short) 2, 0, false, favoredNodes);
+
+ // verify before movement
+ LocatedBlock lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
+ StorageType[] storageTypes = lb.getStorageTypes();
+ for (StorageType storageType : storageTypes) {
+ Assert.assertTrue(StorageType.DISK == storageType);
}
+
+ // move to ARCHIVE
+ dfs.setStoragePolicy(new Path(file), "COLD");
+
+ lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
+ // The datanode at index 2 is used as the movement target; presumably
+ // this is the node created with the reduced capacity - TODO confirm.
+ DataNode src = cluster.getDataNodes().get(2);
+ DatanodeInfo targetDnInfo = DFSTestUtil
+ .getLocalDatanodeInfo(src.getXferPort());
+
+ StoragePolicySatisfyWorker worker = new StoragePolicySatisfyWorker(conf,
+ src);
+ List<BlockMovingInfo> blockMovingInfos = new ArrayList<>();
+ BlockMovingInfo blockMovingInfo = prepareBlockMovingInfo(
+ lb.getBlock().getLocalBlock(), lb.getLocations()[0], targetDnInfo,
+ lb.getStorageTypes()[0], StorageType.ARCHIVE);
+ blockMovingInfos.add(blockMovingInfo);
+ // The inode id of the file is used as the tracking id of the movement.
+ INode inode = cluster.getNamesystem().getFSDirectory().getINode(file);
+ worker.processBlockMovingTasks(inode.getId(),
+ cluster.getNamesystem().getBlockPoolId(), blockMovingInfos);
+
+ // Wait until the completion handler has recorded one failed movement.
+ waitForBlockMovementCompletion(worker, inode.getId(), 1, 30000);
+ }
+
+ /**
+  * Polls the completion handler of the given worker every 100 milliseconds
+  * until the number of failed block movements it has recorded equals
+  * expectedFailedItemsCount, or until the timeout elapses.
+  */
+ private void waitForBlockMovementCompletion(
+     final StoragePolicySatisfyWorker worker, final long inodeId,
+     int expectedFailedItemsCount, int timeout) throws Exception {
+   Supplier<Boolean> expectedFailuresSeen = new Supplier<Boolean>() {
+     @Override
+     public Boolean get() {
+       List<BlockMovementResult> results = worker
+           .getBlocksMovementsCompletionHandler().getCompletedBlocks();
+       int failures = 0;
+       for (BlockMovementResult movementResult : results) {
+         if (movementResult.getStatus()
+             != BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE) {
+           continue;
+         }
+         failures++;
+       }
+       LOG.info("Block movement completed count={}, expected={} and actual={}",
+           results.size(), expectedFailedItemsCount, failures);
+       return failures == expectedFailedItemsCount;
+     }
+   };
+   GenericTestUtils.waitFor(expectedFailuresSeen, 100, timeout);
}
private void waitForLocatedBlockWithArchiveStorageType(
@@ -150,7 +234,7 @@ public class TestStoragePolicySatisfyWorker {
}, 100, timeout);
}
- BlockMovingInfo prepareBlockMovingInfo(Block block,
+ private BlockMovingInfo prepareBlockMovingInfo(Block block,
DatanodeInfo src, DatanodeInfo destin, StorageType storageType,
StorageType targetStorageType) {
return new BlockMovingInfo(block, new DatanodeInfo[] {src},
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org