You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by sz...@apache.org on 2011/07/19 17:33:37 UTC
svn commit: r1148400 -
/hadoop/common/trunk/mapreduce/src/contrib/raid/src/test/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockPlacementPolicyRaid.java
Author: szetszwo
Date: Tue Jul 19 15:33:36 2011
New Revision: 1148400
URL: http://svn.apache.org/viewvc?rev=1148400&view=rev
Log:
Temporarily disable TestBlockPlacementPolicyRaid for MAPREDUCE-2711.
Modified:
hadoop/common/trunk/mapreduce/src/contrib/raid/src/test/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockPlacementPolicyRaid.java
Modified: hadoop/common/trunk/mapreduce/src/contrib/raid/src/test/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockPlacementPolicyRaid.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/contrib/raid/src/test/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockPlacementPolicyRaid.java?rev=1148400&r1=1148399&r2=1148400&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/contrib/raid/src/test/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockPlacementPolicyRaid.java (original)
+++ hadoop/common/trunk/mapreduce/src/contrib/raid/src/test/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockPlacementPolicyRaid.java Tue Jul 19 15:33:36 2011
@@ -1,514 +1,514 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs.server.blockmanagement;
-
-import java.io.IOException;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import junit.framework.Assert;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSTestUtil;
-import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRaid.CachedFullPathNames;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRaid.CachedLocatedBlocks;
-import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRaid.FileType;
-import org.apache.hadoop.hdfs.server.namenode.*;
-import org.apache.hadoop.raid.RaidNode;
-import org.junit.Test;
-
-public class TestBlockPlacementPolicyRaid {
- private Configuration conf = null;
- private MiniDFSCluster cluster = null;
- private FSNamesystem namesystem = null;
- private BlockPlacementPolicyRaid policy = null;
- private FileSystem fs = null;
- String[] rack1 = {"/rack1"};
- String[] rack2 = {"/rack2"};
- String[] host1 = {"host1.rack1.com"};
- String[] host2 = {"host2.rack2.com"};
- String xorPrefix = null;
- String raidTempPrefix = null;
- String raidrsTempPrefix = null;
- String raidrsHarTempPrefix = null;
-
- final static Log LOG =
- LogFactory.getLog(TestBlockPlacementPolicyRaid.class);
-
- protected void setupCluster() throws IOException {
- conf = new Configuration();
- conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L);
- conf.set("dfs.replication.pending.timeout.sec", "2");
- conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1L);
- conf.set("dfs.block.replicator.classname",
- "org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicyRaid");
- conf.set(RaidNode.STRIPE_LENGTH_KEY, "2");
- conf.set(RaidNode.RS_PARITY_LENGTH_KEY, "3");
- conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 1);
- // start the cluster with one datanode first
- cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).
- format(true).racks(rack1).hosts(host1).build();
- cluster.waitActive();
- namesystem = cluster.getNameNode().getNamesystem();
- Assert.assertTrue("BlockPlacementPolicy type is not correct.",
- namesystem.blockManager.replicator instanceof BlockPlacementPolicyRaid);
- policy = (BlockPlacementPolicyRaid) namesystem.blockManager.replicator;
- fs = cluster.getFileSystem();
- xorPrefix = RaidNode.xorDestinationPath(conf).toUri().getPath();
- raidTempPrefix = RaidNode.xorTempPrefix(conf);
- raidrsTempPrefix = RaidNode.rsTempPrefix(conf);
- raidrsHarTempPrefix = RaidNode.rsHarTempPrefix(conf);
- }
-
- /**
- * Test that the parity files will be placed at the good locations when we
- * create them.
- */
- @Test
- public void testChooseTargetForRaidFile() throws IOException {
- setupCluster();
- try {
- String src = "/dir/file";
- String parity = raidrsTempPrefix + src;
- DFSTestUtil.createFile(fs, new Path(src), 4, (short)1, 0L);
- DFSTestUtil.waitReplication(fs, new Path(src), (short)1);
- refreshPolicy();
- setBlockPlacementPolicy(namesystem, policy);
- // start 6 more datanodes
- String[] racks = {"/rack2", "/rack2", "/rack2",
- "/rack2", "/rack2", "/rack2"};
- String[] hosts =
- {"host2.rack2.com", "host3.rack2.com", "host4.rack2.com",
- "host5.rack2.com", "host6.rack2.com", "host7.rack2.com"};
- cluster.startDataNodes(conf, 6, true, null, racks, hosts, null);
- int numBlocks = 6;
- DFSTestUtil.createFile(fs, new Path(parity), numBlocks, (short)2, 0L);
- DFSTestUtil.waitReplication(fs, new Path(parity), (short)2);
- FileStatus srcStat = fs.getFileStatus(new Path(src));
- BlockLocation[] srcLoc =
- fs.getFileBlockLocations(srcStat, 0, srcStat.getLen());
- FileStatus parityStat = fs.getFileStatus(new Path(parity));
- BlockLocation[] parityLoc =
- fs.getFileBlockLocations(parityStat, 0, parityStat.getLen());
- int parityLen = RaidNode.rsParityLength(conf);
- for (int i = 0; i < numBlocks / parityLen; i++) {
- Set<String> locations = new HashSet<String>();
- for (int j = 0; j < srcLoc.length; j++) {
- String [] names = srcLoc[j].getNames();
- for (int k = 0; k < names.length; k++) {
- LOG.info("Source block location: " + names[k]);
- locations.add(names[k]);
- }
- }
- for (int j = 0 ; j < parityLen; j++) {
- String[] names = parityLoc[j + i * parityLen].getNames();
- for (int k = 0; k < names.length; k++) {
- LOG.info("Parity block location: " + names[k]);
- Assert.assertTrue(locations.add(names[k]));
- }
- }
- }
- } finally {
- if (cluster != null) {
- cluster.shutdown();
- }
- }
- }
-
- /**
- * Test that the har parity files will be placed at the good locations when we
- * create them.
- */
- @Test
- public void testChooseTargetForHarRaidFile() throws IOException {
- setupCluster();
- try {
- String[] racks = {"/rack2", "/rack2", "/rack2",
- "/rack2", "/rack2", "/rack2"};
- String[] hosts =
- {"host2.rack2.com", "host3.rack2.com", "host4.rack2.com",
- "host5.rack2.com", "host6.rack2.com", "host7.rack2.com"};
- cluster.startDataNodes(conf, 6, true, null, racks, hosts, null);
- String harParity = raidrsHarTempPrefix + "/dir/file";
- int numBlocks = 11;
- DFSTestUtil.createFile(fs, new Path(harParity), numBlocks, (short)1, 0L);
- DFSTestUtil.waitReplication(fs, new Path(harParity), (short)1);
- FileStatus stat = fs.getFileStatus(new Path(harParity));
- BlockLocation[] loc = fs.getFileBlockLocations(stat, 0, stat.getLen());
- int rsParityLength = RaidNode.rsParityLength(conf);
- for (int i = 0; i < numBlocks - rsParityLength; i++) {
- Set<String> locations = new HashSet<String>();
- for (int j = 0; j < rsParityLength; j++) {
- for (int k = 0; k < loc[i + j].getNames().length; k++) {
- // verify that every adjacent 4 blocks are on different nodes
- String name = loc[i + j].getNames()[k];
- LOG.info("Har Raid block location: " + name);
- Assert.assertTrue(locations.add(name));
- }
- }
- }
- } finally {
- if (cluster != null) {
- cluster.shutdown();
- }
- }
- }
-
- /**
- * Test BlockPlacementPolicyRaid.CachedLocatedBlocks
- * Verify that the results obtained from cache is the same as
- * the results obtained directly
- */
- @Test
- public void testCachedBlocks() throws IOException {
- setupCluster();
- try {
- String file1 = "/dir/file1";
- String file2 = "/dir/file2";
- DFSTestUtil.createFile(fs, new Path(file1), 3, (short)1, 0L);
- DFSTestUtil.createFile(fs, new Path(file2), 4, (short)1, 0L);
- // test blocks cache
- CachedLocatedBlocks cachedBlocks = new CachedLocatedBlocks(namesystem);
- verifyCachedBlocksResult(cachedBlocks, namesystem, file1);
- verifyCachedBlocksResult(cachedBlocks, namesystem, file1);
- verifyCachedBlocksResult(cachedBlocks, namesystem, file2);
- verifyCachedBlocksResult(cachedBlocks, namesystem, file2);
- try {
- Thread.sleep(1200L);
- } catch (InterruptedException e) {
- }
- verifyCachedBlocksResult(cachedBlocks, namesystem, file2);
- verifyCachedBlocksResult(cachedBlocks, namesystem, file1);
- } finally {
- if (cluster != null) {
- cluster.shutdown();
- }
- }
- }
-
- /**
- * Test BlockPlacementPolicyRaid.CachedFullPathNames
- * Verify that the results obtained from cache is the same as
- * the results obtained directly
- */
- @Test
- public void testCachedPathNames() throws IOException {
- setupCluster();
- try {
- String file1 = "/dir/file1";
- String file2 = "/dir/file2";
- DFSTestUtil.createFile(fs, new Path(file1), 3, (short)1, 0L);
- DFSTestUtil.createFile(fs, new Path(file2), 4, (short)1, 0L);
- // test full path cache
- CachedFullPathNames cachedFullPathNames =
- new CachedFullPathNames(namesystem);
- FSInodeInfo inode1 = null;
- FSInodeInfo inode2 = null;
- NameNodeRaidTestUtil.readLock(namesystem.dir);
- try {
- inode1 = NameNodeRaidTestUtil.getNode(namesystem.dir, file1, true);
- inode2 = NameNodeRaidTestUtil.getNode(namesystem.dir, file2, true);
- } finally {
- NameNodeRaidTestUtil.readUnLock(namesystem.dir);
- }
- verifyCachedFullPathNameResult(cachedFullPathNames, inode1);
- verifyCachedFullPathNameResult(cachedFullPathNames, inode1);
- verifyCachedFullPathNameResult(cachedFullPathNames, inode2);
- verifyCachedFullPathNameResult(cachedFullPathNames, inode2);
- try {
- Thread.sleep(1200L);
- } catch (InterruptedException e) {
- }
- verifyCachedFullPathNameResult(cachedFullPathNames, inode2);
- verifyCachedFullPathNameResult(cachedFullPathNames, inode1);
- } finally {
- if (cluster != null) {
- cluster.shutdown();
- }
- }
- }
- /**
- * Test the result of getCompanionBlocks() on the unraided files
- */
- @Test
- public void testGetCompanionBLocks() throws IOException {
- setupCluster();
- try {
- String file1 = "/dir/file1";
- String file2 = "/raid/dir/file2";
- String file3 = "/raidrs/dir/file3";
- // Set the policy to default policy to place the block in the default way
- setBlockPlacementPolicy(namesystem, new BlockPlacementPolicyDefault(
- conf, namesystem, namesystem.clusterMap));
- DFSTestUtil.createFile(fs, new Path(file1), 3, (short)1, 0L);
- DFSTestUtil.createFile(fs, new Path(file2), 4, (short)1, 0L);
- DFSTestUtil.createFile(fs, new Path(file3), 8, (short)1, 0L);
- Collection<LocatedBlock> companionBlocks;
-
- companionBlocks = getCompanionBlocks(
- namesystem, policy, getBlocks(namesystem, file1).get(0).getBlock());
- Assert.assertTrue(companionBlocks == null || companionBlocks.size() == 0);
-
- companionBlocks = getCompanionBlocks(
- namesystem, policy, getBlocks(namesystem, file1).get(2).getBlock());
- Assert.assertTrue(companionBlocks == null || companionBlocks.size() == 0);
-
- companionBlocks = getCompanionBlocks(
- namesystem, policy, getBlocks(namesystem, file2).get(0).getBlock());
- Assert.assertEquals(1, companionBlocks.size());
-
- companionBlocks = getCompanionBlocks(
- namesystem, policy, getBlocks(namesystem, file2).get(3).getBlock());
- Assert.assertEquals(1, companionBlocks.size());
-
- int rsParityLength = RaidNode.rsParityLength(conf);
- companionBlocks = getCompanionBlocks(
- namesystem, policy, getBlocks(namesystem, file3).get(0).getBlock());
- Assert.assertEquals(rsParityLength, companionBlocks.size());
-
- companionBlocks = getCompanionBlocks(
- namesystem, policy, getBlocks(namesystem, file3).get(4).getBlock());
- Assert.assertEquals(rsParityLength, companionBlocks.size());
-
- companionBlocks = getCompanionBlocks(
- namesystem, policy, getBlocks(namesystem, file3).get(6).getBlock());
- Assert.assertEquals(2, companionBlocks.size());
- } finally {
- if (cluster != null) {
- cluster.shutdown();
- }
- }
- }
-
- static void setBlockPlacementPolicy(
- FSNamesystem namesystem, BlockPlacementPolicy policy) {
- namesystem.writeLock();
- try {
- namesystem.blockManager.replicator = policy;
- } finally {
- namesystem.writeUnlock();
- }
- }
-
- /**
- * Test BlockPlacementPolicyRaid actually deletes the correct replica.
- * Start 2 datanodes and create 1 source file and its parity file.
- * 1) Start host1, create the parity file with replication 1
- * 2) Start host2, create the source file with replication 2
- * 3) Set replication of source file to 1
- * Verify that the policy should delete the block with more companion blocks.
- */
- @Test
- public void testDeleteReplica() throws IOException {
- setupCluster();
- try {
- // Set the policy to default policy to place the block in the default way
- setBlockPlacementPolicy(namesystem, new BlockPlacementPolicyDefault(
- conf, namesystem, namesystem.clusterMap));
- DatanodeDescriptor datanode1 =
- NameNodeRaidTestUtil.getDatanodeMap(namesystem).values().iterator().next();
- String source = "/dir/file";
- String parity = xorPrefix + source;
-
- final Path parityPath = new Path(parity);
- DFSTestUtil.createFile(fs, parityPath, 3, (short)1, 0L);
- DFSTestUtil.waitReplication(fs, parityPath, (short)1);
-
- // start one more datanode
- cluster.startDataNodes(conf, 1, true, null, rack2, host2, null);
- DatanodeDescriptor datanode2 = null;
- for (DatanodeDescriptor d : NameNodeRaidTestUtil.getDatanodeMap(namesystem).values()) {
- if (!d.getName().equals(datanode1.getName())) {
- datanode2 = d;
- }
- }
- Assert.assertTrue(datanode2 != null);
- cluster.waitActive();
- final Path sourcePath = new Path(source);
- DFSTestUtil.createFile(fs, sourcePath, 5, (short)2, 0L);
- DFSTestUtil.waitReplication(fs, sourcePath, (short)2);
-
- refreshPolicy();
- Assert.assertEquals(parity,
- policy.getParityFile(source));
- Assert.assertEquals(source,
- policy.getSourceFile(parity, xorPrefix));
-
- List<LocatedBlock> sourceBlocks = getBlocks(namesystem, source);
- List<LocatedBlock> parityBlocks = getBlocks(namesystem, parity);
- Assert.assertEquals(5, sourceBlocks.size());
- Assert.assertEquals(3, parityBlocks.size());
-
- // verify the result of getCompanionBlocks()
- Collection<LocatedBlock> companionBlocks;
- companionBlocks = getCompanionBlocks(
- namesystem, policy, sourceBlocks.get(0).getBlock());
- verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks,
- new int[]{0, 1}, new int[]{0});
-
- companionBlocks = getCompanionBlocks(
- namesystem, policy, sourceBlocks.get(1).getBlock());
- verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks,
- new int[]{0, 1}, new int[]{0});
-
- companionBlocks = getCompanionBlocks(
- namesystem, policy, sourceBlocks.get(2).getBlock());
- verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks,
- new int[]{2, 3}, new int[]{1});
-
- companionBlocks = getCompanionBlocks(
- namesystem, policy, sourceBlocks.get(3).getBlock());
- verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks,
- new int[]{2, 3}, new int[]{1});
-
- companionBlocks = getCompanionBlocks(
- namesystem, policy, sourceBlocks.get(4).getBlock());
- verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks,
- new int[]{4}, new int[]{2});
-
- companionBlocks = getCompanionBlocks(
- namesystem, policy, parityBlocks.get(0).getBlock());
- verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks,
- new int[]{0, 1}, new int[]{0});
-
- companionBlocks = getCompanionBlocks(
- namesystem, policy, parityBlocks.get(1).getBlock());
- verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks,
- new int[]{2, 3}, new int[]{1});
-
- companionBlocks = getCompanionBlocks(
- namesystem, policy, parityBlocks.get(2).getBlock());
- verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks,
- new int[]{4}, new int[]{2});
-
- // Set the policy back to raid policy. We have to create a new object
- // here to clear the block location cache
- refreshPolicy();
- setBlockPlacementPolicy(namesystem, policy);
- // verify policy deletes the correct blocks. companion blocks should be
- // evenly distributed.
- fs.setReplication(sourcePath, (short)1);
- DFSTestUtil.waitReplication(fs, sourcePath, (short)1);
- Map<String, Integer> counters = new HashMap<String, Integer>();
- refreshPolicy();
- for (int i = 0; i < parityBlocks.size(); i++) {
- companionBlocks = getCompanionBlocks(
- namesystem, policy, parityBlocks.get(i).getBlock());
-
- counters = BlockPlacementPolicyRaid.countCompanionBlocks(
- companionBlocks, false);
- Assert.assertTrue(counters.get(datanode1.getName()) >= 1 &&
- counters.get(datanode1.getName()) <= 2);
- Assert.assertTrue(counters.get(datanode1.getName()) +
- counters.get(datanode2.getName()) ==
- companionBlocks.size());
-
- counters = BlockPlacementPolicyRaid.countCompanionBlocks(
- companionBlocks, true);
- Assert.assertTrue(counters.get(datanode1.getParent().getName()) >= 1 &&
- counters.get(datanode1.getParent().getName()) <= 2);
- Assert.assertTrue(counters.get(datanode1.getParent().getName()) +
- counters.get(datanode2.getParent().getName()) ==
- companionBlocks.size());
- }
- } finally {
- if (cluster != null) {
- cluster.shutdown();
- }
- }
- }
-
- // create a new BlockPlacementPolicyRaid to clear the cache
- private void refreshPolicy() {
- policy = new BlockPlacementPolicyRaid();
- policy.initialize(conf, namesystem, namesystem.clusterMap);
- }
-
- private void verifyCompanionBlocks(Collection<LocatedBlock> companionBlocks,
- List<LocatedBlock> sourceBlocks, List<LocatedBlock> parityBlocks,
- int[] sourceBlockIndexes, int[] parityBlockIndexes) {
- Set<ExtendedBlock> blockSet = new HashSet<ExtendedBlock>();
- for (LocatedBlock b : companionBlocks) {
- blockSet.add(b.getBlock());
- }
- Assert.assertEquals(sourceBlockIndexes.length + parityBlockIndexes.length,
- blockSet.size());
- for (int index : sourceBlockIndexes) {
- Assert.assertTrue(blockSet.contains(sourceBlocks.get(index).getBlock()));
- }
- for (int index : parityBlockIndexes) {
- Assert.assertTrue(blockSet.contains(parityBlocks.get(index).getBlock()));
- }
- }
-
- private void verifyCachedFullPathNameResult(
- CachedFullPathNames cachedFullPathNames, FSInodeInfo inode)
- throws IOException {
- String res1 = inode.getFullPathName();
- String res2 = cachedFullPathNames.get(inode);
- LOG.info("Actual path name: " + res1);
- LOG.info("Cached path name: " + res2);
- Assert.assertEquals(cachedFullPathNames.get(inode),
- inode.getFullPathName());
- }
-
- private void verifyCachedBlocksResult(CachedLocatedBlocks cachedBlocks,
- FSNamesystem namesystem, String file) throws IOException{
- long len = NameNodeRaidUtil.getFileInfo(namesystem, file, true).getLen();
- List<LocatedBlock> res1 = NameNodeRaidUtil.getBlockLocations(namesystem,
- file, 0L, len, false, false).getLocatedBlocks();
- List<LocatedBlock> res2 = cachedBlocks.get(file);
- for (int i = 0; i < res1.size(); i++) {
- LOG.info("Actual block: " + res1.get(i).getBlock());
- LOG.info("Cached block: " + res2.get(i).getBlock());
- Assert.assertEquals(res1.get(i).getBlock(), res2.get(i).getBlock());
- }
- }
-
- private Collection<LocatedBlock> getCompanionBlocks(
- FSNamesystem namesystem, BlockPlacementPolicyRaid policy,
- ExtendedBlock block) throws IOException {
- INodeFile inode = namesystem.blockManager.blocksMap.getINode(block
- .getLocalBlock());
- FileType type = policy.getFileType(inode.getFullPathName());
- return policy.getCompanionBlocks(inode.getFullPathName(), type,
- block.getLocalBlock());
- }
-
- private List<LocatedBlock> getBlocks(FSNamesystem namesystem, String file)
- throws IOException {
- long len = NameNodeRaidUtil.getFileInfo(namesystem, file, true).getLen();
- return NameNodeRaidUtil.getBlockLocations(namesystem,
- file, 0, len, false, false).getLocatedBlocks();
- }
-}
+///**
+// * Licensed to the Apache Software Foundation (ASF) under one
+// * or more contributor license agreements. See the NOTICE file
+// * distributed with this work for additional information
+// * regarding copyright ownership. The ASF licenses this file
+// * to you under the Apache License, Version 2.0 (the
+// * "License"); you may not use this file except in compliance
+// * with the License. You may obtain a copy of the License at
+// *
+// * http://www.apache.org/licenses/LICENSE-2.0
+// *
+// * Unless required by applicable law or agreed to in writing, software
+// * distributed under the License is distributed on an "AS IS" BASIS,
+// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// * See the License for the specific language governing permissions and
+// * limitations under the License.
+// */
+//
+//package org.apache.hadoop.hdfs.server.blockmanagement;
+//
+//import java.io.IOException;
+//import java.util.Collection;
+//import java.util.HashMap;
+//import java.util.HashSet;
+//import java.util.List;
+//import java.util.Map;
+//import java.util.Set;
+//
+//import junit.framework.Assert;
+//
+//import org.apache.commons.logging.Log;
+//import org.apache.commons.logging.LogFactory;
+//import org.apache.hadoop.conf.Configuration;
+//import org.apache.hadoop.fs.BlockLocation;
+//import org.apache.hadoop.fs.FileStatus;
+//import org.apache.hadoop.fs.FileSystem;
+//import org.apache.hadoop.fs.Path;
+//import org.apache.hadoop.hdfs.DFSConfigKeys;
+//import org.apache.hadoop.hdfs.DFSTestUtil;
+//import org.apache.hadoop.hdfs.MiniDFSCluster;
+//import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+//import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+//import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRaid.CachedFullPathNames;
+//import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRaid.CachedLocatedBlocks;
+//import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyRaid.FileType;
+//import org.apache.hadoop.hdfs.server.namenode.*;
+//import org.apache.hadoop.raid.RaidNode;
+//import org.junit.Test;
+//
+//public class TestBlockPlacementPolicyRaid {
+// private Configuration conf = null;
+// private MiniDFSCluster cluster = null;
+// private FSNamesystem namesystem = null;
+// private BlockPlacementPolicyRaid policy = null;
+// private FileSystem fs = null;
+// String[] rack1 = {"/rack1"};
+// String[] rack2 = {"/rack2"};
+// String[] host1 = {"host1.rack1.com"};
+// String[] host2 = {"host2.rack2.com"};
+// String xorPrefix = null;
+// String raidTempPrefix = null;
+// String raidrsTempPrefix = null;
+// String raidrsHarTempPrefix = null;
+//
+// final static Log LOG =
+// LogFactory.getLog(TestBlockPlacementPolicyRaid.class);
+//
+// protected void setupCluster() throws IOException {
+// conf = new Configuration();
+// conf.setLong(DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, 1000L);
+// conf.set("dfs.replication.pending.timeout.sec", "2");
+// conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1L);
+// conf.set("dfs.block.replicator.classname",
+// "org.apache.hadoop.hdfs.server.namenode.BlockPlacementPolicyRaid");
+// conf.set(RaidNode.STRIPE_LENGTH_KEY, "2");
+// conf.set(RaidNode.RS_PARITY_LENGTH_KEY, "3");
+// conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 1);
+// // start the cluster with one datanode first
+// cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).
+// format(true).racks(rack1).hosts(host1).build();
+// cluster.waitActive();
+// namesystem = cluster.getNameNode().getNamesystem();
+// Assert.assertTrue("BlockPlacementPolicy type is not correct.",
+// namesystem.blockManager.replicator instanceof BlockPlacementPolicyRaid);
+// policy = (BlockPlacementPolicyRaid) namesystem.blockManager.replicator;
+// fs = cluster.getFileSystem();
+// xorPrefix = RaidNode.xorDestinationPath(conf).toUri().getPath();
+// raidTempPrefix = RaidNode.xorTempPrefix(conf);
+// raidrsTempPrefix = RaidNode.rsTempPrefix(conf);
+// raidrsHarTempPrefix = RaidNode.rsHarTempPrefix(conf);
+// }
+//
+// /**
+// * Test that the parity files will be placed at the good locations when we
+// * create them.
+// */
+// @Test
+// public void testChooseTargetForRaidFile() throws IOException {
+// setupCluster();
+// try {
+// String src = "/dir/file";
+// String parity = raidrsTempPrefix + src;
+// DFSTestUtil.createFile(fs, new Path(src), 4, (short)1, 0L);
+// DFSTestUtil.waitReplication(fs, new Path(src), (short)1);
+// refreshPolicy();
+// setBlockPlacementPolicy(namesystem, policy);
+// // start 6 more datanodes
+// String[] racks = {"/rack2", "/rack2", "/rack2",
+// "/rack2", "/rack2", "/rack2"};
+// String[] hosts =
+// {"host2.rack2.com", "host3.rack2.com", "host4.rack2.com",
+// "host5.rack2.com", "host6.rack2.com", "host7.rack2.com"};
+// cluster.startDataNodes(conf, 6, true, null, racks, hosts, null);
+// int numBlocks = 6;
+// DFSTestUtil.createFile(fs, new Path(parity), numBlocks, (short)2, 0L);
+// DFSTestUtil.waitReplication(fs, new Path(parity), (short)2);
+// FileStatus srcStat = fs.getFileStatus(new Path(src));
+// BlockLocation[] srcLoc =
+// fs.getFileBlockLocations(srcStat, 0, srcStat.getLen());
+// FileStatus parityStat = fs.getFileStatus(new Path(parity));
+// BlockLocation[] parityLoc =
+// fs.getFileBlockLocations(parityStat, 0, parityStat.getLen());
+// int parityLen = RaidNode.rsParityLength(conf);
+// for (int i = 0; i < numBlocks / parityLen; i++) {
+// Set<String> locations = new HashSet<String>();
+// for (int j = 0; j < srcLoc.length; j++) {
+// String [] names = srcLoc[j].getNames();
+// for (int k = 0; k < names.length; k++) {
+// LOG.info("Source block location: " + names[k]);
+// locations.add(names[k]);
+// }
+// }
+// for (int j = 0 ; j < parityLen; j++) {
+// String[] names = parityLoc[j + i * parityLen].getNames();
+// for (int k = 0; k < names.length; k++) {
+// LOG.info("Parity block location: " + names[k]);
+// Assert.assertTrue(locations.add(names[k]));
+// }
+// }
+// }
+// } finally {
+// if (cluster != null) {
+// cluster.shutdown();
+// }
+// }
+// }
+//
+// /**
+// * Test that the har parity files will be placed at the good locations when we
+// * create them.
+// */
+// @Test
+// public void testChooseTargetForHarRaidFile() throws IOException {
+// setupCluster();
+// try {
+// String[] racks = {"/rack2", "/rack2", "/rack2",
+// "/rack2", "/rack2", "/rack2"};
+// String[] hosts =
+// {"host2.rack2.com", "host3.rack2.com", "host4.rack2.com",
+// "host5.rack2.com", "host6.rack2.com", "host7.rack2.com"};
+// cluster.startDataNodes(conf, 6, true, null, racks, hosts, null);
+// String harParity = raidrsHarTempPrefix + "/dir/file";
+// int numBlocks = 11;
+// DFSTestUtil.createFile(fs, new Path(harParity), numBlocks, (short)1, 0L);
+// DFSTestUtil.waitReplication(fs, new Path(harParity), (short)1);
+// FileStatus stat = fs.getFileStatus(new Path(harParity));
+// BlockLocation[] loc = fs.getFileBlockLocations(stat, 0, stat.getLen());
+// int rsParityLength = RaidNode.rsParityLength(conf);
+// for (int i = 0; i < numBlocks - rsParityLength; i++) {
+// Set<String> locations = new HashSet<String>();
+// for (int j = 0; j < rsParityLength; j++) {
+// for (int k = 0; k < loc[i + j].getNames().length; k++) {
+// // verify that every adjacent 4 blocks are on different nodes
+// String name = loc[i + j].getNames()[k];
+// LOG.info("Har Raid block location: " + name);
+// Assert.assertTrue(locations.add(name));
+// }
+// }
+// }
+// } finally {
+// if (cluster != null) {
+// cluster.shutdown();
+// }
+// }
+// }
+//
+// /**
+// * Test BlockPlacementPolicyRaid.CachedLocatedBlocks
+// * Verify that the results obtained from cache is the same as
+// * the results obtained directly
+// */
+// @Test
+// public void testCachedBlocks() throws IOException {
+// setupCluster();
+// try {
+// String file1 = "/dir/file1";
+// String file2 = "/dir/file2";
+// DFSTestUtil.createFile(fs, new Path(file1), 3, (short)1, 0L);
+// DFSTestUtil.createFile(fs, new Path(file2), 4, (short)1, 0L);
+// // test blocks cache
+// CachedLocatedBlocks cachedBlocks = new CachedLocatedBlocks(namesystem);
+// verifyCachedBlocksResult(cachedBlocks, namesystem, file1);
+// verifyCachedBlocksResult(cachedBlocks, namesystem, file1);
+// verifyCachedBlocksResult(cachedBlocks, namesystem, file2);
+// verifyCachedBlocksResult(cachedBlocks, namesystem, file2);
+// try {
+// Thread.sleep(1200L);
+// } catch (InterruptedException e) {
+// }
+// verifyCachedBlocksResult(cachedBlocks, namesystem, file2);
+// verifyCachedBlocksResult(cachedBlocks, namesystem, file1);
+// } finally {
+// if (cluster != null) {
+// cluster.shutdown();
+// }
+// }
+// }
+//
+// /**
+// * Test BlockPlacementPolicyRaid.CachedFullPathNames
+// * Verify that the results obtained from cache is the same as
+// * the results obtained directly
+// */
+// @Test
+// public void testCachedPathNames() throws IOException {
+// setupCluster();
+// try {
+// String file1 = "/dir/file1";
+// String file2 = "/dir/file2";
+// DFSTestUtil.createFile(fs, new Path(file1), 3, (short)1, 0L);
+// DFSTestUtil.createFile(fs, new Path(file2), 4, (short)1, 0L);
+// // test full path cache
+// CachedFullPathNames cachedFullPathNames =
+// new CachedFullPathNames(namesystem);
+// FSInodeInfo inode1 = null;
+// FSInodeInfo inode2 = null;
+// NameNodeRaidTestUtil.readLock(namesystem.dir);
+// try {
+// inode1 = NameNodeRaidTestUtil.getNode(namesystem.dir, file1, true);
+// inode2 = NameNodeRaidTestUtil.getNode(namesystem.dir, file2, true);
+// } finally {
+// NameNodeRaidTestUtil.readUnLock(namesystem.dir);
+// }
+// verifyCachedFullPathNameResult(cachedFullPathNames, inode1);
+// verifyCachedFullPathNameResult(cachedFullPathNames, inode1);
+// verifyCachedFullPathNameResult(cachedFullPathNames, inode2);
+// verifyCachedFullPathNameResult(cachedFullPathNames, inode2);
+// try {
+// Thread.sleep(1200L);
+// } catch (InterruptedException e) {
+// }
+// verifyCachedFullPathNameResult(cachedFullPathNames, inode2);
+// verifyCachedFullPathNameResult(cachedFullPathNames, inode1);
+// } finally {
+// if (cluster != null) {
+// cluster.shutdown();
+// }
+// }
+// }
+// /**
+// * Test the result of getCompanionBlocks() on the unraided files
+// */
+// @Test
+// public void testGetCompanionBLocks() throws IOException {
+// setupCluster();
+// try {
+// String file1 = "/dir/file1";
+// String file2 = "/raid/dir/file2";
+// String file3 = "/raidrs/dir/file3";
+// // Set the policy to default policy to place the block in the default way
+// setBlockPlacementPolicy(namesystem, new BlockPlacementPolicyDefault(
+// conf, namesystem, namesystem.clusterMap));
+// DFSTestUtil.createFile(fs, new Path(file1), 3, (short)1, 0L);
+// DFSTestUtil.createFile(fs, new Path(file2), 4, (short)1, 0L);
+// DFSTestUtil.createFile(fs, new Path(file3), 8, (short)1, 0L);
+// Collection<LocatedBlock> companionBlocks;
+//
+// companionBlocks = getCompanionBlocks(
+// namesystem, policy, getBlocks(namesystem, file1).get(0).getBlock());
+// Assert.assertTrue(companionBlocks == null || companionBlocks.size() == 0);
+//
+// companionBlocks = getCompanionBlocks(
+// namesystem, policy, getBlocks(namesystem, file1).get(2).getBlock());
+// Assert.assertTrue(companionBlocks == null || companionBlocks.size() == 0);
+//
+// companionBlocks = getCompanionBlocks(
+// namesystem, policy, getBlocks(namesystem, file2).get(0).getBlock());
+// Assert.assertEquals(1, companionBlocks.size());
+//
+// companionBlocks = getCompanionBlocks(
+// namesystem, policy, getBlocks(namesystem, file2).get(3).getBlock());
+// Assert.assertEquals(1, companionBlocks.size());
+//
+// int rsParityLength = RaidNode.rsParityLength(conf);
+// companionBlocks = getCompanionBlocks(
+// namesystem, policy, getBlocks(namesystem, file3).get(0).getBlock());
+// Assert.assertEquals(rsParityLength, companionBlocks.size());
+//
+// companionBlocks = getCompanionBlocks(
+// namesystem, policy, getBlocks(namesystem, file3).get(4).getBlock());
+// Assert.assertEquals(rsParityLength, companionBlocks.size());
+//
+// companionBlocks = getCompanionBlocks(
+// namesystem, policy, getBlocks(namesystem, file3).get(6).getBlock());
+// Assert.assertEquals(2, companionBlocks.size());
+// } finally {
+// if (cluster != null) {
+// cluster.shutdown();
+// }
+// }
+// }
+//
+// static void setBlockPlacementPolicy(
+// FSNamesystem namesystem, BlockPlacementPolicy policy) {
+// namesystem.writeLock();
+// try {
+// namesystem.blockManager.replicator = policy;
+// } finally {
+// namesystem.writeUnlock();
+// }
+// }
+//
+// /**
+// * Test that BlockPlacementPolicyRaid actually deletes the correct replica.
+// * Start 2 datanodes and create 1 source file and its parity file.
+// * 1) Start host1, create the parity file with replication 1
+// * 2) Start host2, create the source file with replication 2
+// * 3) Set replication of source file to 1
+// * Verify that the policy should delete the block with more companion blocks.
+// */
+// @Test
+// public void testDeleteReplica() throws IOException {
+// setupCluster();
+// try {
+// // Set the policy to default policy to place the block in the default way
+// setBlockPlacementPolicy(namesystem, new BlockPlacementPolicyDefault(
+// conf, namesystem, namesystem.clusterMap));
+// DatanodeDescriptor datanode1 =
+// NameNodeRaidTestUtil.getDatanodeMap(namesystem).values().iterator().next();
+// String source = "/dir/file";
+// String parity = xorPrefix + source;
+//
+// final Path parityPath = new Path(parity);
+// DFSTestUtil.createFile(fs, parityPath, 3, (short)1, 0L);
+// DFSTestUtil.waitReplication(fs, parityPath, (short)1);
+//
+// // start one more datanode
+// cluster.startDataNodes(conf, 1, true, null, rack2, host2, null);
+// DatanodeDescriptor datanode2 = null;
+// for (DatanodeDescriptor d : NameNodeRaidTestUtil.getDatanodeMap(namesystem).values()) {
+// if (!d.getName().equals(datanode1.getName())) {
+// datanode2 = d;
+// }
+// }
+// Assert.assertTrue(datanode2 != null);
+// cluster.waitActive();
+// final Path sourcePath = new Path(source);
+// DFSTestUtil.createFile(fs, sourcePath, 5, (short)2, 0L);
+// DFSTestUtil.waitReplication(fs, sourcePath, (short)2);
+//
+// refreshPolicy();
+// Assert.assertEquals(parity,
+// policy.getParityFile(source));
+// Assert.assertEquals(source,
+// policy.getSourceFile(parity, xorPrefix));
+//
+// List<LocatedBlock> sourceBlocks = getBlocks(namesystem, source);
+// List<LocatedBlock> parityBlocks = getBlocks(namesystem, parity);
+// Assert.assertEquals(5, sourceBlocks.size());
+// Assert.assertEquals(3, parityBlocks.size());
+//
+// // verify the result of getCompanionBlocks()
+// Collection<LocatedBlock> companionBlocks;
+// companionBlocks = getCompanionBlocks(
+// namesystem, policy, sourceBlocks.get(0).getBlock());
+// verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks,
+// new int[]{0, 1}, new int[]{0});
+//
+// companionBlocks = getCompanionBlocks(
+// namesystem, policy, sourceBlocks.get(1).getBlock());
+// verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks,
+// new int[]{0, 1}, new int[]{0});
+//
+// companionBlocks = getCompanionBlocks(
+// namesystem, policy, sourceBlocks.get(2).getBlock());
+// verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks,
+// new int[]{2, 3}, new int[]{1});
+//
+// companionBlocks = getCompanionBlocks(
+// namesystem, policy, sourceBlocks.get(3).getBlock());
+// verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks,
+// new int[]{2, 3}, new int[]{1});
+//
+// companionBlocks = getCompanionBlocks(
+// namesystem, policy, sourceBlocks.get(4).getBlock());
+// verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks,
+// new int[]{4}, new int[]{2});
+//
+// companionBlocks = getCompanionBlocks(
+// namesystem, policy, parityBlocks.get(0).getBlock());
+// verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks,
+// new int[]{0, 1}, new int[]{0});
+//
+// companionBlocks = getCompanionBlocks(
+// namesystem, policy, parityBlocks.get(1).getBlock());
+// verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks,
+// new int[]{2, 3}, new int[]{1});
+//
+// companionBlocks = getCompanionBlocks(
+// namesystem, policy, parityBlocks.get(2).getBlock());
+// verifyCompanionBlocks(companionBlocks, sourceBlocks, parityBlocks,
+// new int[]{4}, new int[]{2});
+//
+// // Set the policy back to raid policy. We have to create a new object
+// // here to clear the block location cache
+// refreshPolicy();
+// setBlockPlacementPolicy(namesystem, policy);
+// // verify policy deletes the correct blocks. companion blocks should be
+// // evenly distributed.
+// fs.setReplication(sourcePath, (short)1);
+// DFSTestUtil.waitReplication(fs, sourcePath, (short)1);
+// Map<String, Integer> counters = new HashMap<String, Integer>();
+// refreshPolicy();
+// for (int i = 0; i < parityBlocks.size(); i++) {
+// companionBlocks = getCompanionBlocks(
+// namesystem, policy, parityBlocks.get(i).getBlock());
+//
+// counters = BlockPlacementPolicyRaid.countCompanionBlocks(
+// companionBlocks, false);
+// Assert.assertTrue(counters.get(datanode1.getName()) >= 1 &&
+// counters.get(datanode1.getName()) <= 2);
+// Assert.assertTrue(counters.get(datanode1.getName()) +
+// counters.get(datanode2.getName()) ==
+// companionBlocks.size());
+//
+// counters = BlockPlacementPolicyRaid.countCompanionBlocks(
+// companionBlocks, true);
+// Assert.assertTrue(counters.get(datanode1.getParent().getName()) >= 1 &&
+// counters.get(datanode1.getParent().getName()) <= 2);
+// Assert.assertTrue(counters.get(datanode1.getParent().getName()) +
+// counters.get(datanode2.getParent().getName()) ==
+// companionBlocks.size());
+// }
+// } finally {
+// if (cluster != null) {
+// cluster.shutdown();
+// }
+// }
+// }
+//
+// // create a new BlockPlacementPolicyRaid to clear the cache
+// private void refreshPolicy() {
+// policy = new BlockPlacementPolicyRaid();
+// policy.initialize(conf, namesystem, namesystem.clusterMap);
+// }
+//
+// private void verifyCompanionBlocks(Collection<LocatedBlock> companionBlocks,
+// List<LocatedBlock> sourceBlocks, List<LocatedBlock> parityBlocks,
+// int[] sourceBlockIndexes, int[] parityBlockIndexes) {
+// Set<ExtendedBlock> blockSet = new HashSet<ExtendedBlock>();
+// for (LocatedBlock b : companionBlocks) {
+// blockSet.add(b.getBlock());
+// }
+// Assert.assertEquals(sourceBlockIndexes.length + parityBlockIndexes.length,
+// blockSet.size());
+// for (int index : sourceBlockIndexes) {
+// Assert.assertTrue(blockSet.contains(sourceBlocks.get(index).getBlock()));
+// }
+// for (int index : parityBlockIndexes) {
+// Assert.assertTrue(blockSet.contains(parityBlocks.get(index).getBlock()));
+// }
+// }
+//
+// private void verifyCachedFullPathNameResult(
+// CachedFullPathNames cachedFullPathNames, FSInodeInfo inode)
+// throws IOException {
+// String res1 = inode.getFullPathName();
+// String res2 = cachedFullPathNames.get(inode);
+// LOG.info("Actual path name: " + res1);
+// LOG.info("Cached path name: " + res2);
+// Assert.assertEquals(cachedFullPathNames.get(inode),
+// inode.getFullPathName());
+// }
+//
+// private void verifyCachedBlocksResult(CachedLocatedBlocks cachedBlocks,
+// FSNamesystem namesystem, String file) throws IOException{
+// long len = NameNodeRaidUtil.getFileInfo(namesystem, file, true).getLen();
+// List<LocatedBlock> res1 = NameNodeRaidUtil.getBlockLocations(namesystem,
+// file, 0L, len, false, false).getLocatedBlocks();
+// List<LocatedBlock> res2 = cachedBlocks.get(file);
+// for (int i = 0; i < res1.size(); i++) {
+// LOG.info("Actual block: " + res1.get(i).getBlock());
+// LOG.info("Cached block: " + res2.get(i).getBlock());
+// Assert.assertEquals(res1.get(i).getBlock(), res2.get(i).getBlock());
+// }
+// }
+//
+// private Collection<LocatedBlock> getCompanionBlocks(
+// FSNamesystem namesystem, BlockPlacementPolicyRaid policy,
+// ExtendedBlock block) throws IOException {
+// INodeFile inode = namesystem.blockManager.blocksMap.getINode(block
+// .getLocalBlock());
+// FileType type = policy.getFileType(inode.getFullPathName());
+// return policy.getCompanionBlocks(inode.getFullPathName(), type,
+// block.getLocalBlock());
+// }
+//
+// private List<LocatedBlock> getBlocks(FSNamesystem namesystem, String file)
+// throws IOException {
+// long len = NameNodeRaidUtil.getFileInfo(namesystem, file, true).getLen();
+// return NameNodeRaidUtil.getBlockLocations(namesystem,
+// file, 0, len, false, false).getLocatedBlocks();
+// }
+//}