Posted to hdfs-commits@hadoop.apache.org by wa...@apache.org on 2013/09/14 01:27:23 UTC
svn commit: r1523145 [2/2] - in
/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/org/apache/hadoop/hdfs/
src/main/java/org/apache/hadoop/hdfs/protocolPB/
src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ sr...
Added: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReportProcessor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReportProcessor.java?rev=1523145&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReportProcessor.java (added)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReportProcessor.java Fri Sep 13 23:27:22 2013
@@ -0,0 +1,271 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs.BlockReportIterator;
+import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Handles common operations of processing a block report from a datanode,
+ * generating a diff of updates to the BlocksMap, and then feeding the diff
+ * to the subclass-implemented hooks.
+ */
+@InterfaceAudience.LimitedPrivate({"HDFS"})
+public abstract class ReportProcessor {
+
+ static final Log blockLog = NameNode.blockStateChangeLog;
+ private final String className = getClass().getSimpleName();
+ // Max number of blocks to log info about during a block report.
+ final long maxNumBlocksToLog;
+
+ void blockLogDebug(String message) {
+ if (blockLog.isDebugEnabled()) {
+ blockLog.info("BLOCK* " + className + message);
+ }
+ }
+
+ void blockLogInfo(String message) {
+ if (blockLog.isInfoEnabled()) {
+ blockLog.info("BLOCK* " + className + message);
+ }
+ }
+
+ void blockLogWarn(String message) {
+ blockLog.warn("BLOCK* " + className + message);
+ }
+
+ void logAddStoredBlock(BlockInfo storedBlock, DatanodeDescriptor node) {
+ if (!blockLog.isInfoEnabled()) {
+ return;
+ }
+ StringBuilder sb = new StringBuilder(500);
+ sb.append("BLOCK* " + className + "#addStoredBlock: blockMap updated: ")
+ .append(node)
+ .append(" is added to ");
+ storedBlock.appendStringTo(sb);
+ sb.append(" size " )
+ .append(storedBlock.getNumBytes());
+ blockLog.info(sb);
+ }
+
+ public ReportProcessor(Configuration conf) {
+ this.maxNumBlocksToLog = conf.getLong(
+ DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_KEY,
+ DFSConfigKeys.DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT);
+ }
+
+ /**
+ * Processes a block report from a datanode, updating the block to
+ * datanode mapping, adding new blocks and removing invalid ones.
+ * Also computes and queues new replication and invalidation work.
+ * @param node Datanode sending the block report
+ * @param report the block report, as a list of longs
+ * @throws IOException
+ */
+ final void processReport(final DatanodeDescriptor node,
+ final BlockListAsLongs report) throws IOException {
+ // Normal case:
+ // Modify the (block-->datanode) map, according to the difference
+ // between the old and new block report.
+ //
+ Collection<BlockInfo> toAdd = new LinkedList<BlockInfo>();
+ Collection<Block> toRemove = new LinkedList<Block>();
+ Collection<Block> toInvalidate = new LinkedList<Block>();
+ Collection<BlockToMarkCorrupt> toCorrupt = new LinkedList<BlockToMarkCorrupt>();
+ Collection<StatefulBlockInfo> toUC = new LinkedList<StatefulBlockInfo>();
+ reportDiff(node, report, toAdd, toRemove, toInvalidate, toCorrupt, toUC);
+
+ // Process the blocks on each queue
+ for (StatefulBlockInfo b : toUC) {
+ addStoredBlockUnderConstruction(b.storedBlock, node, b.reportedState);
+ }
+ for (Block b : toRemove) {
+ removeStoredBlock(b, node);
+ }
+ int numBlocksLogged = 0;
+ for (BlockInfo b : toAdd) {
+ addStoredBlock(b, node, null, numBlocksLogged < maxNumBlocksToLog);
+ numBlocksLogged++;
+ }
+
+ if (numBlocksLogged > maxNumBlocksToLog) {
+ blockLogInfo("#processReport: logged"
+ + " info for " + maxNumBlocksToLog
+ + " of " + numBlocksLogged + " reported.");
+ }
+ for (Block b : toInvalidate) {
+ blockLogInfo("#processReport: "
+ + b + " on " + node + " size " + b.getNumBytes()
+ + " does not belong to any file");
+ addToInvalidates(b, node);
+ }
+ for (BlockToMarkCorrupt b : toCorrupt) {
+ markBlockAsCorrupt(b, node);
+ }
+ }
+
+ /**
+ * Compute the difference between the current state of the datanode in the
+ * BlocksMap and the new reported state, categorizing changes into
+ * different groups (e.g. new blocks to be added, blocks that were removed,
+ * blocks that should be invalidated, etc.).
+ */
+ private void reportDiff(DatanodeDescriptor dn,
+ BlockListAsLongs newReport,
+ Collection<BlockInfo> toAdd, // add to DatanodeDescriptor
+ Collection<Block> toRemove, // remove from DatanodeDescriptor
+ Collection<Block> toInvalidate, // should be removed from DN
+ Collection<BlockToMarkCorrupt> toCorrupt, // add to corrupt replicas list
+ Collection<StatefulBlockInfo> toUC) { // add to under-construction list
+ // place a delimiter in the list which separates blocks
+ // that have been reported from those that have not
+ BlockInfo delimiter = new BlockInfo(new Block(), 1);
+ boolean added = addBlock(dn, delimiter);
+ assert added : "Delimiting block cannot be present in the node";
+ int headIndex = 0; //currently the delimiter is in the head of the list
+ int curIndex;
+
+ if (newReport == null) {
+ newReport = new BlockListAsLongs();
+ }
+ // scan the report and process newly reported blocks
+ BlockReportIterator itBR = newReport.getBlockReportIterator();
+ while (itBR.hasNext()) {
+ Block iblk = itBR.next();
+ ReplicaState iState = itBR.getCurrentReplicaState();
+ BlockInfo storedBlock = processReportedBlock(dn, iblk, iState,
+ toAdd, toInvalidate, toCorrupt, toUC);
+ // move block to the head of the list
+ if (storedBlock != null && (curIndex = storedBlock.findDatanode(dn)) >= 0) {
+ headIndex = moveBlockToHead(dn, storedBlock, curIndex, headIndex);
+ }
+ }
+ // collect blocks that have not been reported
+ // all of them are next to the delimiter
+ Iterator<? extends Block> it = new DatanodeDescriptor.BlockIterator(
+ delimiter.getNext(0), dn);
+ while (it.hasNext()) {
+ toRemove.add(it.next());
+ }
+ removeBlock(dn, delimiter);
+ }
+
+ // Operations on the blocks on a datanode
+
+ abstract int moveBlockToHead(DatanodeDescriptor dn, BlockInfo storedBlock,
+ int curIndex, int headIndex);
+
+ abstract boolean addBlock(DatanodeDescriptor dn, BlockInfo block);
+
+ abstract boolean removeBlock(DatanodeDescriptor dn, BlockInfo block);
+
+ // Reported block processing
+
+ abstract BlockInfo processReportedBlock(DatanodeDescriptor dn, Block iblk,
+ ReplicaState iState, Collection<BlockInfo> toAdd,
+ Collection<Block> toInvalidate, Collection<BlockToMarkCorrupt> toCorrupt,
+ Collection<StatefulBlockInfo> toUC);
+
+ // Hooks for processing the report diff
+
+ abstract Block addStoredBlock(final BlockInfo block,
+ DatanodeDescriptor node, DatanodeDescriptor delNodeHint,
+ boolean logEveryBlock) throws IOException;
+
+ abstract void removeStoredBlock(Block block, DatanodeDescriptor node);
+
+ abstract void markBlockAsCorrupt(BlockToMarkCorrupt b, DatanodeInfo dn)
+ throws IOException;
+
+ abstract void addToInvalidates(final Block b, final DatanodeInfo node);
+
+ abstract void addStoredBlockUnderConstruction(
+ BlockInfoUnderConstruction storedBlock, DatanodeDescriptor node,
+ ReplicaState reportedState) throws IOException;
+
+ /**
+ * BlockToMarkCorrupt is used to build the "toCorrupt" list, which is a
+ * list of blocks that should be considered corrupt due to a block report.
+ */
+ static class BlockToMarkCorrupt {
+ /** The corrupted block in a datanode. */
+ final BlockInfo corrupted;
+ /** The corresponding block stored in the BlockManager. */
+ final BlockInfo stored;
+ /** The reason to mark corrupt. */
+ final String reason;
+
+ BlockToMarkCorrupt(BlockInfo corrupted, BlockInfo stored, String reason) {
+ Preconditions.checkNotNull(corrupted, "corrupted is null");
+ Preconditions.checkNotNull(stored, "stored is null");
+
+ this.corrupted = corrupted;
+ this.stored = stored;
+ this.reason = reason;
+ }
+
+ BlockToMarkCorrupt(BlockInfo stored, String reason) {
+ this(stored, stored, reason);
+ }
+
+ BlockToMarkCorrupt(BlockInfo stored, long gs, String reason) {
+ this(new BlockInfo(stored), stored, reason);
+ //the corrupted block in datanode has a different generation stamp
+ corrupted.setGenerationStamp(gs);
+ }
+
+ @Override
+ public String toString() {
+ return corrupted + "("
+ + (corrupted == stored? "same as stored": "stored=" + stored) + ")";
+ }
+ }
+
+ /**
+ * StatefulBlockInfo is used to build the "toUC" list, which is a list of
+ * updates to the information about under-construction blocks.
+ * Besides the block in question, it provides the ReplicaState
+ * reported by the datanode in the block report.
+ */
+ static class StatefulBlockInfo {
+ final BlockInfoUnderConstruction storedBlock;
+ final ReplicaState reportedState;
+
+ StatefulBlockInfo(BlockInfoUnderConstruction storedBlock,
+ ReplicaState reportedState) {
+ this.storedBlock = storedBlock;
+ this.reportedState = reportedState;
+ }
+ }
+
+}
Propchange: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/ReportProcessor.java
------------------------------------------------------------------------------
svn:eol-style = native
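
The reportDiff() above relies on a delimiter trick: a dummy block is inserted at the head of the datanode's block list, every reported block is moved in front of it, and whatever still trails the delimiter after the scan was not reported and is collected into toRemove. To make the hook contract concrete, the following is a compilable no-op skeleton of a subclass. It is a sketch only, with an invented class name, and it assumes DatanodeDescriptor exposes addBlock/removeBlock as BlockManager's implementation does; the real subclasses on this branch are BlockManager and CacheReplicationManager.

    package org.apache.hadoop.hdfs.server.blockmanagement;

    import java.util.Collection;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.protocol.Block;
    import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
    import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;

    /** Sketch only: a minimal concrete ReportProcessor, not part of this commit. */
    class NoOpReportProcessor extends ReportProcessor {

      NoOpReportProcessor(Configuration conf) {
        super(conf);
      }

      @Override
      int moveBlockToHead(DatanodeDescriptor dn, BlockInfo storedBlock,
          int curIndex, int headIndex) {
        return headIndex; // keep the per-datanode list order unchanged
      }

      @Override
      boolean addBlock(DatanodeDescriptor dn, BlockInfo block) {
        // Must really insert: reportDiff() adds and removes its delimiter here.
        return dn.addBlock(block);
      }

      @Override
      boolean removeBlock(DatanodeDescriptor dn, BlockInfo block) {
        return dn.removeBlock(block);
      }

      @Override
      BlockInfo processReportedBlock(DatanodeDescriptor dn, Block iblk,
          ReplicaState iState, Collection<BlockInfo> toAdd,
          Collection<Block> toInvalidate, Collection<BlockToMarkCorrupt> toCorrupt,
          Collection<StatefulBlockInfo> toUC) {
        // Categorize nothing: every stored block then stays behind the
        // delimiter and is collected into toRemove by reportDiff().
        return null;
      }

      @Override
      Block addStoredBlock(BlockInfo block, DatanodeDescriptor node,
          DatanodeDescriptor delNodeHint, boolean logEveryBlock) {
        return block;
      }

      @Override
      void removeStoredBlock(Block block, DatanodeDescriptor node) {
      }

      @Override
      void markBlockAsCorrupt(BlockToMarkCorrupt b, DatanodeInfo dn) {
      }

      @Override
      void addToInvalidates(Block b, DatanodeInfo node) {
      }

      @Override
      void addStoredBlockUnderConstruction(BlockInfoUnderConstruction storedBlock,
          DatanodeDescriptor node, ReplicaState reportedState) {
      }
    }
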
Added: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UncacheBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UncacheBlocks.java?rev=1523145&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UncacheBlocks.java (added)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UncacheBlocks.java Fri Sep 13 23:27:22 2013
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import java.util.List;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.Block;
+
+/**
+ * Subclass of InvalidateBlocks used by the CacheReplicationManager to
+ * track blocks on each storage that are scheduled to be uncached.
+ */
+@InterfaceAudience.Private
+public class UncacheBlocks extends InvalidateBlocks {
+
+ UncacheBlocks() {
+ }
+
+ @Override
+ synchronized List<Block> invalidateWork(
+ final String storageId, final DatanodeDescriptor dn) {
+ final List<Block> toInvalidate = pollNumBlocks(storageId, Integer.MAX_VALUE);
+ if (toInvalidate != null) {
+ dn.addBlocksToBeUncached(toInvalidate);
+ }
+ return toInvalidate;
+ }
+}
Propchange: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/UncacheBlocks.java
------------------------------------------------------------------------------
svn:eol-style = native
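
UncacheBlocks reuses the InvalidateBlocks bookkeeping but hands the polled blocks to addBlocksToBeUncached instead of the invalidation path, so the blocks are scheduled for uncaching (cf. the DNA_UNCACHE handling in BPOfferService below). A sketch of draining it from a hypothetical caller in the same package; the null return for an empty queue is inferred from the override above:

    package org.apache.hadoop.hdfs.server.blockmanagement;

    import java.util.Collections;
    import java.util.List;

    import org.apache.hadoop.hdfs.protocol.Block;

    /** Sketch only: invented helper, not part of this commit. */
    class UncacheWorkDrainer {
      /** Drain all pending uncache work for one datanode. */
      static List<Block> drainUncacheWork(UncacheBlocks queue, String storageId,
          DatanodeDescriptor dn) {
        List<Block> work = queue.invalidateWork(storageId, dn);
        // invalidateWork returns null when nothing is queued for this storage.
        return work != null ? work : Collections.<Block>emptyList();
      }
    }
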
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java?rev=1523145&r1=1523144&r2=1523145&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java Fri Sep 13 23:27:22 2013
@@ -552,10 +552,12 @@ class BPOfferService {
case DatanodeProtocol.DNA_CACHE:
LOG.info("DatanodeCommand action: DNA_CACHE");
dn.getFSDataset().cache(bcmd.getBlockPoolId(), bcmd.getBlocks());
+ dn.metrics.incrBlocksCached(bcmd.getBlocks().length);
break;
case DatanodeProtocol.DNA_UNCACHE:
LOG.info("DatanodeCommand action: DNA_UNCACHE");
dn.getFSDataset().uncache(bcmd.getBlockPoolId(), bcmd.getBlocks());
+ dn.metrics.incrBlocksUncached(bcmd.getBlocks().length);
break;
case DatanodeProtocol.DNA_SHUTDOWN:
// TODO: DNA_SHUTDOWN appears to be unused - the NN never sends this command
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java?rev=1523145&r1=1523144&r2=1523145&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java Fri Sep 13 23:27:22 2013
@@ -449,11 +449,24 @@ class BPServiceActor implements Runnable
DatanodeCommand cmd = null;
long startTime = Time.monotonicNow();
if (startTime - lastCacheReport > dnConf.cacheReportInterval) {
- // TODO: Implement me!
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Sending cacheReport from service actor: " + this);
+ }
+ lastCacheReport = startTime;
+
String bpid = bpos.getBlockPoolId();
BlockListAsLongs blocks = dn.getFSDataset().getCacheReport(bpid);
+ long createTime = Time.monotonicNow();
+
cmd = bpNamenode.cacheReport(bpRegistration, bpid,
blocks.getBlockListAsLongs());
+ long sendTime = Time.monotonicNow();
+ long createCost = createTime - startTime;
+ long sendCost = sendTime - createTime;
+ dn.getMetrics().addCacheReport(sendCost);
+ LOG.info("CacheReport of " + blocks.getNumberOfBlocks()
+ + " blocks took " + createCost + " msec to generate and "
+ + sendCost + " msecs for RPC and NN processing");
}
return cmd;
}
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java?rev=1523145&r1=1523144&r2=1523145&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java Fri Sep 13 23:27:22 2013
@@ -23,6 +23,8 @@ import static org.apache.hadoop.hdfs.DFS
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
@@ -114,9 +116,9 @@ public class DNConf {
DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME,
DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
this.blockReportInterval = conf.getLong(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
- DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT);
- this.cacheReportInterval = conf.getLong(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT);
+ this.cacheReportInterval = conf.getLong(DFS_CACHEREPORT_INTERVAL_MSEC_KEY,
+ DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT);
long initBRDelay = conf.getLong(
DFS_BLOCKREPORT_INITIAL_DELAY_KEY,
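
The second hunk above also fixes a copy-and-paste bug: cacheReportInterval was previously read from the block report key, so cache reports could not be tuned independently of block reports. After the fix it is driven by its own key. A minimal sketch of setting it (class name and the 10-second value are invented for illustration):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class CacheReportIntervalExample {
      public static void main(String[] args) {
        Configuration conf = new HdfsConfiguration();
        // Tune cache reports independently of block reports.
        conf.setLong(DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 10 * 1000L);
        System.out.println("cache report interval (msec) = "
            + conf.getLong(DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY, -1));
      }
    }
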
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java?rev=1523145&r1=1523144&r2=1523145&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java Fri Sep 13 23:27:22 2013
@@ -105,10 +105,10 @@ public class FsDatasetCache {
*/
List<Block> getCachedBlocks(String bpid) {
List<Block> blocks = new ArrayList<Block>();
- MappableBlock mapBlock = null;
// ConcurrentHashMap iteration doesn't see latest updates, which is okay
- for (Iterator<MappableBlock> it = cachedBlocks.values().iterator();
- it.hasNext(); mapBlock = it.next()) {
+ Iterator<MappableBlock> it = cachedBlocks.values().iterator();
+ while (it.hasNext()) {
+ MappableBlock mapBlock = it.next();
if (mapBlock.getBlockPoolId().equals(bpid)) {
blocks.add(mapBlock.getBlock());
}
@@ -174,12 +174,15 @@ public class FsDatasetCache {
mapBlock.getBlockPoolId().equals(bpid) &&
mapBlock.getBlock().equals(block)) {
mapBlock.close();
- cachedBlocks.remove(mapBlock);
+ cachedBlocks.remove(block.getBlockId());
long bytes = mapBlock.getNumBytes();
long used = usedBytes.get();
while (!usedBytes.compareAndSet(used, used - bytes)) {
used = usedBytes.get();
}
+ LOG.info("Successfully uncached block " + block);
+ } else {
+ LOG.info("Could not uncache block " + block + ": unknown block.");
}
}
@@ -219,6 +222,7 @@ public class FsDatasetCache {
used = usedBytes.get();
}
} else {
+ LOG.info("Successfully cached block " + block.getBlock());
cachedBlocks.put(block.getBlock().getBlockId(), block);
}
}
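
The usedBytes accounting above decrements an AtomicLong with a lock-free compare-and-set retry loop rather than a mutex. The same idiom in isolation (a sketch; the class name is invented, and AtomicLong.addAndGet(-bytes) is an equivalent one-liner):

    import java.util.concurrent.atomic.AtomicLong;

    /** Sketch only: invented class, mirrors the usedBytes accounting above. */
    class CacheUsageCounter {
      private final AtomicLong usedBytes = new AtomicLong(0);

      /** Subtract bytes, retrying until no concurrent update races with us. */
      void release(long bytes) {
        long used = usedBytes.get();
        while (!usedBytes.compareAndSet(used, used - bytes)) {
          used = usedBytes.get();
        }
        // Equivalent one-liner: usedBytes.addAndGet(-bytes);
      }
    }
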
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java?rev=1523145&r1=1523144&r2=1523145&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java Fri Sep 13 23:27:22 2013
@@ -57,6 +57,8 @@ public class DataNodeMetrics {
@Metric MutableCounterLong blocksRemoved;
@Metric MutableCounterLong blocksVerified;
@Metric MutableCounterLong blockVerificationFailures;
+ @Metric MutableCounterLong blocksCached;
+ @Metric MutableCounterLong blocksUncached;
@Metric MutableCounterLong readsFromLocalClient;
@Metric MutableCounterLong readsFromRemoteClient;
@Metric MutableCounterLong writesFromLocalClient;
@@ -74,6 +76,7 @@ public class DataNodeMetrics {
@Metric MutableRate replaceBlockOp;
@Metric MutableRate heartbeats;
@Metric MutableRate blockReports;
+ @Metric MutableRate cacheReports;
@Metric MutableRate packetAckRoundTripTimeNanos;
MutableQuantiles[] packetAckRoundTripTimeNanosQuantiles;
@@ -151,6 +154,10 @@ public class DataNodeMetrics {
blockReports.add(latency);
}
+ public void addCacheReport(long latency) {
+ cacheReports.add(latency);
+ }
+
public void incrBlocksReplicated(int delta) {
blocksReplicated.incr(delta);
}
@@ -175,6 +182,15 @@ public class DataNodeMetrics {
blocksVerified.incr();
}
+
+ public void incrBlocksCached(int delta) {
+ blocksCached.incr(delta);
+ }
+
+ public void incrBlocksUncached(int delta) {
+ blocksUncached.incr(delta);
+ }
+
public void addReadBlockOp(long latency) {
readBlockOp.add(latency);
}
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java?rev=1523145&r1=1523144&r2=1523145&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java Fri Sep 13 23:27:22 2013
@@ -26,9 +26,9 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
+import java.util.Map.Entry;
import java.util.SortedMap;
import java.util.TreeMap;
-import java.util.Map.Entry;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -51,7 +51,7 @@ import org.apache.hadoop.util.Fallible;
/**
* The Cache Manager handles caching on DataNodes.
*/
-final class CacheManager {
+public final class CacheManager {
public static final Log LOG = LogFactory.getLog(CacheManager.class);
/**
@@ -70,6 +70,12 @@ final class CacheManager {
new TreeMap<PathBasedCacheDirective, PathBasedCacheEntry>();
/**
+ * Cache entries, sorted by path
+ */
+ private final TreeMap<String, List<PathBasedCacheEntry>> entriesByPath =
+ new TreeMap<String, List<PathBasedCacheEntry>>();
+
+ /**
* Cache pools, sorted by name.
*/
private final TreeMap<String, CachePool> cachePools =
@@ -90,9 +96,14 @@ final class CacheManager {
*/
private final int maxListCacheDirectivesResponses;
- CacheManager(FSDirectory dir, Configuration conf) {
+ final private FSNamesystem namesystem;
+ final private FSDirectory dir;
+
+ CacheManager(FSNamesystem namesystem, FSDirectory dir, Configuration conf) {
// TODO: support loading and storing of the CacheManager state
clear();
+ this.namesystem = namesystem;
+ this.dir = dir;
maxListCachePoolsResponses = conf.getInt(
DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES,
DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES_DEFAULT);
@@ -104,6 +115,7 @@ final class CacheManager {
synchronized void clear() {
entriesById.clear();
entriesByDirective.clear();
+ entriesByPath.clear();
cachePools.clear();
nextEntryId = 1;
}
@@ -131,7 +143,8 @@ final class CacheManager {
try {
directive.validate();
} catch (IOException ioe) {
- LOG.info("addDirective " + directive + ": validation failed.");
+ LOG.info("addDirective " + directive + ": validation failed: "
+ + ioe.getClass().getName() + ": " + ioe.getMessage());
return new Fallible<PathBasedCacheEntry>(ioe);
}
// Check if we already have this entry.
@@ -152,8 +165,34 @@ final class CacheManager {
}
LOG.info("addDirective " + directive + ": added cache directive "
+ directive);
+
+ // Success!
+ // First, add it to the various maps
entriesByDirective.put(directive, entry);
entriesById.put(entry.getEntryId(), entry);
+ String path = directive.getPath();
+ List<PathBasedCacheEntry> entryList = entriesByPath.get(path);
+ if (entryList == null) {
+ entryList = new ArrayList<PathBasedCacheEntry>(1);
+ entriesByPath.put(path, entryList);
+ }
+ entryList.add(entry);
+
+ // Next, set the path as cached in the namesystem
+ try {
+ INode node = dir.getINode(directive.getPath());
+ if (node.isFile()) {
+ INodeFile file = node.asFile();
+ // TODO: adjustable cache replication factor
+ namesystem.setCacheReplicationInt(directive.getPath(),
+ file.getBlockReplication());
+ }
+ } catch (IOException ioe) {
+ LOG.info("addDirective " + directive +": failed to cache file: " +
+ ioe.getClass().getName() +": " + ioe.getMessage());
+ return new Fallible<PathBasedCacheEntry>(ioe);
+ }
+
return new Fallible<PathBasedCacheEntry>(entry);
}
@@ -201,7 +240,31 @@ final class CacheManager {
return new Fallible<Long>(
new UnexpectedRemovePathBasedCacheEntryException(entryId));
}
+ // Remove the corresponding entry in entriesByPath.
+ String path = existing.getDirective().getPath();
+ List<PathBasedCacheEntry> entries = entriesByPath.get(path);
+ if (entries == null || !entries.remove(existing)) {
+ return new Fallible<Long>(
+ new UnexpectedRemovePathBasedCacheEntryException(entryId));
+ }
+ if (entries.size() == 0) {
+ entriesByPath.remove(path);
+ }
entriesById.remove(entryId);
+
+ // Set the path as uncached in the namesystem
+ try {
+ INode node = dir.getINode(existing.getDirective().getPath());
+ if (node.isFile()) {
+ namesystem.setCacheReplicationInt(existing.getDirective().getPath(),
+ (short) 0);
+ }
+ } catch (IOException e) {
+ LOG.warn("removeEntry " + entryId + ": failure while setting cache"
+ + " replication factor", e);
+ return new Fallible<Long>(e);
+ }
+ LOG.info("removeEntry successful for PathCacheEntry id " + entryId);
return new Fallible<Long>(entryId);
}
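
The new entriesByPath map is a TreeMap used as a multimap: addDirective appends to a per-path list, creating the list on first use, and removeEntry removes from the list and drops it once empty so stale paths do not accumulate. The same pattern in isolation (a generic sketch with invented names):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.TreeMap;

    /** Sketch only: invented generic class showing the entriesByPath pattern. */
    class PathMultimap<V> {
      private final TreeMap<String, List<V>> byPath =
          new TreeMap<String, List<V>>();

      void add(String path, V value) {
        List<V> list = byPath.get(path);
        if (list == null) {
          list = new ArrayList<V>(1);   // first entry for this path
          byPath.put(path, list);
        }
        list.add(value);
      }

      boolean remove(String path, V value) {
        List<V> list = byPath.get(path);
        if (list == null || !list.remove(value)) {
          return false;                 // mirrors the Fallible error case above
        }
        if (list.isEmpty()) {
          byPath.remove(path);          // do not leak empty lists
        }
        return true;
      }
    }
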
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1523145&r1=1523144&r2=1523145&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Fri Sep 13 23:27:22 2013
@@ -1092,6 +1092,52 @@ public class FSDirectory implements Clos
}
/**
+ * Set cache replication for a file
+ *
+ * @param src file name
+ * @param replication new replication
+ * @param blockRepls block replications - output parameter
+ * @return array of file blocks
+ * @throws QuotaExceededException
+ * @throws SnapshotAccessControlException
+ */
+ Block[] setCacheReplication(String src, short replication, short[] blockRepls)
+ throws QuotaExceededException, UnresolvedLinkException,
+ SnapshotAccessControlException {
+ waitForReady();
+ writeLock();
+ try {
+ return unprotectedSetCacheReplication(src, replication, blockRepls);
+ } finally {
+ writeUnlock();
+ }
+ }
+
+ Block[] unprotectedSetCacheReplication(String src, short replication,
+ short[] blockRepls) throws QuotaExceededException,
+ UnresolvedLinkException, SnapshotAccessControlException {
+ assert hasWriteLock();
+
+ final INodesInPath iip = rootDir.getINodesInPath4Write(src, true);
+ final INode inode = iip.getLastINode();
+ if (inode == null || !inode.isFile()) {
+ return null;
+ }
+ INodeFile file = inode.asFile();
+ final short oldBR = file.getCacheReplication();
+
+ // TODO: Update quotas here as repl goes up or down
+ file.setCacheReplication(replication);
+ final short newBR = file.getCacheReplication();
+
+ if (blockRepls != null) {
+ blockRepls[0] = oldBR;
+ blockRepls[1] = newBR;
+ }
+ return file.getBlocks();
+ }
+
+ /**
* @param path the file path
* @return the block size of the file.
*/
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1523145&r1=1523144&r2=1523145&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Sep 13 23:27:22 2013
@@ -367,6 +367,7 @@ public class FSNamesystem implements Nam
private final BlockManager blockManager;
private final SnapshotManager snapshotManager;
private final CacheManager cacheManager;
+ private final CacheReplicationManager cacheReplicationManager;
private final DatanodeStatistics datanodeStatistics;
// Block pool ID used by this namenode
@@ -694,7 +695,9 @@ public class FSNamesystem implements Nam
this.dtSecretManager = createDelegationTokenSecretManager(conf);
this.dir = new FSDirectory(fsImage, this, conf);
this.snapshotManager = new SnapshotManager(dir);
- this.cacheManager= new CacheManager(dir, conf);
+ this.cacheManager = new CacheManager(this, dir, conf);
+ this.cacheReplicationManager = new CacheReplicationManager(this,
+ blockManager, blockManager.getDatanodeManager(), this, conf);
this.safeMode = new SafeModeInfo(conf);
this.auditLoggers = initAuditLoggers(conf);
this.isDefaultAuditLogger = auditLoggers.size() == 1 &&
@@ -871,6 +874,7 @@ public class FSNamesystem implements Nam
getCompleteBlocksTotal());
setBlockTotal();
blockManager.activate(conf);
+ cacheReplicationManager.activate();
} finally {
writeUnlock();
}
@@ -887,6 +891,7 @@ public class FSNamesystem implements Nam
writeLock();
try {
if (blockManager != null) blockManager.close();
+ if (cacheReplicationManager != null) cacheReplicationManager.close();
} finally {
writeUnlock();
}
@@ -917,7 +922,9 @@ public class FSNamesystem implements Nam
blockManager.getDatanodeManager().markAllDatanodesStale();
blockManager.clearQueues();
blockManager.processAllPendingDNMessages();
-
+
+ cacheReplicationManager.clearQueues();
+
if (!isInSafeMode() ||
(isInSafeMode() && safeMode.isPopulatingReplQueues())) {
LOG.info("Reprocessing replication and invalidation queues");
@@ -1910,6 +1917,42 @@ public class FSNamesystem implements Nam
return isFile;
}
+ boolean setCacheReplicationInt(String src, final short replication)
+ throws IOException {
+ final boolean isFile;
+ FSPermissionChecker pc = getPermissionChecker();
+ checkOperation(OperationCategory.WRITE);
+ byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
+ writeLock();
+ try {
+ checkOperation(OperationCategory.WRITE);
+ if (isInSafeMode()) {
+ throw new SafeModeException("Cannot set replication for " + src, safeMode);
+ }
+ src = FSDirectory.resolvePath(src, pathComponents, dir);
+ if (isPermissionEnabled) {
+ checkPathAccess(pc, src, FsAction.WRITE);
+ }
+
+ final short[] blockRepls = new short[2]; // 0: old, 1: new
+ final Block[] blocks = dir.setCacheReplication(src, replication,
+ blockRepls);
+ isFile = (blocks != null);
+ if (isFile) {
+ cacheReplicationManager.setCacheReplication(blockRepls[0],
+ blockRepls[1], src, blocks);
+ }
+ } finally {
+ writeUnlock();
+ }
+
+ getEditLog().logSync();
+ if (isFile) {
+ logAuditEvent(true, "setReplication", src);
+ }
+ return isFile;
+ }
+
long getPreferredBlockSize(String filename)
throws IOException, UnresolvedLinkException {
FSPermissionChecker pc = getPermissionChecker();
@@ -6391,6 +6434,14 @@ public class FSNamesystem implements Nam
public FSDirectory getFSDirectory() {
return dir;
}
+ /** @return the cache manager. */
+ public CacheManager getCacheManager() {
+ return cacheManager;
+ }
+ /** @return the cache replication manager. */
+ public CacheReplicationManager getCacheReplicationManager() {
+ return cacheReplicationManager;
+ }
@Override // NameNodeMXBean
public String getCorruptFiles() {
@@ -6959,10 +7010,6 @@ public class FSNamesystem implements Nam
return results;
}
- public CacheManager getCacheManager() {
- return cacheManager;
- }
-
/**
* Default AuditLogger implementation; used when no access logger is
* defined in the config file. It can also be explicitly listed in the
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java?rev=1523145&r1=1523144&r2=1523145&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/INodeFile.java Fri Sep 13 23:27:22 2013
@@ -104,6 +104,8 @@ public class INodeFile extends INodeWith
private BlockInfo[] blocks;
+ private short cacheReplication = 0;
+
INodeFile(long id, byte[] name, PermissionStatus permissions, long mtime, long atime,
BlockInfo[] blklist, short replication, long preferredBlockSize) {
super(id, name, permissions, mtime, atime);
@@ -199,6 +201,18 @@ public class INodeFile extends INodeWith
return nodeToUpdate;
}
+ @Override
+ public void setCacheReplication(short cacheReplication) {
+ Preconditions.checkArgument(cacheReplication <= getBlockReplication(),
+ "Cannot set cache replication higher than block replication factor");
+ this.cacheReplication = cacheReplication;
+ }
+
+ @Override
+ public short getCacheReplication() {
+ return cacheReplication;
+ }
+
/** @return preferred block size (in bytes) of the file. */
@Override
public long getPreferredBlockSize() {
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java?rev=1523145&r1=1523144&r2=1523145&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java Fri Sep 13 23:27:22 2013
@@ -968,7 +968,14 @@ class NameNodeRpcServer implements Namen
String poolId, long[] blocks) throws IOException {
verifyRequest(nodeReg);
BlockListAsLongs blist = new BlockListAsLongs(blocks);
- namesystem.getBlockManager().processCacheReport(nodeReg, poolId, blist);
+ if (blockStateChangeLog.isDebugEnabled()) {
+ blockStateChangeLog.debug("*BLOCK* NameNode.cacheReport: "
+ + "from " + nodeReg + " " + blist.getNumberOfBlocks()
+ + " blocks");
+ }
+
+ namesystem.getCacheReplicationManager()
+ .processCacheReport(nodeReg, poolId, blist);
return null;
}
Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java?rev=1523145&r1=1523144&r2=1523145&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java Fri Sep 13 23:27:22 2013
@@ -79,6 +79,8 @@ public class NameNodeMetrics {
MutableCounterLong transactionsBatchedInSync;
@Metric("Block report") MutableRate blockReport;
MutableQuantiles[] blockReportQuantiles;
+ @Metric("Cache report") MutableRate cacheReport;
+ MutableQuantiles[] cacheReportQuantiles;
@Metric("Duration in SafeMode at startup") MutableGaugeInt safeModeTime;
@Metric("Time loading FS Image at startup") MutableGaugeInt fsImageLoadTime;
@@ -89,6 +91,7 @@ public class NameNodeMetrics {
final int len = intervals.length;
syncsQuantiles = new MutableQuantiles[len];
blockReportQuantiles = new MutableQuantiles[len];
+ cacheReportQuantiles = new MutableQuantiles[len];
for (int i = 0; i < len; i++) {
int interval = intervals[i];
@@ -98,6 +101,9 @@ public class NameNodeMetrics {
blockReportQuantiles[i] = registry.newQuantiles(
"blockReport" + interval + "s",
"Block report", "ops", "latency", interval);
+ cacheReportQuantiles[i] = registry.newQuantiles(
+ "cacheReport" + interval + "s",
+ "Cache report", "ops", "latency", interval);
}
}
@@ -227,6 +233,13 @@ public class NameNodeMetrics {
}
}
+ public void addCacheBlockReport(long latency) {
+ cacheReport.add(latency);
+ for (MutableQuantiles q : cacheReportQuantiles) {
+ q.add(latency);
+ }
+ }
+
public void setSafeModeTime(long elapsed) {
safeModeTime.set((int) elapsed);
}
Added: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCacheReplicationManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCacheReplicationManager.java?rev=1523145&view=auto
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCacheReplicationManager.java (added)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCacheReplicationManager.java Fri Sep 13 23:27:22 2013
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.blockmanagement;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assume.assumeTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystemTestHelper;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
+import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
+import org.apache.hadoop.hdfs.protocol.PathBasedCacheEntry;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.util.Fallible;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestCacheReplicationManager {
+
+ // Most Linux installs allow a default of 64KB locked memory
+ private static final long CACHE_CAPACITY = 64 * 1024;
+ private static final long BLOCK_SIZE = 4096;
+
+ private static Configuration conf;
+ private static MiniDFSCluster cluster = null;
+ private static FileSystem fs;
+ private static NameNode nn;
+ private static NamenodeProtocols nnRpc;
+ private static CacheReplicationManager cacheReplManager;
+ final private static FileSystemTestHelper helper = new FileSystemTestHelper();
+ private static Path rootDir;
+
+ @Before
+ public void setUp() throws Exception {
+
+ assumeTrue(NativeIO.isAvailable());
+
+ conf = new HdfsConfiguration();
+ conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+ conf.setLong(DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
+ CACHE_CAPACITY);
+ conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1);
+ conf.setBoolean(DFS_NAMENODE_CACHING_ENABLED_KEY, true);
+ conf.setLong(DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 1000);
+
+ cluster = new MiniDFSCluster.Builder(conf)
+ .numDataNodes(1).build();
+ cluster.waitActive();
+
+ fs = cluster.getFileSystem();
+ nn = cluster.getNameNode();
+ nnRpc = nn.getRpcServer();
+ cacheReplManager = nn.getNamesystem().getCacheReplicationManager();
+ rootDir = helper.getDefaultWorkingDirectory(fs);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (fs != null) {
+ fs.close();
+ }
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+ private int countNumCachedBlocks() {
+ return cacheReplManager.cachedBlocksMap.size();
+ }
+
+ private void waitForExpectedNumCachedBlocks(final int expected)
+ throws Exception {
+ int actual = countNumCachedBlocks();
+ while (expected != actual) {
+ Thread.sleep(500);
+ actual = countNumCachedBlocks();
+ }
+ }
+
+ @Test(timeout=60000)
+ public void testCachePaths() throws Exception {
+ // Create the pool
+ final String pool = "friendlyPool";
+ nnRpc.addCachePool(new CachePoolInfo("friendlyPool"));
+ // Create some test files
+ final int numFiles = 3;
+ final int numBlocksPerFile = 2;
+ final List<String> paths = new ArrayList<String>(numFiles);
+ for (int i=0; i<numFiles; i++) {
+ Path p = new Path(rootDir, "testCachePaths-" + i);
+ FileSystemTestHelper.createFile(fs, p, numBlocksPerFile, (int)BLOCK_SIZE);
+ paths.add(p.toUri().getPath());
+ }
+ // Check the initial statistics at the namenode
+ int expected = 0;
+ waitForExpectedNumCachedBlocks(expected);
+ // Cache and check each path in sequence
+ for (int i=0; i<numFiles; i++) {
+ List<PathBasedCacheDirective> toAdd =
+ new ArrayList<PathBasedCacheDirective>();
+ toAdd.add(new PathBasedCacheDirective(paths.get(i), pool));
+ List<Fallible<PathBasedCacheEntry>> fallibles =
+ nnRpc.addPathBasedCacheDirectives(toAdd);
+ assertEquals("Unexpected number of fallibles",
+ 1, fallibles.size());
+ PathBasedCacheEntry entry = fallibles.get(0).get();
+ PathBasedCacheDirective directive = entry.getDirective();
+ assertEquals("Directive does not match requested path", paths.get(i),
+ directive.getPath());
+ assertEquals("Directive does not match requested pool", pool,
+ directive.getPool());
+ expected += numBlocksPerFile;
+ waitForExpectedNumCachedBlocks(expected);
+ }
+ // Uncache and check each path in sequence
+ RemoteIterator<PathBasedCacheEntry> entries =
+ nnRpc.listPathBasedCacheEntries(0, null, null);
+ for (int i=0; i<numFiles; i++) {
+ PathBasedCacheEntry entry = entries.next();
+ List<Long> toRemove = new ArrayList<Long>();
+ toRemove.add(entry.getEntryId());
+ List<Fallible<Long>> fallibles = nnRpc.removePathBasedCacheEntries(toRemove);
+ assertEquals("Unexpected number of fallibles", 1, fallibles.size());
+ Long l = fallibles.get(0).get();
+ assertEquals("Removed entryId does not match requested",
+ entry.getEntryId(), l.longValue());
+ expected -= numBlocksPerFile;
+ waitForExpectedNumCachedBlocks(expected);
+ }
+ }
+}
Propchange: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCacheReplicationManager.java
------------------------------------------------------------------------------
svn:eol-style = native
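
A note on the polling helper in the test above: waitForExpectedNumCachedBlocks loops without its own bound and relies on the @Test(timeout=60000) annotation to terminate a failing run. A bounded variant that could drop into the test class (a sketch only) fails with a diagnostic instead:

    private void waitForExpectedNumCachedBlocks(final int expected,
        final long timeoutMs) throws InterruptedException {
      final long deadline = System.currentTimeMillis() + timeoutMs;
      int actual = countNumCachedBlocks();
      while (actual != expected) {
        if (System.currentTimeMillis() > deadline) {
          throw new AssertionError("Timed out waiting for " + expected
              + " cached blocks, found " + actual);
        }
        Thread.sleep(500);
        actual = countNumCachedBlocks();
      }
    }
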