You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by wa...@apache.org on 2013/11/13 01:31:56 UTC
svn commit: r1541338 - in
/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/org/apache/hadoop/fs/ src/main/java/org/apache/hadoop/hdfs/
src/main/java/org/apache/hadoop/hdfs/web/
src/test/java/org/apache/hadoop/hdfs/server/datanode/...
Author: wang
Date: Wed Nov 13 00:31:55 2013
New Revision: 1541338
URL: http://svn.apache.org/r1541338
Log:
HDFS-5450. Better API for getting the cached blocks locations. Contributed by Andrew Wang.
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsBlockLocation.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathBasedCacheRequests.java
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1541338&r1=1541337&r2=1541338&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Wed Nov 13 00:31:55 2013
@@ -190,6 +190,8 @@ Trunk (Unreleased)
HDFS-5326. add modifyDirective to cacheAdmin. (cmccabe)
+ HDFS-5450. Better API for getting the cached blocks locations. (wang)
+
OPTIMIZATIONS
HDFS-5349. DNA_CACHE and DNA_UNCACHE should be by blockId only. (cmccabe)
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsBlockLocation.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsBlockLocation.java?rev=1541338&r1=1541337&r2=1541338&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsBlockLocation.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/HdfsBlockLocation.java Wed Nov 13 00:31:55 2013
@@ -37,8 +37,7 @@ public class HdfsBlockLocation extends B
public HdfsBlockLocation(BlockLocation loc, LocatedBlock block)
throws IOException {
// Initialize with data from passed in BlockLocation
- super(loc.getNames(), loc.getHosts(), loc.getTopologyPaths(),
- loc.getOffset(), loc.getLength(), loc.isCorrupt());
+ super(loc);
this.block = block;
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java?rev=1541338&r1=1541337&r2=1541338&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java Wed Nov 13 00:31:55 2013
@@ -419,7 +419,13 @@ public class DFSUtil {
locations[hCnt].getNetworkLocation());
racks[hCnt] = node.toString();
}
- blkLocations[idx] = new BlockLocation(xferAddrs, hosts, racks,
+ DatanodeInfo[] cachedLocations = blk.getCachedLocations();
+ String[] cachedHosts = new String[cachedLocations.length];
+ for (int i=0; i<cachedLocations.length; i++) {
+ cachedHosts[i] = cachedLocations[i].getHostName();
+ }
+ blkLocations[idx] = new BlockLocation(xferAddrs, hosts, cachedHosts,
+ racks,
blk.getStartOffset(),
blk.getBlockSize(),
blk.isCorrupt());
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java?rev=1541338&r1=1541337&r2=1541338&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java Wed Nov 13 00:31:55 2013
@@ -393,6 +393,7 @@ public class JsonUtil {
m.put("startOffset", locatedblock.getStartOffset());
m.put("block", toJsonMap(locatedblock.getBlock()));
m.put("locations", toJsonArray(locatedblock.getLocations()));
+ m.put("cachedLocations", toJsonArray(locatedblock.getCachedLocations()));
return m;
}
@@ -407,8 +408,11 @@ public class JsonUtil {
(Object[])m.get("locations"));
final long startOffset = (Long)m.get("startOffset");
final boolean isCorrupt = (Boolean)m.get("isCorrupt");
+ final DatanodeInfo[] cachedLocations = toDatanodeInfoArray(
+ (Object[])m.get("cachedLocations"));
- final LocatedBlock locatedblock = new LocatedBlock(b, locations, startOffset, isCorrupt);
+ final LocatedBlock locatedblock = new LocatedBlock(b, locations,
+ startOffset, isCorrupt, cachedLocations);
locatedblock.setBlockToken(toBlockToken((Map<?, ?>)m.get("blockToken")));
return locatedblock;
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java?rev=1541338&r1=1541337&r2=1541338&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java Wed Nov 13 00:31:55 2013
@@ -49,6 +49,7 @@ import org.apache.hadoop.hdfs.protocolPB
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetCache.PageRounder;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MappableBlock;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MappableBlock.Mlocker;
import org.apache.hadoop.hdfs.server.namenode.FSImage;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
@@ -87,6 +88,8 @@ public class TestFsDatasetCache {
private static DatanodeProtocolClientSideTranslatorPB spyNN;
private static PageRounder rounder = new PageRounder();
+ private Mlocker mlocker;
+
@Before
public void setUp() throws Exception {
assumeTrue(!Path.WINDOWS);
@@ -110,6 +113,8 @@ public class TestFsDatasetCache {
fsd = dn.getFSDataset();
spyNN = DataNodeTestUtils.spyOnBposToNN(dn, nn);
+ // Save the current mlocker and replace it at the end of the test
+ mlocker = MappableBlock.mlocker;
}
@After
@@ -120,6 +125,8 @@ public class TestFsDatasetCache {
if (cluster != null) {
cluster.shutdown();
}
+ // Restore the original mlocker
+ MappableBlock.mlocker = mlocker;
}
private static void setHeartbeatResponse(DatanodeCommand[] cmds)
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathBasedCacheRequests.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathBasedCacheRequests.java?rev=1541338&r1=1541337&r2=1541338&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathBasedCacheRequests.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathBasedCacheRequests.java Wed Nov 13 00:31:55 2013
@@ -31,6 +31,7 @@ import static org.junit.Assert.assertTru
import static org.junit.Assert.fail;
import java.io.IOException;
+import java.nio.MappedByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Iterator;
@@ -40,6 +41,8 @@ import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.fs.InvalidRequestException;
@@ -54,6 +57,7 @@ import org.apache.hadoop.hdfs.MiniDFSClu
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type;
+import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.MappableBlock;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.security.AccessControlException;
@@ -78,6 +82,15 @@ public class TestPathBasedCacheRequests
static private DistributedFileSystem dfs;
static private NamenodeProtocols proto;
+ static {
+ MappableBlock.mlocker = new MappableBlock.Mlocker() {
+ @Override
+ public void mlock(MappedByteBuffer mmap, long length) throws IOException {
+ // Stubbed out for testing
+ }
+ };
+ }
+
@Before
public void setup() throws Exception {
conf = new HdfsConfiguration();
@@ -530,6 +543,14 @@ public class TestPathBasedCacheRequests
assertFalse("Unexpected # of cache directives found", dit.hasNext());
}
+ /**
+ * Wait for the NameNode to have an expected number of cached blocks
+ * and replicas.
+ * @param nn NameNode
+ * @param expectedCachedBlocks
+ * @param expectedCachedReplicas
+ * @throws Exception
+ */
private static void waitForCachedBlocks(NameNode nn,
final int expectedCachedBlocks, final int expectedCachedReplicas)
throws Exception {
@@ -570,6 +591,37 @@ public class TestPathBasedCacheRequests
}, 500, 60000);
}
+ private static void checkNumCachedReplicas(final DistributedFileSystem dfs,
+ final List<Path> paths, final int expectedBlocks,
+ final int expectedReplicas)
+ throws Exception {
+ int numCachedBlocks = 0;
+ int numCachedReplicas = 0;
+ for (Path p: paths) {
+ final FileStatus f = dfs.getFileStatus(p);
+ final long len = f.getLen();
+ final long blockSize = f.getBlockSize();
+ // round it up to full blocks
+ final long numBlocks = (len + blockSize - 1) / blockSize;
+ BlockLocation[] locs = dfs.getFileBlockLocations(p, 0, len);
+ assertEquals("Unexpected number of block locations for path " + p,
+ numBlocks, locs.length);
+ for (BlockLocation l: locs) {
+ if (l.getCachedHosts().length > 0) {
+ numCachedBlocks++;
+ }
+ numCachedReplicas += l.getCachedHosts().length;
+ }
+ }
+ LOG.info("Found " + numCachedBlocks + " of " + expectedBlocks + " blocks");
+ LOG.info("Found " + numCachedReplicas + " of " + expectedReplicas
+ + " replicas");
+ assertEquals("Unexpected number of cached blocks", expectedBlocks,
+ numCachedBlocks);
+ assertEquals("Unexpected number of cached replicas", expectedReplicas,
+ numCachedReplicas);
+ }
+
private static final long BLOCK_SIZE = 512;
private static final int NUM_DATANODES = 4;
@@ -746,6 +798,78 @@ public class TestPathBasedCacheRequests
}
}
+ /**
+ * Tests stepping the cache replication factor up and down, checking the
+ * number of cached replicas and blocks as well as the advertised locations.
+ * @throws Exception
+ */
+ @Test(timeout=120000)
+ public void testReplicationFactor() throws Exception {
+ Assume.assumeTrue(canTestDatanodeCaching());
+ HdfsConfiguration conf = createCachingConf();
+ MiniDFSCluster cluster =
+ new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
+
+ try {
+ cluster.waitActive();
+ DistributedFileSystem dfs = cluster.getFileSystem();
+ NameNode namenode = cluster.getNameNode();
+ // Create the pool
+ final String pool = "friendlyPool";
+ dfs.addCachePool(new CachePoolInfo(pool));
+ // Create some test files
+ final List<Path> paths = new LinkedList<Path>();
+ paths.add(new Path("/foo/bar"));
+ paths.add(new Path("/foo/baz"));
+ paths.add(new Path("/foo2/bar2"));
+ paths.add(new Path("/foo2/baz2"));
+ dfs.mkdir(new Path("/foo"), FsPermission.getDirDefault());
+ dfs.mkdir(new Path("/foo2"), FsPermission.getDirDefault());
+ final int numBlocksPerFile = 2;
+ for (Path path : paths) {
+ FileSystemTestHelper.createFile(dfs, path, numBlocksPerFile,
+ (int)BLOCK_SIZE, (short)3, false);
+ }
+ waitForCachedBlocks(namenode, 0, 0);
+ checkNumCachedReplicas(dfs, paths, 0, 0);
+ // cache directory
+ long id = dfs.addPathBasedCacheDirective(
+ new PathBasedCacheDirective.Builder().
+ setPath(new Path("/foo")).
+ setReplication((short)1).
+ setPool(pool).
+ build());
+ waitForCachedBlocks(namenode, 4, 4);
+ checkNumCachedReplicas(dfs, paths, 4, 4);
+ // step up the replication factor
+ for (int i=2; i<=3; i++) {
+ dfs.modifyPathBasedCacheDirective(
+ new PathBasedCacheDirective.Builder().
+ setId(id).
+ setReplication((short)i).
+ build());
+ waitForCachedBlocks(namenode, 4, 4*i);
+ checkNumCachedReplicas(dfs, paths, 4, 4*i);
+ }
+ // step it down
+ for (int i=2; i>=1; i--) {
+ dfs.modifyPathBasedCacheDirective(
+ new PathBasedCacheDirective.Builder().
+ setId(id).
+ setReplication((short)i).
+ build());
+ waitForCachedBlocks(namenode, 4, 4*i);
+ checkNumCachedReplicas(dfs, paths, 4, 4*i);
+ }
+ // remove and watch numCached go to 0
+ dfs.removePathBasedCacheDirective(id);
+ waitForCachedBlocks(namenode, 0, 0);
+ checkNumCachedReplicas(dfs, paths, 0, 0);
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
@Test(timeout=60000)
public void testListCachePoolPermissions() throws Exception {
final UserGroupInformation myUser = UserGroupInformation