Posted to hdfs-commits@hadoop.apache.org by cm...@apache.org on 2013/10/17 00:15:34 UTC
svn commit: r1532924 [2/2] - in
/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/org/apache/hadoop/hdfs/
src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/
src/main/java/org/apache/hadoop/hdfs/server/datanod...
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java?rev=1532924&r1=1532923&r2=1532924&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java Wed Oct 16 22:15:33 2013
@@ -17,53 +17,97 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DESCRIPTORS_NUM_RESPONSES;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DESCRIPTORS_NUM_RESPONSES_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DESCRIPTORS_NUM_RESPONSES;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DESCRIPTORS_NUM_RESPONSES_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS_DEFAULT;
+import java.io.Closeable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map.Entry;
import java.util.SortedMap;
import java.util.TreeMap;
+import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.InvalidPoolNameError;
-import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.PoolWritePermissionDeniedError;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
-import org.apache.hadoop.hdfs.protocol.PathBasedCacheDescriptor;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
+import org.apache.hadoop.hdfs.protocol.PathBasedCacheDescriptor;
+import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.InvalidPoolNameError;
+import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.UnexpectedAddPathBasedCacheDirectiveException;
+import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.PoolWritePermissionDeniedError;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheEntry;
import org.apache.hadoop.hdfs.protocol.RemovePathBasedCacheDescriptorException.InvalidIdException;
import org.apache.hadoop.hdfs.protocol.RemovePathBasedCacheDescriptorException.NoSuchIdException;
-import org.apache.hadoop.hdfs.protocol.RemovePathBasedCacheDescriptorException.RemovePermissionDeniedException;
import org.apache.hadoop.hdfs.protocol.RemovePathBasedCacheDescriptorException.UnexpectedRemovePathBasedCacheDescriptorException;
+import org.apache.hadoop.hdfs.protocol.RemovePathBasedCacheDescriptorException.RemovePermissionDeniedException;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
+import org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress.Counter;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StepType;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.GSet;
+import org.apache.hadoop.util.LightWeightGSet;
+import org.apache.hadoop.util.Time;
-import com.google.common.base.Preconditions;
+import com.google.common.annotations.VisibleForTesting;
/**
* The Cache Manager handles caching on DataNodes.
+ *
+ * This class is instantiated by the FSNamesystem when caching is enabled.
+ * It maintains the mapping of cached blocks to datanodes by processing
+ * datanode cache reports. Based on these reports, and on the addition and
+ * removal of caching directives, it schedules caching and uncaching work.
*/
+@InterfaceAudience.LimitedPrivate({"HDFS"})
public final class CacheManager {
public static final Log LOG = LogFactory.getLog(CacheManager.class);
+  // TODO: add pending / underCached / scheduled cached blocks stats.
+
+ /**
+ * The FSNamesystem that contains this CacheManager.
+ */
+ private final FSNamesystem namesystem;
+
+ /**
+ * The BlockManager associated with the FSN that owns this CacheManager.
+ */
+ private final BlockManager blockManager;
+
/**
* Cache entries, sorted by ID.
*
@@ -74,6 +118,12 @@ public final class CacheManager {
new TreeMap<Long, PathBasedCacheEntry>();
/**
+ * The entry ID to use for a new entry. Entry IDs always increase, and are
+ * never reused.
+ */
+ private long nextEntryId;
+
+ /**
* Cache entries, sorted by path
*/
private final TreeMap<String, List<PathBasedCacheEntry>> entriesByPath =
@@ -86,11 +136,6 @@ public final class CacheManager {
new TreeMap<String, CachePool>();
/**
- * The entry ID to use for a new entry.
- */
- private long nextEntryId;
-
- /**
* Maximum number of cache pools to list in one operation.
*/
private final int maxListCachePoolsResponses;
@@ -100,44 +145,129 @@ public final class CacheManager {
*/
private final int maxListCacheDescriptorsResponses;
- final private FSNamesystem namesystem;
- final private FSDirectory dir;
+ /**
+ * Interval between scans in milliseconds.
+ */
+ private final long scanIntervalMs;
- CacheManager(FSNamesystem namesystem, FSDirectory dir, Configuration conf) {
- clear();
+ /**
+ * Whether caching is enabled.
+ *
+ * If caching is disabled, we will not process cache reports or store
+ * information about what is cached where. We also do not start the
+ * CacheReplicationMonitor thread. This will save resources, but provide
+ * less functionality.
+ *
+ * Even when caching is disabled, we still store path-based cache
+ * information. This information is stored in the edit log and fsimage. We
+ * don't want to lose it just because a configuration setting was turned off.
+ * However, we will not act on this information if caching is disabled.
+ */
+ private final boolean enabled;
+
+ /**
+ * Whether the CacheManager is active.
+ *
+ * When the CacheManager is active, it tells the DataNodes what to cache
+ * and uncache. The CacheManager cannot become active if enabled = false.
+ */
+ private boolean active = false;
+
+ /**
+ * All cached blocks.
+ */
+ private final GSet<CachedBlock, CachedBlock> cachedBlocks;
+
+ /**
+ * The CacheReplicationMonitor.
+ */
+ private CacheReplicationMonitor monitor;
+
+ CacheManager(FSNamesystem namesystem, Configuration conf,
+ BlockManager blockManager) {
this.namesystem = namesystem;
- this.dir = dir;
- maxListCachePoolsResponses = conf.getInt(
+ this.blockManager = blockManager;
+ this.nextEntryId = 1;
+ this.maxListCachePoolsResponses = conf.getInt(
DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES,
DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES_DEFAULT);
- maxListCacheDescriptorsResponses = conf.getInt(
+ this.maxListCacheDescriptorsResponses = conf.getInt(
DFS_NAMENODE_LIST_CACHE_DESCRIPTORS_NUM_RESPONSES,
DFS_NAMENODE_LIST_CACHE_DESCRIPTORS_NUM_RESPONSES_DEFAULT);
+ scanIntervalMs = conf.getLong(
+ DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS,
+ DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS_DEFAULT);
+ this.enabled = conf.getBoolean(DFS_NAMENODE_CACHING_ENABLED_KEY,
+ DFS_NAMENODE_CACHING_ENABLED_DEFAULT);
+ this.cachedBlocks = !enabled ? null :
+ new LightWeightGSet<CachedBlock, CachedBlock>(
+ LightWeightGSet.computeCapacity(0.25, "cachedBlocks"));
}
- synchronized void clear() {
- entriesById.clear();
- entriesByPath.clear();
- cachePools.clear();
- nextEntryId = 1;
+ /**
+ * Activate the cache manager.
+ *
+ * When the cache manager is active, tell the datanodes where to cache files.
+ */
+ public void activate() {
+ assert namesystem.hasWriteLock();
+ if (enabled && (!active)) {
+ LOG.info("Activating CacheManager. " +
+ "Starting replication monitor thread...");
+ active = true;
+ monitor = new CacheReplicationMonitor(namesystem, this,
+ scanIntervalMs);
+ monitor.start();
+ }
}
/**
- * Returns the next entry ID to be used for a PathBasedCacheEntry
+ * Deactivate the cache manager.
+ *
+ * When the cache manager is inactive, it does not tell the datanodes where to
+ * cache files.
*/
- synchronized long getNextEntryId() {
- Preconditions.checkArgument(nextEntryId != Long.MAX_VALUE);
- return nextEntryId++;
+ public void deactivate() {
+ assert namesystem.hasWriteLock();
+ if (active) {
+ LOG.info("Deactivating CacheManager. " +
+          "Stopping CacheReplicationMonitor thread...");
+ active = false;
+ IOUtils.closeQuietly(monitor);
+ monitor = null;
+ LOG.info("CacheReplicationMonitor thread stopped and deactivated.");
+ }
}
/**
- * Returns the PathBasedCacheEntry corresponding to a PathBasedCacheEntry.
- *
- * @param directive Lookup directive
- * @return Corresponding PathBasedCacheEntry, or null if not present.
+ * Return true only if the cache manager is active.
+ * Must be called under the FSN read or write lock.
*/
- private synchronized PathBasedCacheEntry
- findEntry(PathBasedCacheDirective directive) {
+ public boolean isActive() {
+ return active;
+ }
+
+ public TreeMap<Long, PathBasedCacheEntry> getEntriesById() {
+ assert namesystem.hasReadOrWriteLock();
+ return entriesById;
+ }
+
+ @VisibleForTesting
+ public GSet<CachedBlock, CachedBlock> getCachedBlocks() {
+ assert namesystem.hasReadOrWriteLock();
+ return cachedBlocks;
+ }
+
+ private long getNextEntryId() throws IOException {
+ assert namesystem.hasWriteLock();
+ if (nextEntryId == Long.MAX_VALUE) {
+ throw new IOException("No more available IDs");
+ }
+ return nextEntryId++;
+ }
+
+ private PathBasedCacheEntry findEntry(PathBasedCacheDirective directive) {
+ assert namesystem.hasReadOrWriteLock();
List<PathBasedCacheEntry> existing =
entriesByPath.get(directive.getPath().toUri().getPath());
if (existing == null) {
@@ -151,56 +281,10 @@ public final class CacheManager {
return null;
}
- /**
- * Add a new PathBasedCacheEntry, skipping any validation checks. Called
- * directly when reloading CacheManager state from FSImage.
- *
- * @throws IOException if unable to cache the entry
- */
- private void unprotectedAddEntry(PathBasedCacheEntry entry)
- throws IOException {
- assert namesystem.hasWriteLock();
- // Add it to the various maps
- entriesById.put(entry.getEntryId(), entry);
- String path = entry.getPath();
- List<PathBasedCacheEntry> entryList = entriesByPath.get(path);
- if (entryList == null) {
- entryList = new ArrayList<PathBasedCacheEntry>(1);
- entriesByPath.put(path, entryList);
- }
- entryList.add(entry);
- // Set the path as cached in the namesystem
- try {
- INode node = dir.getINode(entry.getPath());
- if (node != null && node.isFile()) {
- INodeFile file = node.asFile();
- // TODO: adjustable cache replication factor
- namesystem.setCacheReplicationInt(entry.getPath(),
- file.getBlockReplication());
- } else {
- LOG.warn("Path " + entry.getPath() + " is not a file");
- }
- } catch (IOException ioe) {
- LOG.info("unprotectedAddEntry " + entry +": failed to cache file: " +
- ioe.getClass().getName() +": " + ioe.getMessage());
- throw ioe;
- }
- }
-
- /**
- * Add a new PathBasedCacheDirective if valid, returning a corresponding
- * PathBasedCacheDescriptor to the user.
- *
- * @param directive Directive describing the cache entry being added
- * @param pc Permission checker used to validate that the calling user has
- * access to the destination cache pool
- * @return Corresponding PathBasedCacheDescriptor for the new cache entry
- * @throws IOException if the directive is invalid or was otherwise
- * unsuccessful
- */
- public synchronized PathBasedCacheDescriptor addDirective(
+ public PathBasedCacheDescriptor addDirective(
PathBasedCacheDirective directive, FSPermissionChecker pc)
throws IOException {
+ assert namesystem.hasWriteLock();
CachePool pool = cachePools.get(directive.getPool());
if (pool == null) {
LOG.info("addDirective " + directive + ": pool not found.");
@@ -225,47 +309,37 @@ public final class CacheManager {
"existing directive " + existing + " in this pool.");
return existing.getDescriptor();
}
-
- // Success!
- PathBasedCacheDescriptor d = unprotectedAddDirective(directive);
- LOG.info("addDirective " + directive + ": added cache directive "
- + directive);
- return d;
- }
-
- /**
- * Assigns a new entry ID to a validated PathBasedCacheDirective and adds
- * it to the CacheManager. Called directly when replaying the edit log.
- *
- * @param directive Directive being added
- * @return PathBasedCacheDescriptor for the directive
- * @throws IOException
- */
- PathBasedCacheDescriptor unprotectedAddDirective(
- PathBasedCacheDirective directive) throws IOException {
- assert namesystem.hasWriteLock();
- CachePool pool = cachePools.get(directive.getPool());
// Add a new entry with the next available ID.
PathBasedCacheEntry entry;
- entry = new PathBasedCacheEntry(getNextEntryId(),
- directive.getPath().toUri().getPath(),
- directive.getReplication(), pool);
-
- unprotectedAddEntry(entry);
+ try {
+ entry = new PathBasedCacheEntry(getNextEntryId(),
+ directive.getPath().toUri().getPath(),
+ directive.getReplication(), pool);
+ } catch (IOException ioe) {
+ throw new UnexpectedAddPathBasedCacheDirectiveException(directive);
+ }
+    LOG.info("addDirective " + directive + ": added cache directive.");
+ // Success!
+ // First, add it to the various maps
+ entriesById.put(entry.getEntryId(), entry);
+ String path = directive.getPath().toUri().getPath();
+ List<PathBasedCacheEntry> entryList = entriesByPath.get(path);
+ if (entryList == null) {
+ entryList = new ArrayList<PathBasedCacheEntry>(1);
+ entriesByPath.put(path, entryList);
+ }
+ entryList.add(entry);
+ if (monitor != null) {
+ monitor.kick();
+ }
return entry.getDescriptor();
}
- /**
- * Remove the PathBasedCacheEntry corresponding to a descriptor ID from
- * the CacheManager.
- *
- * @param id of the PathBasedCacheDescriptor
- * @param pc Permissions checker used to validated the request
- * @throws IOException
- */
- public synchronized void removeDescriptor(long id, FSPermissionChecker pc)
+ public void removeDescriptor(long id, FSPermissionChecker pc)
throws IOException {
+ assert namesystem.hasWriteLock();
// Check for invalid IDs.
if (id <= 0) {
LOG.info("removeDescriptor " + id + ": invalid non-positive " +
@@ -290,20 +364,6 @@ public final class CacheManager {
throw new RemovePermissionDeniedException(id);
}
- unprotectedRemoveDescriptor(id);
- }
-
- /**
- * Unchecked internal method used to remove a PathBasedCacheEntry from the
- * CacheManager. Called directly when replaying the edit log.
- *
- * @param id of the PathBasedCacheDescriptor corresponding to the entry that
- * is being removed
- * @throws IOException
- */
- void unprotectedRemoveDescriptor(long id) throws IOException {
- assert namesystem.hasWriteLock();
- PathBasedCacheEntry existing = entriesById.get(id);
// Remove the corresponding entry in entriesByPath.
String path = existing.getDescriptor().getPath().toUri().getPath();
List<PathBasedCacheEntry> entries = entriesByPath.get(path);
@@ -314,26 +374,16 @@ public final class CacheManager {
entriesByPath.remove(path);
}
entriesById.remove(id);
-
- // Set the path as uncached in the namesystem
- try {
- INode node = dir.getINode(existing.getDescriptor().getPath().toUri().
- getPath());
- if (node != null && node.isFile()) {
- namesystem.setCacheReplicationInt(existing.getDescriptor().getPath().
- toUri().getPath(), (short) 0);
- }
- } catch (IOException e) {
- LOG.warn("removeDescriptor " + id + ": failure while setting cache"
- + " replication factor", e);
- throw e;
+ if (monitor != null) {
+ monitor.kick();
}
LOG.info("removeDescriptor successful for PathBasedCacheEntry id " + id);
}
- public synchronized BatchedListEntries<PathBasedCacheDescriptor>
+ public BatchedListEntries<PathBasedCacheDescriptor>
listPathBasedCacheDescriptors(long prevId, String filterPool,
String filterPath, FSPermissionChecker pc) throws IOException {
+ assert namesystem.hasReadOrWriteLock();
final int NUM_PRE_ALLOCATED_ENTRIES = 16;
if (filterPath != null) {
if (!DFSUtil.isValidName(filterPath)) {
@@ -370,12 +420,13 @@ public final class CacheManager {
* Create a cache pool.
*
* Only the superuser should be able to call this function.
- *
- * @param info The info for the cache pool to create.
- * @return the created CachePool
+ *
+ * @param info The info for the cache pool to create.
+ * @return Information about the cache pool we created.
*/
- public synchronized CachePoolInfo addCachePool(CachePoolInfo info)
+ public CachePoolInfo addCachePool(CachePoolInfo info)
throws IOException {
+ assert namesystem.hasWriteLock();
CachePoolInfo.validate(info);
String poolName = info.getPoolName();
CachePool pool = cachePools.get(poolName);
@@ -384,20 +435,8 @@ public final class CacheManager {
}
pool = CachePool.createFromInfoAndDefaults(info);
cachePools.put(pool.getPoolName(), pool);
- return pool.getInfo(true);
- }
-
- /**
- * Internal unchecked method used to add a CachePool. Called directly when
- * reloading CacheManager state from the FSImage or edit log.
- *
- * @param pool to be added
- */
- void unprotectedAddCachePool(CachePoolInfo info) {
- assert namesystem.hasWriteLock();
- CachePool pool = CachePool.createFromInfo(info);
- cachePools.put(pool.getPoolName(), pool);
LOG.info("created new cache pool " + pool);
+ return pool.getInfo(true);
}
/**
@@ -408,8 +447,9 @@ public final class CacheManager {
* @param info
* The info for the cache pool to modify.
*/
- public synchronized void modifyCachePool(CachePoolInfo info)
+ public void modifyCachePool(CachePoolInfo info)
throws IOException {
+ assert namesystem.hasWriteLock();
CachePoolInfo.validate(info);
String poolName = info.getPoolName();
CachePool pool = cachePools.get(poolName);
@@ -455,8 +495,9 @@ public final class CacheManager {
* @param poolName
* The name for the cache pool to remove.
*/
- public synchronized void removeCachePool(String poolName)
+ public void removeCachePool(String poolName)
throws IOException {
+ assert namesystem.hasWriteLock();
CachePoolInfo.validateName(poolName);
CachePool pool = cachePools.remove(poolName);
if (pool == null) {
@@ -475,10 +516,14 @@ public final class CacheManager {
iter.remove();
}
}
+ if (monitor != null) {
+ monitor.kick();
+ }
}
- public synchronized BatchedListEntries<CachePoolInfo>
+ public BatchedListEntries<CachePoolInfo>
listCachePools(FSPermissionChecker pc, String prevKey) {
+ assert namesystem.hasReadOrWriteLock();
final int NUM_PRE_ALLOCATED_ENTRIES = 16;
ArrayList<CachePoolInfo> results =
new ArrayList<CachePoolInfo>(NUM_PRE_ALLOCATED_ENTRIES);
@@ -497,9 +542,104 @@ public final class CacheManager {
return new BatchedListEntries<CachePoolInfo>(results, false);
}
- /*
- * FSImage related serialization and deserialization code
- */
+ public void setCachedLocations(LocatedBlock block) {
+ if (!enabled) {
+ return;
+ }
+ CachedBlock cachedBlock =
+ new CachedBlock(block.getBlock().getBlockId(),
+ (short)0, false);
+ cachedBlock = cachedBlocks.get(cachedBlock);
+ if (cachedBlock == null) {
+ return;
+ }
+ List<DatanodeDescriptor> datanodes = cachedBlock.getDatanodes(Type.CACHED);
+ for (DatanodeDescriptor datanode : datanodes) {
+ block.addCachedLoc(datanode);
+ }
+ }
+
+ public final void processCacheReport(final DatanodeID datanodeID,
+ final BlockListAsLongs report) throws IOException {
+ if (!enabled) {
+ LOG.info("Ignoring cache report from " + datanodeID +
+ " because " + DFS_NAMENODE_CACHING_ENABLED_KEY + " = false. " +
+ "number of blocks: " + report.getNumberOfBlocks());
+ return;
+ }
+ namesystem.writeLock();
+ final long startTime = Time.monotonicNow();
+ final long endTime;
+ try {
+ final DatanodeDescriptor datanode =
+ blockManager.getDatanodeManager().getDatanode(datanodeID);
+ if (datanode == null || !datanode.isAlive) {
+ throw new IOException(
+            "processCacheReport from dead or unregistered datanode: " + datanodeID);
+ }
+ processCacheReportImpl(datanode, report);
+ } finally {
+ endTime = Time.monotonicNow();
+ namesystem.writeUnlock();
+ }
+
+ // Log the block report processing stats from Namenode perspective
+ final NameNodeMetrics metrics = NameNode.getNameNodeMetrics();
+ if (metrics != null) {
+ metrics.addCacheBlockReport((int) (endTime - startTime));
+ }
+ LOG.info("Processed cache report from "
+ + datanodeID + ", blocks: " + report.getNumberOfBlocks()
+ + ", processing time: " + (endTime - startTime) + " msecs");
+ }
+
+ private void processCacheReportImpl(final DatanodeDescriptor datanode,
+ final BlockListAsLongs report) {
+ CachedBlocksList cached = datanode.getCached();
+ cached.clear();
+ BlockReportIterator itBR = report.getBlockReportIterator();
+ while (itBR.hasNext()) {
+ Block block = itBR.next();
+ ReplicaState iState = itBR.getCurrentReplicaState();
+ if (iState != ReplicaState.FINALIZED) {
+ LOG.error("Cached block report contained unfinalized block " + block);
+ continue;
+ }
+ BlockInfo blockInfo = blockManager.getStoredBlock(block);
+ if (blockInfo.getGenerationStamp() < block.getGenerationStamp()) {
+ // The NameNode will eventually remove or update the out-of-date block.
+ // Until then, we pretend that it isn't cached.
+ LOG.warn("Genstamp in cache report disagrees with our genstamp for " +
+ block + ": expected genstamp " + blockInfo.getGenerationStamp());
+ continue;
+ }
+ Collection<DatanodeDescriptor> corruptReplicas =
+ blockManager.getCorruptReplicas(blockInfo);
+ if ((corruptReplicas != null) && corruptReplicas.contains(datanode)) {
+ // The NameNode will eventually remove or update the corrupt block.
+ // Until then, we pretend that it isn't cached.
+ LOG.warn("Ignoring cached replica on " + datanode + " of " + block +
+ " because it is corrupt.");
+ continue;
+ }
+ CachedBlock cachedBlock =
+ new CachedBlock(block.getBlockId(), (short)0, false);
+ CachedBlock prevCachedBlock = cachedBlocks.get(cachedBlock);
+ // Use the existing CachedBlock if it's present; otherwise,
+ // insert a new one.
+ if (prevCachedBlock != null) {
+ cachedBlock = prevCachedBlock;
+ } else {
+ cachedBlocks.put(cachedBlock);
+ }
+ if (!cachedBlock.isPresent(datanode.getCached())) {
+ datanode.getCached().add(cachedBlock);
+ }
+ if (cachedBlock.isPresent(datanode.getPendingCached())) {
+ datanode.getPendingCached().remove(cachedBlock);
+ }
+ }
+ }
/**
* Saves the current state of the CacheManager to the DataOutput. Used
@@ -508,7 +648,7 @@ public final class CacheManager {
* @param sdPath path of the storage directory
* @throws IOException
*/
- public synchronized void saveState(DataOutput out, String sdPath)
+ public void saveState(DataOutput out, String sdPath)
throws IOException {
out.writeLong(nextEntryId);
savePools(out, sdPath);
@@ -521,7 +661,8 @@ public final class CacheManager {
* @param in DataInput from which to restore state
* @throws IOException
*/
- public synchronized void loadState(DataInput in) throws IOException {
+ public void loadState(DataInput in) throws IOException {
+ assert namesystem.hasWriteLock();
nextEntryId = in.readLong();
// pools need to be loaded first since entries point to their parent pool
loadPools(in);
@@ -531,7 +672,7 @@ public final class CacheManager {
/**
* Save cache pools to fsimage
*/
- private synchronized void savePools(DataOutput out,
+ private void savePools(DataOutput out,
String sdPath) throws IOException {
StartupProgress prog = NameNode.getStartupProgress();
Step step = new Step(StepType.CACHE_POOLS, sdPath);
@@ -549,7 +690,7 @@ public final class CacheManager {
/*
* Save cache entries to fsimage
*/
- private synchronized void saveEntries(DataOutput out, String sdPath)
+ private void saveEntries(DataOutput out, String sdPath)
throws IOException {
StartupProgress prog = NameNode.getStartupProgress();
Step step = new Step(StepType.CACHE_ENTRIES, sdPath);
@@ -560,6 +701,7 @@ public final class CacheManager {
for (PathBasedCacheEntry entry: entriesById.values()) {
out.writeLong(entry.getEntryId());
Text.writeString(out, entry.getPath());
+ out.writeShort(entry.getReplication());
Text.writeString(out, entry.getPool().getPoolName());
counter.increment();
}
@@ -569,7 +711,7 @@ public final class CacheManager {
/**
* Load cache pools from fsimage
*/
- private synchronized void loadPools(DataInput in)
+ private void loadPools(DataInput in)
throws IOException {
StartupProgress prog = NameNode.getStartupProgress();
Step step = new Step(StepType.CACHE_POOLS);
@@ -578,8 +720,7 @@ public final class CacheManager {
prog.setTotal(Phase.LOADING_FSIMAGE, step, numberOfPools);
Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, step);
for (int i = 0; i < numberOfPools; i++) {
- CachePoolInfo info = CachePoolInfo.readFrom(in);
- unprotectedAddCachePool(info);
+ addCachePool(CachePoolInfo.readFrom(in));
counter.increment();
}
prog.endStep(Phase.LOADING_FSIMAGE, step);
@@ -588,7 +729,7 @@ public final class CacheManager {
/**
* Load cache entries from the fsimage
*/
- private synchronized void loadEntries(DataInput in) throws IOException {
+ private void loadEntries(DataInput in) throws IOException {
StartupProgress prog = NameNode.getStartupProgress();
Step step = new Step(StepType.CACHE_ENTRIES);
prog.beginStep(Phase.LOADING_FSIMAGE, step);
@@ -602,12 +743,24 @@ public final class CacheManager {
String poolName = Text.readString(in);
// Get pool reference by looking it up in the map
CachePool pool = cachePools.get(poolName);
+      if (pool == null) {
+ throw new IOException("Entry refers to pool " + poolName +
+ ", which does not exist.");
+ }
PathBasedCacheEntry entry =
- new PathBasedCacheEntry(entryId, path, replication, pool);
- unprotectedAddEntry(entry);
+ new PathBasedCacheEntry(entryId, path, replication, pool);
+ if (entriesById.put(entry.getEntryId(), entry) != null) {
+ throw new IOException("An entry with ID " + entry.getEntryId() +
+ " already exists");
+ }
+ List<PathBasedCacheEntry> entries = entriesByPath.get(entry.getPath());
+ if (entries == null) {
+ entries = new LinkedList<PathBasedCacheEntry>();
+ entriesByPath.put(entry.getPath(), entries);
+ }
+ entries.add(entry);
counter.increment();
}
prog.endStep(Phase.LOADING_FSIMAGE, step);
}
-
}
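
The lifecycle added above distinguishes "enabled" (the dfs.namenode.caching.enabled setting) from "active" (whether this NameNode currently tells datanodes what to cache): activate() starts the CacheReplicationMonitor only when both hold, and deactivate() closes it again. Below is a minimal, self-contained sketch of that gating pattern, assuming hypothetical stand-in names (SimpleCacheManager, Monitor) rather than the real HDFS classes:

import java.io.Closeable;

// Hypothetical stand-ins for CacheReplicationMonitor and CacheManager.
// Only the enabled/active gating pattern mirrors the patch above.
class Monitor extends Thread implements Closeable {
  private volatile boolean shutdown = false;
  private final long scanIntervalMs;

  Monitor(long scanIntervalMs) {
    this.scanIntervalMs = scanIntervalMs;
    setDaemon(true);
  }

  @Override
  public void run() {
    while (!shutdown) {
      // A real monitor would rescan directives and schedule (un)caching here.
      try {
        Thread.sleep(scanIntervalMs);
      } catch (InterruptedException e) {
        return; // close() interrupts us to force a prompt exit
      }
    }
  }

  @Override
  public void close() {
    shutdown = true;
    this.interrupt();
  }
}

public class SimpleCacheManager {
  private final boolean enabled;  // dfs.namenode.caching.enabled
  private boolean active = false; // true only while this NN is active
  private Monitor monitor;

  public SimpleCacheManager(boolean enabled) {
    this.enabled = enabled;
  }

  public synchronized void activate() {
    if (enabled && !active) { // cannot become active when caching is disabled
      active = true;
      monitor = new Monitor(300000L);
      monitor.start();
    }
  }

  public synchronized void deactivate() {
    if (active) {
      active = false;
      monitor.close();
      monitor = null;
    }
  }

  public static void main(String[] args) {
    SimpleCacheManager cm = new SimpleCacheManager(true);
    cm.activate();   // e.g. on transition to the active NN state
    cm.deactivate(); // e.g. on shutdown or transition to standby
  }
}

Keeping the monitor behind Closeable is what lets deactivate() shut it down with a single IOUtils.closeQuietly(monitor) call, as the patch does.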
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1532924&r1=1532923&r2=1532924&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Wed Oct 16 22:15:33 2013
@@ -53,6 +53,7 @@ import org.apache.hadoop.hdfs.protocol.F
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
@@ -1093,52 +1094,6 @@ public class FSDirectory implements Clos
}
/**
- * Set cache replication for a file
- *
- * @param src file name
- * @param replication new replication
- * @param blockRepls block replications - output parameter
- * @return array of file blocks
- * @throws QuotaExceededException
- * @throws SnapshotAccessControlException
- */
- Block[] setCacheReplication(String src, short replication, short[] blockRepls)
- throws QuotaExceededException, UnresolvedLinkException,
- SnapshotAccessControlException {
- waitForReady();
- writeLock();
- try {
- return unprotectedSetCacheReplication(src, replication, blockRepls);
- } finally {
- writeUnlock();
- }
- }
-
- Block[] unprotectedSetCacheReplication(String src, short replication,
- short[] blockRepls) throws QuotaExceededException,
- UnresolvedLinkException, SnapshotAccessControlException {
- assert hasWriteLock();
-
- final INodesInPath iip = rootDir.getINodesInPath4Write(src, true);
- final INode inode = iip.getLastINode();
- if (inode == null || !inode.isFile()) {
- return null;
- }
- INodeFile file = inode.asFile();
- final short oldBR = file.getCacheReplication();
-
- // TODO: Update quotas here as repl goes up or down
- file.setCacheReplication(replication);
- final short newBR = file.getCacheReplication();
-
- if (blockRepls != null) {
- blockRepls[0] = oldBR;
- blockRepls[1] = newBR;
- }
- return file.getBlocks();
- }
-
- /**
* @param path the file path
* @return the block size of the file.
*/
@@ -2638,12 +2593,19 @@ public class FSDirectory implements Clos
int childrenNum = node.isDirectory() ?
node.asDirectory().getChildrenNum(snapshot) : 0;
- return new HdfsLocatedFileStatus(size, node.isDirectory(), replication,
- blocksize, node.getModificationTime(snapshot),
- node.getAccessTime(snapshot), node.getFsPermission(snapshot),
- node.getUserName(snapshot), node.getGroupName(snapshot),
- node.isSymlink() ? node.asSymlink().getSymlink() : null, path,
- node.getId(), loc, childrenNum);
+ HdfsLocatedFileStatus status =
+ new HdfsLocatedFileStatus(size, node.isDirectory(), replication,
+ blocksize, node.getModificationTime(snapshot),
+ node.getAccessTime(snapshot), node.getFsPermission(snapshot),
+ node.getUserName(snapshot), node.getGroupName(snapshot),
+ node.isSymlink() ? node.asSymlink().getSymlink() : null, path,
+ node.getId(), loc, childrenNum);
+ // Set caching information for the located blocks.
+ CacheManager cacheManager = namesystem.getCacheManager();
+ for (LocatedBlock lb: loc.getLocatedBlocks()) {
+ cacheManager.setCachedLocations(lb);
+ }
+ return status;
}
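
The FSDirectory change above decorates every LocatedBlock in a file status with the datanodes that currently cache that block, so a client can prefer a replica served from memory. A simplified sketch of the decoration step, using hypothetical stand-in types (Located, CachedBlockIndex) instead of LocatedBlock and the cachedBlocks GSet:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-ins for LocatedBlock and the index of
// cached blocks; only the decoration step mirrors the patch above.
class Located {
  final long blockId;
  final List<String> locations = new ArrayList<>();       // all replicas
  final List<String> cachedLocations = new ArrayList<>(); // cached replicas
  Located(long blockId) { this.blockId = blockId; }
}

class CachedBlockIndex {
  private final Map<Long, List<String>> cached = new HashMap<>();

  void markCached(long blockId, String datanode) {
    cached.computeIfAbsent(blockId, k -> new ArrayList<>()).add(datanode);
  }

  // Mirrors CacheManager.setCachedLocations(LocatedBlock): a no-op when
  // the block is not cached anywhere, otherwise copy the cached datanodes
  // onto the block.
  void setCachedLocations(Located block) {
    List<String> dns = cached.get(block.blockId);
    if (dns == null) {
      return;
    }
    block.cachedLocations.addAll(dns);
  }
}

public class CachedLocationsDemo {
  public static void main(String[] args) {
    CachedBlockIndex index = new CachedBlockIndex();
    index.markCached(1001L, "dn1:50010");

    Located lb = new Located(1001L);
    lb.locations.add("dn1:50010");
    lb.locations.add("dn2:50010");

    index.setCachedLocations(lb);
    // A client can now prefer dn1, which serves this replica from memory.
    System.out.println("cached at: " + lb.cachedLocations);
  }
}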
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1532924&r1=1532923&r2=1532924&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Wed Oct 16 22:15:33 2013
@@ -648,8 +648,7 @@ public class FSEditLogLoader {
setPool(addOp.pool).
build();
PathBasedCacheDescriptor descriptor =
- fsNamesys.getCacheManager().unprotectedAddDirective(d);
-
+ fsNamesys.getCacheManager().addDirective(d, null);
if (toAddRetryCache) {
fsNamesys.addCacheEntryWithPayload(op.rpcClientId, op.rpcCallId,
descriptor);
@@ -659,8 +658,7 @@ public class FSEditLogLoader {
case OP_REMOVE_PATH_BASED_CACHE_DESCRIPTOR: {
RemovePathBasedCacheDescriptorOp removeOp =
(RemovePathBasedCacheDescriptorOp) op;
- fsNamesys.getCacheManager().unprotectedRemoveDescriptor(removeOp.id);
-
+ fsNamesys.getCacheManager().removeDescriptor(removeOp.id, null);
if (toAddRetryCache) {
fsNamesys.addCacheEntry(op.rpcClientId, op.rpcCallId);
}
@@ -668,8 +666,7 @@ public class FSEditLogLoader {
}
case OP_ADD_CACHE_POOL: {
AddCachePoolOp addOp = (AddCachePoolOp) op;
- fsNamesys.getCacheManager().unprotectedAddCachePool(addOp.info);
-
+ fsNamesys.getCacheManager().addCachePool(addOp.info);
if (toAddRetryCache) {
fsNamesys.addCacheEntry(op.rpcClientId, op.rpcCallId);
}
@@ -678,7 +675,6 @@ public class FSEditLogLoader {
case OP_MODIFY_CACHE_POOL: {
ModifyCachePoolOp modifyOp = (ModifyCachePoolOp) op;
fsNamesys.getCacheManager().modifyCachePool(modifyOp.info);
-
if (toAddRetryCache) {
fsNamesys.addCacheEntry(op.rpcClientId, op.rpcCallId);
}
@@ -687,7 +683,6 @@ public class FSEditLogLoader {
case OP_REMOVE_CACHE_POOL: {
RemoveCachePoolOp removeOp = (RemoveCachePoolOp) op;
fsNamesys.getCacheManager().removeCachePool(removeOp.poolName);
-
if (toAddRetryCache) {
fsNamesys.addCacheEntry(op.rpcClientId, op.rpcCallId);
}
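
The loader changes above retire the unprotected* replay variants: edit-log replay now calls the same public CacheManager methods as live RPCs, passing a null permission checker to skip enforcement. A sketch of that pattern under hypothetical names (PoolRegistry, PermissionChecker):

import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: one code path serves both live RPC calls and
// edit-log replay, as in the FSEditLogLoader change above.
public class PoolRegistry {
  public interface PermissionChecker {
    void checkSuperuser();
  }

  private final Map<String, String> pools = new HashMap<>();

  // pc == null means "called from edit-log replay": the operation was
  // already authorized when it was first logged, so skip the check.
  public void addPool(String name, PermissionChecker pc) {
    if (pc != null) {
      pc.checkSuperuser();
    }
    pools.put(name, name);
  }

  public static void main(String[] args) {
    PoolRegistry reg = new PoolRegistry();
    reg.addPool("pool1", () -> { /* superuser check passes */ });
    reg.addPool("pool2", null); // replay path: no permission check
    System.out.println(reg.pools.keySet());
  }
}

Using one code path for replay and RPC means validation logic cannot drift between the two, at the cost of the null-checker convention.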
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1532924&r1=1532923&r2=1532924&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Wed Oct 16 22:15:33 2013
@@ -374,7 +374,6 @@ public class FSNamesystem implements Nam
private final BlockManager blockManager;
private final SnapshotManager snapshotManager;
private final CacheManager cacheManager;
- private final CacheReplicationManager cacheReplicationManager;
private final DatanodeStatistics datanodeStatistics;
// Block pool ID used by this namenode
@@ -702,9 +701,12 @@ public class FSNamesystem implements Nam
this.dtSecretManager = createDelegationTokenSecretManager(conf);
this.dir = new FSDirectory(fsImage, this, conf);
this.snapshotManager = new SnapshotManager(dir);
- this.cacheManager = new CacheManager(this, dir, conf);
- this.cacheReplicationManager = new CacheReplicationManager(this,
- blockManager, blockManager.getDatanodeManager(), this, conf);
+ writeLock();
+ try {
+ this.cacheManager = new CacheManager(this, conf, blockManager);
+ } finally {
+ writeUnlock();
+ }
this.safeMode = new SafeModeInfo(conf);
this.auditLoggers = initAuditLoggers(conf);
this.isDefaultAuditLogger = auditLoggers.size() == 1 &&
@@ -881,7 +883,6 @@ public class FSNamesystem implements Nam
getCompleteBlocksTotal());
setBlockTotal();
blockManager.activate(conf);
- cacheReplicationManager.activate();
} finally {
writeUnlock();
}
@@ -898,7 +899,7 @@ public class FSNamesystem implements Nam
writeLock();
try {
if (blockManager != null) blockManager.close();
- if (cacheReplicationManager != null) cacheReplicationManager.close();
+ cacheManager.deactivate();
} finally {
writeUnlock();
}
@@ -930,8 +931,6 @@ public class FSNamesystem implements Nam
blockManager.clearQueues();
blockManager.processAllPendingDNMessages();
- cacheReplicationManager.clearQueues();
-
if (!isInSafeMode() ||
(isInSafeMode() && safeMode.isPopulatingReplQueues())) {
LOG.info("Reprocessing replication and invalidation queues");
@@ -964,6 +963,8 @@ public class FSNamesystem implements Nam
//ResourceMonitor required only at ActiveNN. See HDFS-2914
this.nnrmthread = new Daemon(new NameNodeResourceMonitor());
nnrmthread.start();
+ cacheManager.activate();
+ blockManager.getDatanodeManager().setSendCachingCommands(true);
} finally {
writeUnlock();
}
@@ -998,6 +999,8 @@ public class FSNamesystem implements Nam
// so that the tailer starts from the right spot.
dir.fsImage.updateLastAppliedTxIdFromWritten();
}
+ cacheManager.deactivate();
+ blockManager.getDatanodeManager().setSendCachingCommands(false);
} finally {
writeUnlock();
}
@@ -1442,10 +1445,6 @@ public class FSNamesystem implements Nam
blockManager.getDatanodeManager().sortLocatedBlocks(
clientMachine, lastBlockList);
}
- // Set caching information for the block list
- for (LocatedBlock lb: blocks.getLocatedBlocks()) {
- cacheReplicationManager.setCachedLocations(lb);
- }
}
return blocks;
}
@@ -1553,8 +1552,14 @@ public class FSNamesystem implements Nam
length = Math.min(length, fileSize - offset);
isUc = false;
}
- return blockManager.createLocatedBlocks(inode.getBlocks(), fileSize,
+ LocatedBlocks blocks =
+ blockManager.createLocatedBlocks(inode.getBlocks(), fileSize,
isUc, offset, length, needBlockToken, iip.isSnapshot());
+ // Set caching information for the located blocks.
+ for (LocatedBlock lb: blocks.getLocatedBlocks()) {
+ cacheManager.setCachedLocations(lb);
+ }
+ return blocks;
} finally {
if (isReadOp) {
readUnlock();
@@ -1928,42 +1933,6 @@ public class FSNamesystem implements Nam
return isFile;
}
- boolean setCacheReplicationInt(String src, final short replication)
- throws IOException {
- final boolean isFile;
- FSPermissionChecker pc = getPermissionChecker();
- checkOperation(OperationCategory.WRITE);
- byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
- writeLock();
- try {
- checkOperation(OperationCategory.WRITE);
- if (isInSafeMode()) {
- throw new SafeModeException("Cannot set replication for " + src, safeMode);
- }
- src = FSDirectory.resolvePath(src, pathComponents, dir);
- if (isPermissionEnabled) {
- checkPathAccess(pc, src, FsAction.WRITE);
- }
-
- final short[] blockRepls = new short[2]; // 0: old, 1: new
- final Block[] blocks = dir.setCacheReplication(src, replication,
- blockRepls);
- isFile = (blocks != null);
- if (isFile) {
- cacheReplicationManager.setCacheReplication(blockRepls[0],
- blockRepls[1], src, blocks);
- }
- } finally {
- writeUnlock();
- }
-
- getEditLog().logSync();
- if (isFile) {
- logAuditEvent(true, "setCacheReplication", src);
- }
- return isFile;
- }
-
long getPreferredBlockSize(String filename)
throws IOException, UnresolvedLinkException {
FSPermissionChecker pc = getPermissionChecker();
@@ -6506,10 +6475,6 @@ public class FSNamesystem implements Nam
public CacheManager getCacheManager() {
return cacheManager;
}
- /** @return the cache replication manager. */
- public CacheReplicationManager getCacheReplicationManager() {
- return cacheReplicationManager;
- }
@Override // NameNodeMXBean
public String getCorruptFiles() {
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java?rev=1532924&r1=1532923&r2=1532924&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java Wed Oct 16 22:15:33 2013
@@ -104,8 +104,6 @@ public class INodeFile extends INodeWith
private BlockInfo[] blocks;
- private short cacheReplication = 0;
-
INodeFile(long id, byte[] name, PermissionStatus permissions, long mtime, long atime,
BlockInfo[] blklist, short replication, long preferredBlockSize) {
super(id, name, permissions, mtime, atime);
@@ -201,18 +199,6 @@ public class INodeFile extends INodeWith
return nodeToUpdate;
}
- @Override
- public void setCacheReplication(short cacheReplication) {
- Preconditions.checkArgument(cacheReplication <= getBlockReplication(),
- "Cannot set cache replication higher than block replication factor");
- this.cacheReplication = cacheReplication;
- }
-
- @Override
- public short getCacheReplication() {
- return cacheReplication;
- }
-
/** @return preferred block size (in bytes) of the file. */
@Override
public long getPreferredBlockSize() {
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1532924&r1=1532923&r2=1532924&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Wed Oct 16 22:15:33 2013
@@ -689,8 +689,13 @@ public class NameNode implements NameNod
try {
initializeGenericKeys(conf, nsId, namenodeId);
initialize(conf);
- state.prepareToEnterState(haContext);
- state.enterState(haContext);
+ try {
+ haContext.writeLock();
+ state.prepareToEnterState(haContext);
+ state.enterState(haContext);
+ } finally {
+ haContext.writeUnlock();
+ }
} catch (IOException e) {
this.stop();
throw e;
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java?rev=1532924&r1=1532923&r2=1532924&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java Wed Oct 16 22:15:33 2013
@@ -997,9 +997,7 @@ class NameNodeRpcServer implements Namen
+ "from " + nodeReg + " " + blist.getNumberOfBlocks()
+ " blocks");
}
-
- namesystem.getCacheReplicationManager()
- .processCacheReport(nodeReg, poolId, blist);
+ namesystem.getCacheManager().processCacheReport(nodeReg, blist);
return null;
}
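
The RPC server now routes cache reports to CacheManager.processCacheReport, which rebuilds the reporting datanode's cached-block list and reconciles it with the global set of cached blocks. A simplified model of that reconciliation loop (hypothetical types; the real code also skips unfinalized, stale-genstamp, and corrupt replicas, and runs under the namesystem write lock):

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified model of processCacheReportImpl's core loop.
public class CacheReportDemo {
  // blockId -> datanodes caching it; stands in for the cachedBlocks GSet
  // plus the per-CachedBlock datanode lists.
  static final Map<Long, Set<String>> cachedBlocks = new HashMap<>();

  static void processCacheReport(String datanode, List<Long> reportedIds) {
    // Start from scratch for this datanode, like cached.clear() above.
    for (Set<String> dns : cachedBlocks.values()) {
      dns.remove(datanode);
    }
    for (long blockId : reportedIds) {
      // Reuse the existing entry if present; otherwise insert a new one.
      cachedBlocks.computeIfAbsent(blockId, k -> new HashSet<>())
          .add(datanode);
    }
  }

  public static void main(String[] args) {
    processCacheReport("dn1", Arrays.asList(1L, 2L));
    processCacheReport("dn1", Arrays.asList(2L, 3L)); // block 1 fell out
    System.out.println(cachedBlocks); // {1=[], 2=[dn1], 3=[dn1]}
  }
}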
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml?rev=1532924&r1=1532923&r2=1532924&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml Wed Oct 16 22:15:33 2013
@@ -1471,6 +1471,18 @@
</property>
<property>
+ <name>dfs.namenode.path.based.cache.refresh.interval.ms</name>
+ <value>300000</value>
+ <description>
+    The number of milliseconds between subsequent path cache rescans. Path
+    cache rescans calculate which blocks should be cached, and on which
+    datanodes.
+
+ By default, this parameter is set to 300000, which is five minutes.
+ </description>
+</property>
+
+<property>
<name>dfs.datanode.fsdatasetcache.max.threads.per.volume</name>
<value>4</value>
<description>
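
The new interval can also be overridden programmatically. A small sketch, assuming only the property key shown above (the 5000 ms value is merely illustrative, for faster rescans in tests):

import org.apache.hadoop.conf.Configuration;

public class CacheRefreshConfig {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Rescan path-based cache directives every 5 seconds instead of the
    // default 300000 ms (five minutes); handy for tests.
    conf.setLong("dfs.namenode.path.based.cache.refresh.interval.ms", 5000L);
    System.out.println(conf.getLong(
        "dfs.namenode.path.based.cache.refresh.interval.ms", 300000L));
  }
}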
Added: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCachedBlocksList.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCachedBlocksList.java?rev=1532924&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCachedBlocksList.java (added)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCachedBlocksList.java Wed Oct 16 22:15:33 2013
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList;
+import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestCachedBlocksList {
+ public static final Log LOG = LogFactory.getLog(TestCachedBlocksList.class);
+
+ @Test(timeout=60000)
+ public void testSingleList() {
+ DatanodeDescriptor dn = new DatanodeDescriptor(
+ new DatanodeID("127.0.0.1", "localhost", "abcd", 5000, 5001, 5002));
+ CachedBlock[] blocks = new CachedBlock[] {
+ new CachedBlock(0L, (short)1, true),
+ new CachedBlock(1L, (short)1, true),
+ new CachedBlock(2L, (short)1, true),
+ };
+ // check that lists are empty
+ Assert.assertTrue("expected pending cached list to start off empty.",
+ !dn.getPendingCached().iterator().hasNext());
+ Assert.assertTrue("expected cached list to start off empty.",
+ !dn.getCached().iterator().hasNext());
+ Assert.assertTrue("expected pending uncached list to start off empty.",
+ !dn.getPendingUncached().iterator().hasNext());
+ // add a block to the back
+ Assert.assertTrue(dn.getCached().add(blocks[0]));
+ Assert.assertTrue("expected pending cached list to still be empty.",
+ !dn.getPendingCached().iterator().hasNext());
+ Assert.assertEquals("failed to insert blocks[0]", blocks[0],
+ dn.getCached().iterator().next());
+ Assert.assertTrue("expected pending uncached list to still be empty.",
+ !dn.getPendingUncached().iterator().hasNext());
+ // add another block to the back
+ Assert.assertTrue(dn.getCached().add(blocks[1]));
+ Iterator<CachedBlock> iter = dn.getCached().iterator();
+ Assert.assertEquals(blocks[0], iter.next());
+ Assert.assertEquals(blocks[1], iter.next());
+ Assert.assertTrue(!iter.hasNext());
+ // add a block to the front
+ Assert.assertTrue(dn.getCached().addFirst(blocks[2]));
+ iter = dn.getCached().iterator();
+ Assert.assertEquals(blocks[2], iter.next());
+ Assert.assertEquals(blocks[0], iter.next());
+ Assert.assertEquals(blocks[1], iter.next());
+ Assert.assertTrue(!iter.hasNext());
+ // remove a block from the middle
+ Assert.assertTrue(dn.getCached().remove(blocks[0]));
+ iter = dn.getCached().iterator();
+ Assert.assertEquals(blocks[2], iter.next());
+ Assert.assertEquals(blocks[1], iter.next());
+ Assert.assertTrue(!iter.hasNext());
+ // remove all blocks
+ dn.getCached().clear();
+ Assert.assertTrue("expected cached list to be empty after clear.",
+        !dn.getCached().iterator().hasNext());
+ }
+
+ private void testAddElementsToList(CachedBlocksList list,
+ CachedBlock[] blocks) {
+ Assert.assertTrue("expected list to start off empty.",
+ !list.iterator().hasNext());
+ for (CachedBlock block : blocks) {
+ Assert.assertTrue(list.add(block));
+ }
+ }
+
+ private void testRemoveElementsFromList(Random r,
+ CachedBlocksList list, CachedBlock[] blocks) {
+ int i = 0;
+ for (Iterator<CachedBlock> iter = list.iterator(); iter.hasNext(); ) {
+ Assert.assertEquals(blocks[i], iter.next());
+ i++;
+ }
+ if (r.nextBoolean()) {
+ LOG.info("Removing via iterator");
+ for (Iterator<CachedBlock> iter = list.iterator(); iter.hasNext() ;) {
+ iter.next();
+ iter.remove();
+ }
+ } else {
+ LOG.info("Removing in pseudo-random order");
+ CachedBlock[] remainingBlocks = Arrays.copyOf(blocks, blocks.length);
+ for (int removed = 0; removed < remainingBlocks.length; ) {
+ int toRemove = r.nextInt(remainingBlocks.length);
+ if (remainingBlocks[toRemove] != null) {
+ Assert.assertTrue(list.remove(remainingBlocks[toRemove]));
+ remainingBlocks[toRemove] = null;
+ removed++;
+ }
+ }
+ }
+ Assert.assertTrue("expected list to be empty after everything " +
+ "was removed.", !list.iterator().hasNext());
+ }
+
+ @Test(timeout=60000)
+ public void testMultipleLists() {
+ DatanodeDescriptor[] datanodes = new DatanodeDescriptor[] {
+ new DatanodeDescriptor(
+ new DatanodeID("127.0.0.1", "localhost", "abcd", 5000, 5001, 5002)),
+ new DatanodeDescriptor(
+ new DatanodeID("127.0.1.1", "localhost", "efgh", 6000, 6001, 6002)),
+ };
+ CachedBlocksList[] lists = new CachedBlocksList[] {
+ datanodes[0].getPendingCached(),
+ datanodes[0].getCached(),
+ datanodes[1].getPendingCached(),
+ datanodes[1].getCached(),
+ datanodes[1].getPendingUncached(),
+ };
+ final int NUM_BLOCKS = 8000;
+ CachedBlock[] blocks = new CachedBlock[NUM_BLOCKS];
+ for (int i = 0; i < NUM_BLOCKS; i++) {
+ blocks[i] = new CachedBlock(i, (short)i, true);
+ }
+ Random r = new Random(654);
+ for (CachedBlocksList list : lists) {
+ testAddElementsToList(list, blocks);
+ }
+ for (CachedBlocksList list : lists) {
+ testRemoveElementsFromList(r, list, blocks);
+ }
+ }
+}
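
The operations exercised by this test (add, addFirst, remove, clear, iteration, and membership in several per-datanode lists at once) suggest why CachedBlocksList is built as an intrusive structure, in the same spirit as the LightWeightGSet used for cachedBlocks above: the element carries its own link state, so insertions and removals allocate nothing. A toy sketch of that intrusive-link idea, assuming hypothetical names (Node, IntrusiveList) and not the HDFS implementation:

// Toy intrusive list: each element embeds its own link, so add/remove
// allocate nothing. The real CachedBlocksList is more general: a
// CachedBlock can sit on several per-datanode lists at once.
class Node {
  final long blockId;
  Node next;              // intrusive link, owned by the containing list
  boolean onList = false; // guards against double insertion

  Node(long blockId) { this.blockId = blockId; }
}

class IntrusiveList {
  private Node head;

  boolean addFirst(Node n) {
    if (n.onList) {
      return false; // mirrors CachedBlocksList.add() returning false
    }
    n.onList = true;
    n.next = head;
    head = n;
    return true;
  }

  void printAll() {
    for (Node n = head; n != null; n = n.next) {
      System.out.println(n.blockId);
    }
  }
}

public class IntrusiveListDemo {
  public static void main(String[] args) {
    IntrusiveList list = new IntrusiveList();
    list.addFirst(new Node(2L));
    list.addFirst(new Node(1L));
    list.printAll(); // prints 1, then 2
  }
}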
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathBasedCacheRequests.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathBasedCacheRequests.java?rev=1532924&r1=1532923&r2=1532924&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathBasedCacheRequests.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPathBasedCacheRequests.java Wed Oct 16 22:15:33 2013
@@ -17,6 +17,12 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS;
import static junit.framework.Assert.assertTrue;
import static junit.framework.Assert.fail;
import static org.junit.Assert.assertEquals;
@@ -24,6 +30,10 @@ import static org.junit.Assert.assertFal
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
import junit.framework.Assert;
@@ -31,6 +41,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
@@ -46,13 +57,19 @@ import org.apache.hadoop.hdfs.protocol.P
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
import org.apache.hadoop.hdfs.protocol.RemovePathBasedCacheDescriptorException.InvalidIdException;
import org.apache.hadoop.hdfs.protocol.RemovePathBasedCacheDescriptorException.NoSuchIdException;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.GSet;
import org.junit.After;
+import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
+import com.google.common.base.Supplier;
+
public class TestPathBasedCacheRequests {
static final Log LOG = LogFactory.getLog(TestPathBasedCacheRequests.class);
@@ -83,7 +100,7 @@ public class TestPathBasedCacheRequests
}
}
- @Test
+ @Test(timeout=60000)
public void testBasicPoolOperations() throws Exception {
final String poolName = "pool1";
CachePoolInfo info = new CachePoolInfo(poolName).
@@ -218,7 +235,7 @@ public class TestPathBasedCacheRequests
dfs.addCachePool(info);
}
- @Test
+ @Test(timeout=60000)
public void testCreateAndModifyPools() throws Exception {
String poolName = "pool1";
String ownerName = "abc";
@@ -301,7 +318,7 @@ public class TestPathBasedCacheRequests
});
}
- @Test
+ @Test(timeout=60000)
public void testAddRemoveDirectives() throws Exception {
proto.addCachePool(new CachePoolInfo("pool1").
setMode(new FsPermission((short)0777)));
@@ -366,6 +383,7 @@ public class TestPathBasedCacheRequests
try {
addAsUnprivileged(new PathBasedCacheDirective.Builder().
setPath(new Path("/emptypoolname")).
+ setReplication((short)1).
setPool("").
build());
Assert.fail("expected an error when adding a PathBasedCache " +
@@ -424,4 +442,302 @@ public class TestPathBasedCacheRequests
iter = dfs.listPathBasedCacheDescriptors(null, null);
assertFalse(iter.hasNext());
}
+
+ @Test(timeout=60000)
+ public void testCacheManagerRestart() throws Exception {
+ HdfsConfiguration conf = createCachingConf();
+ MiniDFSCluster cluster =
+ new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
+
+ try {
+ cluster.waitActive();
+ DistributedFileSystem dfs = cluster.getFileSystem();
+
+ // Create and validate a pool
+ final String pool = "poolparty";
+ String groupName = "partygroup";
+ FsPermission mode = new FsPermission((short)0777);
+ int weight = 747;
+ dfs.addCachePool(new CachePoolInfo(pool)
+ .setGroupName(groupName)
+ .setMode(mode)
+ .setWeight(weight));
+ RemoteIterator<CachePoolInfo> pit = dfs.listCachePools();
+ assertTrue("No cache pools found", pit.hasNext());
+ CachePoolInfo info = pit.next();
+ assertEquals(pool, info.getPoolName());
+ assertEquals(groupName, info.getGroupName());
+ assertEquals(mode, info.getMode());
+ assertEquals(weight, (int)info.getWeight());
+ assertFalse("Unexpected # of cache pools found", pit.hasNext());
+
+ // Create some cache entries
+ int numEntries = 10;
+ String entryPrefix = "/party-";
+ for (int i=0; i<numEntries; i++) {
+ dfs.addPathBasedCacheDirective(
+ new PathBasedCacheDirective.Builder().
+ setPath(new Path(entryPrefix + i)).setPool(pool).build());
+ }
+ RemoteIterator<PathBasedCacheDescriptor> dit
+ = dfs.listPathBasedCacheDescriptors(null, null);
+ for (int i=0; i<numEntries; i++) {
+ assertTrue("Unexpected # of cache entries: " + i, dit.hasNext());
+ PathBasedCacheDescriptor cd = dit.next();
+ assertEquals(i+1, cd.getEntryId());
+ assertEquals(entryPrefix + i, cd.getPath().toUri().getPath());
+ assertEquals(pool, cd.getPool());
+ }
+ assertFalse("Unexpected # of cache descriptors found", dit.hasNext());
+
+ // Restart namenode
+ cluster.restartNameNode();
+
+ // Check that state came back up
+ pit = dfs.listCachePools();
+ assertTrue("No cache pools found", pit.hasNext());
+ info = pit.next();
+ assertEquals(pool, info.getPoolName());
+ assertEquals(groupName, info.getGroupName());
+ assertEquals(mode, info.getMode());
+ assertEquals(weight, (int)info.getWeight());
+ assertFalse("Unexpected # of cache pools found", pit.hasNext());
+
+ dit = dfs.listPathBasedCacheDescriptors(null, null);
+ for (int i=0; i<numEntries; i++) {
+ assertTrue("Unexpected # of cache entries: " + i, dit.hasNext());
+ PathBasedCacheDescriptor cd = dit.next();
+ assertEquals(i+1, cd.getEntryId());
+ assertEquals(entryPrefix + i, cd.getPath().toUri().getPath());
+ assertEquals(pool, cd.getPool());
+ }
+ assertFalse("Unexpected # of cache descriptors found", dit.hasNext());
+ } finally {
+ cluster.shutdown();
+ }
+ }
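
For readers skimming the patch, the pool and directive API that this test
exercises reduces to the sketch below. This is not part of the commit: it is
a minimal fragment, assuming a started DistributedFileSystem, that uses only
calls appearing elsewhere in this diff.

    // Hedged sketch: create a pool, cache one file, then uncache it.
    void cacheOneFile(DistributedFileSystem dfs) throws IOException {
      dfs.addCachePool(new CachePoolInfo("examplePool")
          .setMode(new FsPermission((short)0755))
          .setWeight(100));
      PathBasedCacheDescriptor desc = dfs.addPathBasedCacheDirective(
          new PathBasedCacheDirective.Builder().
              setPath(new Path("/example/file")).
              setReplication((short)1).
              setPool("examplePool").
              build());
      // The descriptor carries the server-assigned entry id, which is
      // what removal is keyed on.
      dfs.removePathBasedCacheDescriptor(desc);
    }
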
+
+ private static void waitForCachedBlocks(NameNode nn,
+ final int expectedCachedBlocks, final int expectedCachedReplicas)
+ throws Exception {
+ final FSNamesystem namesystem = nn.getNamesystem();
+ final CacheManager cacheManager = namesystem.getCacheManager();
+ LOG.info("Waiting for " + expectedCachedBlocks + " blocks with " +
+ expectedCachedReplicas + " replicas.");
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ int numCachedBlocks = 0, numCachedReplicas = 0;
+ namesystem.readLock();
+ try {
+ GSet<CachedBlock, CachedBlock> cachedBlocks =
+ cacheManager.getCachedBlocks();
+ if (cachedBlocks != null) {
+ for (Iterator<CachedBlock> iter = cachedBlocks.iterator();
+ iter.hasNext(); ) {
+ CachedBlock cachedBlock = iter.next();
+ numCachedBlocks++;
+ numCachedReplicas += cachedBlock.getDatanodes(Type.CACHED).size();
+ }
+ }
+ } finally {
+ namesystem.readUnlock();
+ }
+ if ((numCachedBlocks == expectedCachedBlocks) &&
+ (numCachedReplicas == expectedCachedReplicas)) {
+ return true;
+ } else {
+ LOG.info("cached blocks: have " + numCachedBlocks +
+ " / " + expectedCachedBlocks);
+ LOG.info("cached replicas: have " + numCachedReplicas +
+ " / " + expectedCachedReplicas);
+ return false;
+ }
+ }
+ }, 500, 60000);
+ }
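
The helper above follows the poll-until-true idiom of
GenericTestUtils.waitFor: re-evaluate a Supplier every 500 ms and fail the
test if it has not returned true within 60 seconds. A minimal hedged sketch
of the bare idiom, with a hypothetical predicate standing in for the cache
check:

    // Poll every 500 ms; time out (and fail) after 60 s.
    GenericTestUtils.waitFor(new Supplier<Boolean>() {
      @Override
      public Boolean get() {
        return conditionHolds(); // hypothetical predicate
      }
    }, 500, 60000);
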
+
+ private static final long BLOCK_SIZE = 512;
+ private static final int NUM_DATANODES = 4;
+
+  // Most Linux installs allow non-root users to mlock 64 KiB of memory,
+  // so split that limit evenly across the datanodes.
+  private static final long CACHE_CAPACITY = 64 * 1024 / NUM_DATANODES;
+
+ /**
+ * Return true if we can test DN caching.
+ */
+ private static boolean canTestDatanodeCaching() {
+ if (!NativeIO.isAvailable()) {
+ // Need NativeIO in order to cache blocks on the DN.
+ return false;
+ }
+ if (NativeIO.getMemlockLimit() < CACHE_CAPACITY) {
+ return false;
+ }
+ return true;
+ }
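
Tests that actually lock pages on the datanode consume this guard through
JUnit's Assume, which skips rather than fails on hosts that cannot lock
enough memory. A hedged sketch of the pattern, not part of the commit:

    @Test(timeout=60000)
    public void testThatNeedsDatanodeCaching() throws Exception {
      // Skip on hosts without NativeIO or with a memlock limit below
      // CACHE_CAPACITY.
      Assume.assumeTrue(canTestDatanodeCaching());
      // ... body that expects blocks to be cached on datanodes ...
    }
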
+
+  private static HdfsConfiguration createCachingConf() {
+    HdfsConfiguration conf = new HdfsConfiguration();
+    // Use a small block size so short files span several blocks.
+    conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+    // Per-datanode mlock budget available for caching.
+    conf.setLong(DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, CACHE_CAPACITY);
+    // Fast heartbeats, cache reports, and directive rescans keep the
+    // test latency low.
+    conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1);
+    conf.setBoolean(DFS_NAMENODE_CACHING_ENABLED_KEY, true);
+    conf.setLong(DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 1000);
+    conf.setLong(DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, 1000);
+    return conf;
+  }
+
+ @Test(timeout=120000)
+ public void testWaitForCachedReplicas() throws Exception {
+ Assume.assumeTrue(canTestDatanodeCaching());
+ HdfsConfiguration conf = createCachingConf();
+ FileSystemTestHelper helper = new FileSystemTestHelper();
+ MiniDFSCluster cluster =
+ new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
+
+ try {
+ cluster.waitActive();
+ DistributedFileSystem dfs = cluster.getFileSystem();
+ NameNode namenode = cluster.getNameNode();
+ NamenodeProtocols nnRpc = namenode.getRpcServer();
+ Path rootDir = helper.getDefaultWorkingDirectory(dfs);
+ // Create the pool
+ final String pool = "friendlyPool";
+      nnRpc.addCachePool(new CachePoolInfo(pool));
+ // Create some test files
+ final int numFiles = 2;
+ final int numBlocksPerFile = 2;
+ final List<String> paths = new ArrayList<String>(numFiles);
+ for (int i=0; i<numFiles; i++) {
+ Path p = new Path(rootDir, "testCachePaths-" + i);
+ FileSystemTestHelper.createFile(dfs, p, numBlocksPerFile,
+ (int)BLOCK_SIZE);
+ paths.add(p.toUri().getPath());
+ }
+ // Check the initial statistics at the namenode
+ waitForCachedBlocks(namenode, 0, 0);
+ // Cache and check each path in sequence
+ int expected = 0;
+ for (int i=0; i<numFiles; i++) {
+ PathBasedCacheDirective directive =
+ new PathBasedCacheDirective.Builder().
+ setPath(new Path(paths.get(i))).
+ setPool(pool).
+ build();
+ PathBasedCacheDescriptor descriptor =
+ nnRpc.addPathBasedCacheDirective(directive);
+ assertEquals("Descriptor does not match requested path",
+ new Path(paths.get(i)), descriptor.getPath());
+ assertEquals("Descriptor does not match requested pool", pool,
+ descriptor.getPool());
+ expected += numBlocksPerFile;
+ waitForCachedBlocks(namenode, expected, expected);
+ }
+ // Uncache and check each path in sequence
+ RemoteIterator<PathBasedCacheDescriptor> entries =
+ nnRpc.listPathBasedCacheDescriptors(0, null, null);
+ for (int i=0; i<numFiles; i++) {
+ PathBasedCacheDescriptor descriptor = entries.next();
+ nnRpc.removePathBasedCacheDescriptor(descriptor.getEntryId());
+ expected -= numBlocksPerFile;
+ waitForCachedBlocks(namenode, expected, expected);
+ }
+ } finally {
+ cluster.shutdown();
+ }
+ }
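
For completeness, a hedged sketch of draining the descriptor listing through
the same RPC used above. This is not part of the commit; the first argument
is assumed to be the entry id to resume listing after, and the nulls mean no
pool or path filter.

    RemoteIterator<PathBasedCacheDescriptor> it =
        nnRpc.listPathBasedCacheDescriptors(0, null, null);
    while (it.hasNext()) {
      PathBasedCacheDescriptor d = it.next();
      LOG.info("entry " + d.getEntryId() + ": " + d.getPath() +
          " (pool " + d.getPool() + ")");
    }
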
+
+ @Test(timeout=120000)
+ public void testAddingPathBasedCacheDirectivesWhenCachingIsDisabled()
+ throws Exception {
+ HdfsConfiguration conf = createCachingConf();
+ conf.setBoolean(DFS_NAMENODE_CACHING_ENABLED_KEY, false);
+ MiniDFSCluster cluster =
+ new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
+
+ try {
+ cluster.waitActive();
+ DistributedFileSystem dfs = cluster.getFileSystem();
+ NameNode namenode = cluster.getNameNode();
+ // Create the pool
+ String pool = "pool1";
+ namenode.getRpcServer().addCachePool(new CachePoolInfo(pool));
+ // Create some test files
+ final int numFiles = 2;
+ final int numBlocksPerFile = 2;
+ final List<String> paths = new ArrayList<String>(numFiles);
+ for (int i=0; i<numFiles; i++) {
+ Path p = new Path("/testCachePaths-" + i);
+ FileSystemTestHelper.createFile(dfs, p, numBlocksPerFile,
+ (int)BLOCK_SIZE);
+ paths.add(p.toUri().getPath());
+ }
+ // Check the initial statistics at the namenode
+ waitForCachedBlocks(namenode, 0, 0);
+ // Cache and check each path in sequence
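+      // Caching is disabled, so the expected cached-block count stays
+      // at zero throughout.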
+ int expected = 0;
+ for (int i=0; i<numFiles; i++) {
+ PathBasedCacheDirective directive =
+ new PathBasedCacheDirective.Builder().
+ setPath(new Path(paths.get(i))).
+ setPool(pool).
+ build();
+ dfs.addPathBasedCacheDirective(directive);
+ waitForCachedBlocks(namenode, expected, 0);
+ }
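+      // Sleep well past the rescan interval so that a buggy
+      // implementation would have had time to cache something.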
+ Thread.sleep(20000);
+ waitForCachedBlocks(namenode, expected, 0);
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
+ @Test(timeout=120000)
+ public void testWaitForCachedReplicasInDirectory() throws Exception {
+ Assume.assumeTrue(canTestDatanodeCaching());
+ HdfsConfiguration conf = createCachingConf();
+ MiniDFSCluster cluster =
+ new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
+
+ try {
+ cluster.waitActive();
+ DistributedFileSystem dfs = cluster.getFileSystem();
+ NameNode namenode = cluster.getNameNode();
+ // Create the pool
+ final String pool = "friendlyPool";
+ dfs.addCachePool(new CachePoolInfo(pool));
+ // Create some test files
+ final List<Path> paths = new LinkedList<Path>();
+ paths.add(new Path("/foo/bar"));
+ paths.add(new Path("/foo/baz"));
+ paths.add(new Path("/foo2/bar2"));
+ paths.add(new Path("/foo2/baz2"));
+ dfs.mkdir(new Path("/foo"), FsPermission.getDirDefault());
+ dfs.mkdir(new Path("/foo2"), FsPermission.getDirDefault());
+ final int numBlocksPerFile = 2;
+ for (Path path : paths) {
+ FileSystemTestHelper.createFile(dfs, path, numBlocksPerFile,
+ (int)BLOCK_SIZE, (short)3, false);
+ }
+ waitForCachedBlocks(namenode, 0, 0);
+ // cache entire directory
+ PathBasedCacheDescriptor descriptor = dfs.addPathBasedCacheDirective(
+ new PathBasedCacheDirective.Builder().
+ setPath(new Path("/foo")).
+ setReplication((short)2).
+ setPool(pool).
+ build());
+ assertEquals("Descriptor does not match requested pool", pool,
+ descriptor.getPool());
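+      // 2 files under /foo x 2 blocks each = 4 blocks; with cache
+      // replication 2 that is 8 cached replicas.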
+ waitForCachedBlocks(namenode, 4, 8);
+ // remove and watch numCached go to 0
+ dfs.removePathBasedCacheDescriptor(descriptor);
+ waitForCachedBlocks(namenode, 0, 0);
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
}