You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by um...@apache.org on 2018/08/10 03:49:18 UTC
[25/50] [abbrv] hadoop git commit: HDFS-12955: [SPS]: Move SPS
classes to a separate package. Contributed by Rakesh R.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b71ea21/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithStripedFile.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithStripedFile.java
deleted file mode 100644
index 6991ad2..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithStripedFile.java
+++ /dev/null
@@ -1,580 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.server.namenode;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeoutException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.StorageType;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSTestUtil;
-import org.apache.hadoop.hdfs.DistributedFileSystem;
-import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.MiniDFSCluster.DataNodeProperties;
-import org.apache.hadoop.hdfs.NameNodeProxies;
-import org.apache.hadoop.hdfs.StripedFileTestUtil;
-import org.apache.hadoop.hdfs.client.HdfsAdmin;
-import org.apache.hadoop.hdfs.protocol.ClientProtocol;
-import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
-import org.apache.hadoop.test.GenericTestUtils;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Supplier;
-
-/**
- * Tests that StoragePolicySatisfier daemon is able to check the striped blocks
- * to be moved and finding its expected target locations in order to satisfy the
- * storage policy.
- */
-public class TestStoragePolicySatisfierWithStripedFile {
-
- private static final Logger LOG = LoggerFactory
- .getLogger(TestStoragePolicySatisfierWithStripedFile.class);
-
- private final int stripesPerBlock = 2;
-
- private ErasureCodingPolicy ecPolicy;
- private int dataBlocks;
- private int parityBlocks;
- private int cellSize;
- private int defaultStripeBlockSize;
-
- private ErasureCodingPolicy getEcPolicy() {
- return StripedFileTestUtil.getDefaultECPolicy();
- }
-
- /**
- * Initialize erasure coding policy.
- */
- @Before
- public void init(){
- ecPolicy = getEcPolicy();
- dataBlocks = ecPolicy.getNumDataUnits();
- parityBlocks = ecPolicy.getNumParityUnits();
- cellSize = ecPolicy.getCellSize();
- defaultStripeBlockSize = cellSize * stripesPerBlock;
- }
-
- /**
- * Tests to verify that all the striped blocks(data + parity blocks) are
- * moving to satisfy the storage policy.
- */
- @Test(timeout = 300000)
- public void testMoverWithFullStripe() throws Exception {
- // start 10 datanodes
- int numOfDatanodes = 10;
- int storagesPerDatanode = 2;
- long capacity = 20 * defaultStripeBlockSize;
- long[][] capacities = new long[numOfDatanodes][storagesPerDatanode];
- for (int i = 0; i < numOfDatanodes; i++) {
- for (int j = 0; j < storagesPerDatanode; j++) {
- capacities[i][j] = capacity;
- }
- }
-
- final Configuration conf = new HdfsConfiguration();
- conf.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
- true);
- initConfWithStripe(conf, defaultStripeBlockSize);
- final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
- .numDataNodes(numOfDatanodes)
- .storagesPerDatanode(storagesPerDatanode)
- .storageTypes(new StorageType[][]{
- {StorageType.DISK, StorageType.DISK},
- {StorageType.DISK, StorageType.DISK},
- {StorageType.DISK, StorageType.DISK},
- {StorageType.DISK, StorageType.DISK},
- {StorageType.DISK, StorageType.DISK},
- {StorageType.DISK, StorageType.ARCHIVE},
- {StorageType.DISK, StorageType.ARCHIVE},
- {StorageType.DISK, StorageType.ARCHIVE},
- {StorageType.DISK, StorageType.ARCHIVE},
- {StorageType.DISK, StorageType.ARCHIVE}})
- .storageCapacities(capacities)
- .build();
-
- HdfsAdmin hdfsAdmin = new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
- try {
- cluster.waitActive();
- DistributedFileSystem dfs = cluster.getFileSystem();
- dfs.enableErasureCodingPolicy(
- StripedFileTestUtil.getDefaultECPolicy().getName());
-
- // set "/bar" directory with HOT storage policy.
- ClientProtocol client = NameNodeProxies.createProxy(conf,
- cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
- String barDir = "/bar";
- client.mkdirs(barDir, new FsPermission((short) 777), true);
- client.setStoragePolicy(barDir, HdfsConstants.HOT_STORAGE_POLICY_NAME);
- // set an EC policy on "/bar" directory
- client.setErasureCodingPolicy(barDir,
- StripedFileTestUtil.getDefaultECPolicy().getName());
-
- // write file to barDir
- final String fooFile = "/bar/foo";
- long fileLen = cellSize * dataBlocks;
- DFSTestUtil.createFile(cluster.getFileSystem(), new Path(fooFile),
- fileLen, (short) 3, 0);
-
- // verify storage types and locations
- LocatedBlocks locatedBlocks = client.getBlockLocations(fooFile, 0,
- fileLen);
- for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
- for (StorageType type : lb.getStorageTypes()) {
- Assert.assertEquals(StorageType.DISK, type);
- }
- }
- StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks,
- dataBlocks + parityBlocks);
-
- // start 5 more datanodes
- int numOfNewDatanodes = 5;
- capacities = new long[numOfNewDatanodes][storagesPerDatanode];
- for (int i = 0; i < numOfNewDatanodes; i++) {
- for (int j = 0; j < storagesPerDatanode; j++) {
- capacities[i][j] = capacity;
- }
- }
- cluster.startDataNodes(conf, 5,
- new StorageType[][]{
- {StorageType.ARCHIVE, StorageType.ARCHIVE},
- {StorageType.ARCHIVE, StorageType.ARCHIVE},
- {StorageType.ARCHIVE, StorageType.ARCHIVE},
- {StorageType.ARCHIVE, StorageType.ARCHIVE},
- {StorageType.ARCHIVE, StorageType.ARCHIVE}},
- true, null, null, null, capacities, null, false, false, false, null);
- cluster.triggerHeartbeats();
-
- // move file to ARCHIVE
- client.setStoragePolicy(barDir, "COLD");
- hdfsAdmin.satisfyStoragePolicy(new Path(fooFile));
- LOG.info("Sets storage policy to COLD and invoked satisfyStoragePolicy");
- cluster.triggerHeartbeats();
-
- waitForBlocksMovementAttemptReport(cluster, 9, 60000);
- // verify storage types and locations
- waitExpectedStorageType(cluster, fooFile, fileLen, StorageType.ARCHIVE, 9,
- 9, 60000);
- } finally {
- cluster.shutdown();
- }
- }
-
- /**
- * Tests to verify that only few datanodes are available and few striped
- * blocks are able to move. Others are still trying to find available nodes.
- *
- * For example, we have 3 nodes A(disk, disk), B(disk, disk), C(disk, archive)
- *
- * Assume a block with storage locations A(disk), B(disk), C(disk). Now, set
- * policy as COLD and invoked {@link HdfsAdmin#satisfyStoragePolicy(Path)},
- * while choosing the target node for A, it shouldn't choose C. For C, it
- * should do local block movement as it has ARCHIVE storage type.
- */
- @Test(timeout = 300000)
- public void testWhenOnlyFewTargetNodesAreAvailableToSatisfyStoragePolicy()
- throws Exception {
- // start 10 datanodes
- int numOfDatanodes = 10;
- int storagesPerDatanode = 2;
- long capacity = 20 * defaultStripeBlockSize;
- long[][] capacities = new long[numOfDatanodes][storagesPerDatanode];
- for (int i = 0; i < numOfDatanodes; i++) {
- for (int j = 0; j < storagesPerDatanode; j++) {
- capacities[i][j] = capacity;
- }
- }
-
- final Configuration conf = new HdfsConfiguration();
- conf.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
- true);
- initConfWithStripe(conf, defaultStripeBlockSize);
- final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
- .numDataNodes(numOfDatanodes)
- .storagesPerDatanode(storagesPerDatanode)
- .storageTypes(new StorageType[][]{
- {StorageType.DISK, StorageType.DISK},
- {StorageType.DISK, StorageType.DISK},
- {StorageType.DISK, StorageType.DISK},
- {StorageType.DISK, StorageType.DISK},
- {StorageType.DISK, StorageType.DISK},
- {StorageType.DISK, StorageType.DISK},
- {StorageType.DISK, StorageType.DISK},
- {StorageType.DISK, StorageType.ARCHIVE},
- {StorageType.DISK, StorageType.ARCHIVE},
- {StorageType.DISK, StorageType.ARCHIVE}})
- .storageCapacities(capacities)
- .build();
-
- HdfsAdmin hdfsAdmin = new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
- try {
- cluster.waitActive();
- DistributedFileSystem dfs = cluster.getFileSystem();
- dfs.enableErasureCodingPolicy(
- StripedFileTestUtil.getDefaultECPolicy().getName());
- // set "/bar" directory with HOT storage policy.
- ClientProtocol client = NameNodeProxies.createProxy(conf,
- cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
- String barDir = "/bar";
- client.mkdirs(barDir, new FsPermission((short) 777), true);
- client.setStoragePolicy(barDir, HdfsConstants.HOT_STORAGE_POLICY_NAME);
- // set an EC policy on "/bar" directory
- client.setErasureCodingPolicy(barDir,
- StripedFileTestUtil.getDefaultECPolicy().getName());
-
- // write file to barDir
- final String fooFile = "/bar/foo";
- long fileLen = cellSize * dataBlocks;
- DFSTestUtil.createFile(cluster.getFileSystem(), new Path(fooFile),
- fileLen, (short) 3, 0);
-
- // verify storage types and locations
- LocatedBlocks locatedBlocks = client.getBlockLocations(fooFile, 0,
- fileLen);
- for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
- for (StorageType type : lb.getStorageTypes()) {
- Assert.assertEquals(StorageType.DISK, type);
- }
- }
- StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks,
- dataBlocks + parityBlocks);
-
- // start 2 more datanodes
- int numOfNewDatanodes = 2;
- capacities = new long[numOfNewDatanodes][storagesPerDatanode];
- for (int i = 0; i < numOfNewDatanodes; i++) {
- for (int j = 0; j < storagesPerDatanode; j++) {
- capacities[i][j] = capacity;
- }
- }
- cluster.startDataNodes(conf, 2,
- new StorageType[][]{
- {StorageType.ARCHIVE, StorageType.ARCHIVE},
- {StorageType.ARCHIVE, StorageType.ARCHIVE}},
- true, null, null, null, capacities, null, false, false, false, null);
- cluster.triggerHeartbeats();
-
- // Move file to ARCHIVE. Only 5 datanodes are available with ARCHIVE
- // storage type.
- client.setStoragePolicy(barDir, "COLD");
- hdfsAdmin.satisfyStoragePolicy(new Path(fooFile));
- LOG.info("Sets storage policy to COLD and invoked satisfyStoragePolicy");
- cluster.triggerHeartbeats();
-
- waitForBlocksMovementAttemptReport(cluster, 5, 60000);
- waitForAttemptedItems(cluster, 1, 30000);
- // verify storage types and locations.
- waitExpectedStorageType(cluster, fooFile, fileLen, StorageType.ARCHIVE, 5,
- 9, 60000);
- } finally {
- cluster.shutdown();
- }
- }
-
- /**
- * Test SPS for low redundant file blocks.
- * 1. Create cluster with 10 datanode.
- * 1. Create one striped file with default EC Policy.
- * 2. Set policy and call satisfyStoragePolicy for file.
- * 3. Stop NameNode and Datanodes.
- * 4. Start NameNode with 5 datanode and wait for block movement.
- * 5. Start remaining 5 datanode.
- * 6. All replica should be moved in proper storage based on policy.
- */
- @Test(timeout = 300000)
- public void testSPSWhenFileHasLowRedundancyBlocks() throws Exception {
- // start 9 datanodes
- int numOfDatanodes = 9;
- int storagesPerDatanode = 2;
- long capacity = 20 * defaultStripeBlockSize;
- long[][] capacities = new long[numOfDatanodes][storagesPerDatanode];
- for (int i = 0; i < numOfDatanodes; i++) {
- for (int j = 0; j < storagesPerDatanode; j++) {
- capacities[i][j] = capacity;
- }
- }
-
- final Configuration conf = new HdfsConfiguration();
- conf.set(DFSConfigKeys
- .DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
- "3000");
- conf.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
- true);
- initConfWithStripe(conf, defaultStripeBlockSize);
- final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
- .numDataNodes(numOfDatanodes)
- .storagesPerDatanode(storagesPerDatanode)
- .storageTypes(new StorageType[][]{
- {StorageType.DISK, StorageType.ARCHIVE},
- {StorageType.DISK, StorageType.ARCHIVE},
- {StorageType.DISK, StorageType.ARCHIVE},
- {StorageType.DISK, StorageType.ARCHIVE},
- {StorageType.DISK, StorageType.ARCHIVE},
- {StorageType.DISK, StorageType.ARCHIVE},
- {StorageType.DISK, StorageType.ARCHIVE},
- {StorageType.DISK, StorageType.ARCHIVE},
- {StorageType.DISK, StorageType.ARCHIVE}})
- .storageCapacities(capacities)
- .build();
- try {
- cluster.waitActive();
- DistributedFileSystem fs = cluster.getFileSystem();
- fs.enableErasureCodingPolicy(
- StripedFileTestUtil.getDefaultECPolicy().getName());
- Path barDir = new Path("/bar");
- fs.mkdirs(barDir);
- // set an EC policy on "/bar" directory
- fs.setErasureCodingPolicy(barDir,
- StripedFileTestUtil.getDefaultECPolicy().getName());
-
- // write file to barDir
- final Path fooFile = new Path("/bar/foo");
- long fileLen = cellSize * dataBlocks;
- DFSTestUtil.createFile(cluster.getFileSystem(), fooFile,
- fileLen, (short) 3, 0);
-
- // Move file to ARCHIVE.
- fs.setStoragePolicy(barDir, "COLD");
- //Stop DataNodes and restart namenode
- List<DataNodeProperties> list = new ArrayList<>(numOfDatanodes);
- for (int i = 0; i < numOfDatanodes; i++) {
- list.add(cluster.stopDataNode(0));
- }
- cluster.restartNameNodes();
- // Restart half datanodes
- for (int i = 0; i < 5; i++) {
- cluster.restartDataNode(list.get(i), false);
- }
- cluster.waitActive();
- fs.satisfyStoragePolicy(fooFile);
- DFSTestUtil.waitExpectedStorageType(fooFile.toString(),
- StorageType.ARCHIVE, 5, 30000, cluster.getFileSystem());
- //Start reaming datanodes
- for (int i = numOfDatanodes - 1; i >= 5; i--) {
- cluster.restartDataNode(list.get(i), false);
- }
- // verify storage types and locations.
- waitExpectedStorageType(cluster, fooFile.toString(), fileLen,
- StorageType.ARCHIVE, 9, 9, 60000);
- } finally {
- cluster.shutdown();
- }
- }
-
-
- /**
- * Tests to verify that for the given path, no blocks under the given path
- * will be scheduled for block movement as there are no available datanode
- * with required storage type.
- *
- * For example, there are two block for a file:
- *
- * File1 => blk_1[locations=A(DISK),B(DISK),C(DISK)],
- * blk_2[locations=A(DISK),B(DISK),C(DISK)]. Now, set storage policy to COLD.
- * No datanode is available with storage type ARCHIVE.
- *
- * SPS won't schedule any block movement for this path.
- */
- @Test(timeout = 300000)
- public void testWhenNoTargetDatanodeToSatisfyStoragePolicy()
- throws Exception {
- // start 10 datanodes
- int numOfDatanodes = 10;
- int storagesPerDatanode = 2;
- long capacity = 20 * defaultStripeBlockSize;
- long[][] capacities = new long[numOfDatanodes][storagesPerDatanode];
- for (int i = 0; i < numOfDatanodes; i++) {
- for (int j = 0; j < storagesPerDatanode; j++) {
- capacities[i][j] = capacity;
- }
- }
-
- final Configuration conf = new HdfsConfiguration();
- conf.setBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
- true);
- initConfWithStripe(conf, defaultStripeBlockSize);
- final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
- .numDataNodes(numOfDatanodes)
- .storagesPerDatanode(storagesPerDatanode)
- .storageTypes(new StorageType[][]{
- {StorageType.DISK, StorageType.DISK},
- {StorageType.DISK, StorageType.DISK},
- {StorageType.DISK, StorageType.DISK},
- {StorageType.DISK, StorageType.DISK},
- {StorageType.DISK, StorageType.DISK},
- {StorageType.DISK, StorageType.DISK},
- {StorageType.DISK, StorageType.DISK},
- {StorageType.DISK, StorageType.DISK},
- {StorageType.DISK, StorageType.DISK},
- {StorageType.DISK, StorageType.DISK}})
- .storageCapacities(capacities)
- .build();
-
- HdfsAdmin hdfsAdmin = new HdfsAdmin(FileSystem.getDefaultUri(conf), conf);
- try {
- cluster.waitActive();
- DistributedFileSystem dfs = cluster.getFileSystem();
- dfs.enableErasureCodingPolicy(
- StripedFileTestUtil.getDefaultECPolicy().getName());
- // set "/bar" directory with HOT storage policy.
- ClientProtocol client = NameNodeProxies.createProxy(conf,
- cluster.getFileSystem(0).getUri(), ClientProtocol.class).getProxy();
- String barDir = "/bar";
- client.mkdirs(barDir, new FsPermission((short) 777), true);
- client.setStoragePolicy(barDir, HdfsConstants.HOT_STORAGE_POLICY_NAME);
- // set an EC policy on "/bar" directory
- client.setErasureCodingPolicy(barDir,
- StripedFileTestUtil.getDefaultECPolicy().getName());
-
- // write file to barDir
- final String fooFile = "/bar/foo";
- long fileLen = cellSize * dataBlocks;
- DFSTestUtil.createFile(cluster.getFileSystem(), new Path(fooFile),
- fileLen, (short) 3, 0);
-
- // verify storage types and locations
- LocatedBlocks locatedBlocks = client.getBlockLocations(fooFile, 0,
- fileLen);
- for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
- for (StorageType type : lb.getStorageTypes()) {
- Assert.assertEquals(StorageType.DISK, type);
- }
- }
- StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks,
- dataBlocks + parityBlocks);
-
- // Move file to ARCHIVE. Only 5 datanodes are available with ARCHIVE
- // storage type.
- client.setStoragePolicy(barDir, "COLD");
- hdfsAdmin.satisfyStoragePolicy(new Path(fooFile));
- LOG.info("Sets storage policy to COLD and invoked satisfyStoragePolicy");
- cluster.triggerHeartbeats();
-
- waitForAttemptedItems(cluster, 1, 30000);
- // verify storage types and locations.
- waitExpectedStorageType(cluster, fooFile, fileLen, StorageType.DISK, 9, 9,
- 60000);
- waitForAttemptedItems(cluster, 1, 30000);
- } finally {
- cluster.shutdown();
- }
- }
-
- private void waitForAttemptedItems(MiniDFSCluster cluster,
- long expectedBlkMovAttemptedCount, int timeout)
- throws TimeoutException, InterruptedException {
- BlockManager blockManager = cluster.getNamesystem().getBlockManager();
- final StoragePolicySatisfier sps = blockManager.getStoragePolicySatisfier();
- GenericTestUtils.waitFor(new Supplier<Boolean>() {
- @Override
- public Boolean get() {
- LOG.info("expectedAttemptedItemsCount={} actualAttemptedItemsCount={}",
- expectedBlkMovAttemptedCount,
- sps.getAttemptedItemsMonitor().getAttemptedItemsCount());
- return sps.getAttemptedItemsMonitor()
- .getAttemptedItemsCount() == expectedBlkMovAttemptedCount;
- }
- }, 100, timeout);
- }
-
- private static void initConfWithStripe(Configuration conf,
- int stripeBlockSize) {
- conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, stripeBlockSize);
- conf.setLong(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1L);
- conf.setLong(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_INTERVAL_SECONDS_KEY,
- 1L);
- conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_REDUNDANCY_CONSIDERLOAD_KEY,
- false);
- }
-
- // Check whether the Block movement has been successfully completed to satisfy
- // the storage policy for the given file.
- private void waitExpectedStorageType(MiniDFSCluster cluster,
- final String fileName, long fileLen,
- final StorageType expectedStorageType, int expectedStorageCount,
- int expectedBlkLocationCount, int timeout) throws Exception {
- GenericTestUtils.waitFor(new Supplier<Boolean>() {
- @Override
- public Boolean get() {
- int actualStorageCount = 0;
- try {
- LocatedBlocks locatedBlocks = cluster.getFileSystem().getClient()
- .getLocatedBlocks(fileName, 0, fileLen);
- for (LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
- LOG.info("LocatedBlocks => Size {}, locs {}",
- lb.getLocations().length, lb);
- if (lb.getLocations().length > expectedBlkLocationCount) {
- return false;
- }
- for (StorageType storageType : lb.getStorageTypes()) {
- if (expectedStorageType == storageType) {
- actualStorageCount++;
- } else {
- LOG.info("Expected storage type {} and actual {}",
- expectedStorageType, storageType);
- }
- }
- }
- LOG.info(
- expectedStorageType + " replica count, expected={} and actual={}",
- expectedStorageCount, actualStorageCount);
- } catch (IOException e) {
- LOG.error("Exception while getting located blocks", e);
- return false;
- }
- return expectedStorageCount == actualStorageCount;
- }
- }, 100, timeout);
- }
-
- // Check whether the block movement attempt report has been arrived at the
- // Namenode(SPS).
- private void waitForBlocksMovementAttemptReport(MiniDFSCluster cluster,
- long expectedMovementFinishedBlocksCount, int timeout)
- throws TimeoutException, InterruptedException {
- BlockManager blockManager = cluster.getNamesystem().getBlockManager();
- final StoragePolicySatisfier sps = blockManager.getStoragePolicySatisfier();
- Assert.assertNotNull("Failed to get SPS object reference!", sps);
-
- GenericTestUtils.waitFor(new Supplier<Boolean>() {
- @Override
- public Boolean get() {
- LOG.info("MovementFinishedBlocks: expectedCount={} actualCount={}",
- expectedMovementFinishedBlocksCount,
- sps.getAttemptedItemsMonitor().getMovementFinishedBlocksCount());
- return sps.getAttemptedItemsMonitor().getMovementFinishedBlocksCount()
- >= expectedMovementFinishedBlocksCount;
- }
- }, 100, timeout);
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0b71ea21/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java
new file mode 100644
index 0000000..62766d9
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/sps/TestBlockStorageMovementAttemptedItems.java
@@ -0,0 +1,196 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode.sps;
+
+import static org.apache.hadoop.util.Time.monotonicNow;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.namenode.Namesystem;
+import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.AttemptedItemInfo;
+import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier.ItemInfo;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+/**
+ * Tests that block storage movement attempt failures are reported from DN and
+ * processed them correctly or not.
+ */
+public class TestBlockStorageMovementAttemptedItems {
+
+ private BlockStorageMovementAttemptedItems bsmAttemptedItems = null;
+ private BlockStorageMovementNeeded unsatisfiedStorageMovementFiles = null;
+ private final int selfRetryTimeout = 500;
+
+ @Before
+ public void setup() throws Exception {
+ unsatisfiedStorageMovementFiles = new BlockStorageMovementNeeded(
+ Mockito.mock(Namesystem.class),
+ Mockito.mock(StoragePolicySatisfier.class), 100);
+ bsmAttemptedItems = new BlockStorageMovementAttemptedItems(100,
+ selfRetryTimeout, unsatisfiedStorageMovementFiles);
+ }
+
+ @After
+ public void teardown() {
+ if (bsmAttemptedItems != null) {
+ bsmAttemptedItems.stop();
+ bsmAttemptedItems.stopGracefully();
+ }
+ }
+
+ private boolean checkItemMovedForRetry(Long item, long retryTimeout)
+ throws InterruptedException {
+ long stopTime = monotonicNow() + (retryTimeout * 2);
+ boolean isItemFound = false;
+ while (monotonicNow() < (stopTime)) {
+ ItemInfo ele = null;
+ while ((ele = unsatisfiedStorageMovementFiles.get()) != null) {
+ if (item == ele.getTrackId()) {
+ isItemFound = true;
+ break;
+ }
+ }
+ if (!isItemFound) {
+ Thread.sleep(100);
+ } else {
+ break;
+ }
+ }
+ return isItemFound;
+ }
+
+ /**
+ * Verify that moved blocks reporting should queued up the block info.
+ */
+ @Test(timeout = 30000)
+ public void testAddReportedMoveAttemptFinishedBlocks() throws Exception {
+ bsmAttemptedItems.start(); // start block movement result monitor thread
+ Long item = new Long(1234);
+ List<Block> blocks = new ArrayList<Block>();
+ blocks.add(new Block(item));
+ bsmAttemptedItems.add(new AttemptedItemInfo(0L, 0L, 0L, blocks, 0));
+ Block[] blockArray = new Block[blocks.size()];
+ blocks.toArray(blockArray);
+ bsmAttemptedItems.addReportedMovedBlocks(blockArray);
+ assertEquals("Failed to receive result!", 1,
+ bsmAttemptedItems.getMovementFinishedBlocksCount());
+ }
+
+ /**
+ * Verify empty moved blocks reporting queue.
+ */
+ @Test(timeout = 30000)
+ public void testNoBlockMovementAttemptFinishedReportAdded() throws Exception {
+ bsmAttemptedItems.start(); // start block movement report monitor thread
+ Long item = new Long(1234);
+ List<Block> blocks = new ArrayList<>();
+ blocks.add(new Block(item));
+ bsmAttemptedItems.add(new AttemptedItemInfo(0L, 0L, 0L, blocks, 0));
+ assertEquals("Shouldn't receive result", 0,
+ bsmAttemptedItems.getMovementFinishedBlocksCount());
+ assertEquals("Item doesn't exist in the attempted list", 1,
+ bsmAttemptedItems.getAttemptedItemsCount());
+ }
+
+ /**
+ * Partial block movement with
+ * BlockMovementStatus#DN_BLK_STORAGE_MOVEMENT_SUCCESS. Here, first occurrence
+ * is #blockStorageMovementReportedItemsCheck() and then
+ * #blocksStorageMovementUnReportedItemsCheck().
+ */
+ @Test(timeout = 30000)
+ public void testPartialBlockMovementShouldBeRetried1() throws Exception {
+ Long item = new Long(1234);
+ List<Block> blocks = new ArrayList<>();
+ blocks.add(new Block(item));
+ blocks.add(new Block(5678L));
+ Long trackID = 0L;
+ bsmAttemptedItems
+ .add(new AttemptedItemInfo(trackID, trackID, 0L, blocks, 0));
+ Block[] blksMovementReport = new Block[1];
+ blksMovementReport[0] = new Block(item);
+ bsmAttemptedItems.addReportedMovedBlocks(blksMovementReport);
+
+ // start block movement report monitor thread
+ bsmAttemptedItems.start();
+ assertTrue("Failed to add to the retry list",
+ checkItemMovedForRetry(trackID, 5000));
+ assertEquals("Failed to remove from the attempted list", 0,
+ bsmAttemptedItems.getAttemptedItemsCount());
+ }
+
+ /**
+ * Partial block movement. Here, first occurrence is
+ * #blocksStorageMovementUnReportedItemsCheck() and then
+ * #blockStorageMovementReportedItemsCheck().
+ */
+ @Test(timeout = 30000)
+ public void testPartialBlockMovementShouldBeRetried2() throws Exception {
+ Long item = new Long(1234);
+ Long trackID = 0L;
+ List<Block> blocks = new ArrayList<>();
+ blocks.add(new Block(item));
+ bsmAttemptedItems
+ .add(new AttemptedItemInfo(trackID, trackID, 0L, blocks, 0));
+ Block[] blksMovementReport = new Block[1];
+ blksMovementReport[0] = new Block(item);
+ bsmAttemptedItems.addReportedMovedBlocks(blksMovementReport);
+
+ Thread.sleep(selfRetryTimeout * 2); // Waiting to get timed out
+
+ bsmAttemptedItems.blocksStorageMovementUnReportedItemsCheck();
+ bsmAttemptedItems.blockStorageMovementReportedItemsCheck();
+
+ assertTrue("Failed to add to the retry list",
+ checkItemMovedForRetry(trackID, 5000));
+ assertEquals("Failed to remove from the attempted list", 0,
+ bsmAttemptedItems.getAttemptedItemsCount());
+ }
+
+ /**
+ * Partial block movement with only BlocksStorageMoveAttemptFinished report
+ * and storageMovementAttemptedItems list is empty.
+ */
+ @Test(timeout = 30000)
+ public void testPartialBlockMovementWithEmptyAttemptedQueue()
+ throws Exception {
+ Long item = new Long(1234);
+ Long trackID = 0L;
+ List<Block> blocks = new ArrayList<>();
+ blocks.add(new Block(item));
+ bsmAttemptedItems
+ .add(new AttemptedItemInfo(trackID, trackID, 0L, blocks, 0));
+ Block[] blksMovementReport = new Block[1];
+ blksMovementReport[0] = new Block(item);
+ bsmAttemptedItems.addReportedMovedBlocks(blksMovementReport);
+ assertFalse(
+ "Should not add in queue again if it is not there in"
+ + " storageMovementAttemptedItems",
+ checkItemMovedForRetry(trackID, 5000));
+ assertEquals("Failed to remove from the attempted list", 1,
+ bsmAttemptedItems.getAttemptedItemsCount());
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org