You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by cm...@apache.org on 2013/10/29 01:49:23 UTC
svn commit: r1536572 [2/4] - in
/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ dev-support/
src/main/bin/ src/main/java/ src/main/java/org/apache/hadoop/hdfs/
src/main/java/org/apache/hadoop/hdfs/client/
src/main/java/org/apache/hadoop/hdfs/p...
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java?rev=1536572&r1=1536571&r2=1536572&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java Tue Oct 29 00:49:20 2013
@@ -27,6 +27,8 @@ import org.apache.hadoop.hdfs.protocol.p
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReceivedAndDeletedResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CacheReportRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CacheReportResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CommitBlockSynchronizationRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CommitBlockSynchronizationResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ErrorReportRequestProto;
@@ -55,6 +57,7 @@ import org.apache.hadoop.hdfs.server.pro
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
+import com.google.common.primitives.Longs;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
@@ -109,8 +112,9 @@ public class DatanodeProtocolServerSideT
p.getBlockPoolUsed());
}
response = impl.sendHeartbeat(PBHelper.convert(request.getRegistration()),
- report, request.getXmitsInProgress(), request.getXceiverCount(),
- request.getFailedVolumes());
+ report, request.getDnCacheCapacity(), request.getDnCacheUsed(),
+ request.getXmitsInProgress(),
+ request.getXceiverCount(), request.getFailedVolumes());
} catch (IOException e) {
throw new ServiceException(e);
}
@@ -160,6 +164,27 @@ public class DatanodeProtocolServerSideT
}
@Override
+ public CacheReportResponseProto cacheReport(RpcController controller,
+ CacheReportRequestProto request) throws ServiceException {
+ DatanodeCommand cmd = null;
+ try {
+ cmd = impl.cacheReport(
+ PBHelper.convert(request.getRegistration()),
+ request.getBlockPoolId(),
+ request.getBlocksList());
+ } catch (IOException e) {
+ throw new ServiceException(e);
+ }
+ CacheReportResponseProto.Builder builder =
+ CacheReportResponseProto.newBuilder();
+ if (cmd != null) {
+ builder.setCmd(PBHelper.convert(cmd));
+ }
+ return builder.build();
+ }
+
+
+ @Override
public BlockReceivedAndDeletedResponseProto blockReceivedAndDeleted(
RpcController controller, BlockReceivedAndDeletedRequestProto request)
throws ServiceException {
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java?rev=1536572&r1=1536571&r2=1536572&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java Tue Oct 29 00:49:20 2013
@@ -57,6 +57,7 @@ import org.apache.hadoop.hdfs.protocol.p
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SafeModeActionProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BalancerBandwidthCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockIdCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockRecoveryCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeRegistrationProto;
@@ -118,6 +119,7 @@ import org.apache.hadoop.hdfs.server.nam
import org.apache.hadoop.hdfs.server.namenode.INodeId;
import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
@@ -148,6 +150,7 @@ import org.apache.hadoop.security.proto.
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
+import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.google.protobuf.ByteString;
import com.google.protobuf.CodedInputStream;
@@ -472,7 +475,8 @@ public class PBHelper {
PBHelper.convert(di.getId()),
di.hasLocation() ? di.getLocation() : null ,
di.getCapacity(), di.getDfsUsed(), di.getRemaining(),
- di.getBlockPoolUsed() , di.getLastUpdate() , di.getXceiverCount() ,
+ di.getBlockPoolUsed(), di.getCacheCapacity(), di.getCacheUsed(),
+ di.getLastUpdate(), di.getXceiverCount(),
PBHelper.convert(di.getAdminState()));
}
@@ -565,9 +569,21 @@ public class PBHelper {
if (b == null) return null;
Builder builder = LocatedBlockProto.newBuilder();
DatanodeInfo[] locs = b.getLocations();
+ List<DatanodeInfo> cachedLocs =
+ Lists.newLinkedList(Arrays.asList(b.getCachedLocations()));
for (int i = 0; i < locs.length; i++) {
- builder.addLocs(i, PBHelper.convert(locs[i]));
+ DatanodeInfo loc = locs[i];
+ builder.addLocs(i, PBHelper.convert(loc));
+ boolean locIsCached = cachedLocs.contains(loc);
+ builder.addIsCached(locIsCached);
+ if (locIsCached) {
+ cachedLocs.remove(loc);
+ }
}
+ Preconditions.checkArgument(cachedLocs.size() == 0,
+ "Found additional cached replica locations that are not in the set of"
+ + " storage-backed locations!");
+
return builder.setB(PBHelper.convert(b.getBlock()))
.setBlockToken(PBHelper.convert(b.getBlockToken()))
.setCorrupt(b.isCorrupt()).setOffset(b.getStartOffset()).build();
@@ -580,9 +596,20 @@ public class PBHelper {
for (int i = 0; i < locs.size(); i++) {
targets[i] = PBHelper.convert(locs.get(i));
}
+ // Set values from the isCached list, re-using references from targets
+ List<DatanodeInfo> cachedLocs = new ArrayList<DatanodeInfo>(locs.size());
+ List<Boolean> isCachedList = proto.getIsCachedList();
+ for (int i=0; i<isCachedList.size(); i++) {
+ if (isCachedList.get(i)) {
+ cachedLocs.add(targets[i]);
+ }
+ }
+
LocatedBlock lb = new LocatedBlock(PBHelper.convert(proto.getB()), targets,
- proto.getOffset(), proto.getCorrupt());
+ proto.getOffset(), proto.getCorrupt(),
+ cachedLocs.toArray(new DatanodeInfo[0]));
lb.setBlockToken(PBHelper.convert(proto.getBlockToken()));
+
return lb;
}
@@ -671,6 +698,8 @@ public class PBHelper {
return PBHelper.convert(proto.getKeyUpdateCmd());
case RegisterCommand:
return REG_CMD;
+ case BlockIdCommand:
+ return PBHelper.convert(proto.getBlkIdCmd());
}
return null;
}
@@ -723,6 +752,26 @@ public class PBHelper {
builder.addAllTargets(PBHelper.convert(cmd.getTargets()));
return builder.build();
}
+
+ public static BlockIdCommandProto convert(BlockIdCommand cmd) {
+ BlockIdCommandProto.Builder builder = BlockIdCommandProto.newBuilder()
+ .setBlockPoolId(cmd.getBlockPoolId());
+ switch (cmd.getAction()) {
+ case DatanodeProtocol.DNA_CACHE:
+ builder.setAction(BlockIdCommandProto.Action.CACHE);
+ break;
+ case DatanodeProtocol.DNA_UNCACHE:
+ builder.setAction(BlockIdCommandProto.Action.UNCACHE);
+ break;
+ default:
+ throw new AssertionError("Invalid action");
+ }
+ long[] blockIds = cmd.getBlockIds();
+ for (int i = 0; i < blockIds.length; i++) {
+ builder.addBlockIds(blockIds[i]);
+ }
+ return builder.build();
+ }
private static List<DatanodeInfosProto> convert(DatanodeInfo[][] targets) {
DatanodeInfosProto[] ret = new DatanodeInfosProto[targets.length];
@@ -766,8 +815,13 @@ public class PBHelper {
case DatanodeProtocol.DNA_TRANSFER:
case DatanodeProtocol.DNA_INVALIDATE:
case DatanodeProtocol.DNA_SHUTDOWN:
- builder.setCmdType(DatanodeCommandProto.Type.BlockCommand).setBlkCmd(
- PBHelper.convert((BlockCommand) datanodeCommand));
+ builder.setCmdType(DatanodeCommandProto.Type.BlockCommand).
+ setBlkCmd(PBHelper.convert((BlockCommand) datanodeCommand));
+ break;
+ case DatanodeProtocol.DNA_CACHE:
+ case DatanodeProtocol.DNA_UNCACHE:
+ builder.setCmdType(DatanodeCommandProto.Type.BlockIdCommand).
+ setBlkIdCmd(PBHelper.convert((BlockIdCommand) datanodeCommand));
break;
case DatanodeProtocol.DNA_UNKNOWN: //Not expected
default:
@@ -818,10 +872,32 @@ public class PBHelper {
case SHUTDOWN:
action = DatanodeProtocol.DNA_SHUTDOWN;
break;
+ default:
+ throw new AssertionError("Unknown action type: " + blkCmd.getAction());
}
return new BlockCommand(action, blkCmd.getBlockPoolId(), blocks, targets);
}
+ public static BlockIdCommand convert(BlockIdCommandProto blkIdCmd) {
+ int numBlockIds = blkIdCmd.getBlockIdsCount();
+ long blockIds[] = new long[numBlockIds];
+ for (int i = 0; i < numBlockIds; i++) {
+ blockIds[i] = blkIdCmd.getBlockIds(i);
+ }
+ int action = DatanodeProtocol.DNA_UNKNOWN;
+ switch (blkIdCmd.getAction()) {
+ case CACHE:
+ action = DatanodeProtocol.DNA_CACHE;
+ break;
+ case UNCACHE:
+ action = DatanodeProtocol.DNA_UNCACHE;
+ break;
+ default:
+ throw new AssertionError("Unknown action type: " + blkIdCmd.getAction());
+ }
+ return new BlockIdCommand(action, blkIdCmd.getBlockPoolId(), blockIds);
+ }
+
public static DatanodeInfo[] convert(DatanodeInfosProto datanodeInfosProto) {
List<DatanodeInfoProto> proto = datanodeInfosProto.getDatanodesList();
DatanodeInfo[] infos = new DatanodeInfo[proto.size()];
@@ -1358,10 +1434,11 @@ public class PBHelper {
}
public static StorageReportProto convert(StorageReport r) {
- return StorageReportProto.newBuilder()
+ StorageReportProto.Builder builder = StorageReportProto.newBuilder()
.setBlockPoolUsed(r.getBlockPoolUsed()).setCapacity(r.getCapacity())
.setDfsUsed(r.getDfsUsed()).setRemaining(r.getRemaining())
- .setStorageID(r.getStorageID()).build();
+ .setStorageID(r.getStorageID());
+ return builder.build();
}
public static JournalInfo convert(JournalInfoProto info) {
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java?rev=1536572&r1=1536571&r2=1536572&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java Tue Oct 29 00:49:20 2013
@@ -85,7 +85,7 @@ public class BlockInfo extends Block imp
this.bc = bc;
}
- DatanodeDescriptor getDatanode(int index) {
+ public DatanodeDescriptor getDatanode(int index) {
assert this.triplets != null : "BlockInfo is not initialized";
assert index >= 0 && index*3 < triplets.length : "Index is out of bound";
return (DatanodeDescriptor)triplets[index*3];
@@ -153,7 +153,7 @@ public class BlockInfo extends Block imp
return info;
}
- int getCapacity() {
+ public int getCapacity() {
assert this.triplets != null : "BlockInfo is not initialized";
assert triplets.length % 3 == 0 : "Malformed BlockInfo";
return triplets.length / 3;
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1536572&r1=1536571&r2=1536572&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Tue Oct 29 00:49:20 2013
@@ -3138,6 +3138,13 @@ assert storedBlock.findDatanode(dn) < 0
UnderReplicatedBlocks.QUEUE_WITH_CORRUPT_BLOCKS);
}
+ /**
+ * Get the replicas which are corrupt for a given block.
+ */
+ public Collection<DatanodeDescriptor> getCorruptReplicas(Block block) {
+ return corruptReplicas.getNodes(block);
+ }
+
/** @return the size of UnderReplicatedBlocks */
public int numOfUnderReplicatedBlocks() {
return neededReplications.size();
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java?rev=1536572&r1=1536571&r2=1536572&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java Tue Oct 29 00:49:20 2013
@@ -22,14 +22,20 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;
+
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
import org.apache.hadoop.hdfs.util.LightWeightHashSet;
+import org.apache.hadoop.util.IntrusiveCollection;
import org.apache.hadoop.util.Time;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
/**
* This class extends the DatanodeInfo class with ephemeral information (eg
* health, capacity, what blocks are associated with the Datanode) that is
@@ -94,8 +100,74 @@ public class DatanodeDescriptor extends
}
}
+ /**
+ * A list of CachedBlock objects on this datanode.
+ */
+ public static class CachedBlocksList extends IntrusiveCollection<CachedBlock> {
+ public enum Type {
+ PENDING_CACHED,
+ CACHED,
+ PENDING_UNCACHED
+ }
+
+ private final DatanodeDescriptor datanode;
+
+ private final Type type;
+
+ CachedBlocksList(DatanodeDescriptor datanode, Type type) {
+ this.datanode = datanode;
+ this.type = type;
+ }
+
+ public DatanodeDescriptor getDatanode() {
+ return datanode;
+ }
+
+ public Type getType() {
+ return type;
+ }
+ }
+
+ /**
+ * The blocks which we want to cache on this DataNode.
+ */
+ private final CachedBlocksList pendingCached =
+ new CachedBlocksList(this, CachedBlocksList.Type.PENDING_CACHED);
+
+ /**
+ * The blocks which we know are cached on this datanode.
+ * This list is updated by periodic cache reports.
+ */
+ private final CachedBlocksList cached =
+ new CachedBlocksList(this, CachedBlocksList.Type.CACHED);
+
+ /**
+ * The blocks which we want to uncache on this DataNode.
+ */
+ private final CachedBlocksList pendingUncached =
+ new CachedBlocksList(this, CachedBlocksList.Type.PENDING_UNCACHED);
+
+ public CachedBlocksList getPendingCached() {
+ return pendingCached;
+ }
+
+ public CachedBlocksList getCached() {
+ return cached;
+ }
+
+ public CachedBlocksList getPendingUncached() {
+ return pendingUncached;
+ }
+
+ /**
+ * Head of the list of blocks on the datanode
+ */
private volatile BlockInfo blockList = null;
+ /**
+ * Number of blocks on the datanode
+ */
private int numBlocks = 0;
+
// isAlive == heartbeats.contains(this)
// This is an optimization, because contains takes O(n) time on Arraylist
public boolean isAlive = false;
@@ -160,7 +232,7 @@ public class DatanodeDescriptor extends
* @param nodeID id of the data node
*/
public DatanodeDescriptor(DatanodeID nodeID) {
- this(nodeID, 0L, 0L, 0L, 0L, 0, 0);
+ this(nodeID, 0L, 0L, 0L, 0L, 0L, 0L, 0, 0);
}
/**
@@ -170,7 +242,7 @@ public class DatanodeDescriptor extends
*/
public DatanodeDescriptor(DatanodeID nodeID,
String networkLocation) {
- this(nodeID, networkLocation, 0L, 0L, 0L, 0L, 0, 0);
+ this(nodeID, networkLocation, 0L, 0L, 0L, 0L, 0L, 0L, 0, 0);
}
/**
@@ -180,6 +252,8 @@ public class DatanodeDescriptor extends
* @param dfsUsed space used by the data node
* @param remaining remaining capacity of the data node
* @param bpused space used by the block pool corresponding to this namenode
+ * @param cacheCapacity cache capacity of the data node
+ * @param cacheUsed cache used on the data node
* @param xceiverCount # of data transfers at the data node
*/
public DatanodeDescriptor(DatanodeID nodeID,
@@ -187,11 +261,13 @@ public class DatanodeDescriptor extends
long dfsUsed,
long remaining,
long bpused,
+ long cacheCapacity,
+ long cacheUsed,
int xceiverCount,
int failedVolumes) {
super(nodeID);
- updateHeartbeat(capacity, dfsUsed, remaining, bpused, xceiverCount,
- failedVolumes);
+ updateHeartbeat(capacity, dfsUsed, remaining, bpused, cacheCapacity,
+ cacheUsed, xceiverCount, failedVolumes);
}
/**
@@ -202,6 +278,8 @@ public class DatanodeDescriptor extends
* @param dfsUsed the used space by dfs datanode
* @param remaining remaining capacity of the data node
* @param bpused space used by the block pool corresponding to this namenode
+ * @param cacheCapacity cache capacity of the data node
+ * @param cacheUsed cache used on the data node
* @param xceiverCount # of data transfers at the data node
*/
public DatanodeDescriptor(DatanodeID nodeID,
@@ -210,11 +288,13 @@ public class DatanodeDescriptor extends
long dfsUsed,
long remaining,
long bpused,
+ long cacheCapacity,
+ long cacheUsed,
int xceiverCount,
int failedVolumes) {
super(nodeID, networkLocation);
- updateHeartbeat(capacity, dfsUsed, remaining, bpused, xceiverCount,
- failedVolumes);
+ updateHeartbeat(capacity, dfsUsed, remaining, bpused, cacheCapacity,
+ cacheUsed, xceiverCount, failedVolumes);
}
/**
@@ -257,6 +337,7 @@ public class DatanodeDescriptor extends
* Used for testing only
* @return the head of the blockList
*/
+ @VisibleForTesting
protected BlockInfo getHead(){
return blockList;
}
@@ -285,6 +366,11 @@ public class DatanodeDescriptor extends
this.blockList = null;
this.invalidateBlocks.clear();
this.volumeFailures = 0;
+ // pendingCached, cached, and pendingUncached are protected by the
+ // FSN lock.
+ this.pendingCached.clear();
+ this.cached.clear();
+ this.pendingUncached.clear();
}
public void clearBlockQueues() {
@@ -293,6 +379,11 @@ public class DatanodeDescriptor extends
this.recoverBlocks.clear();
this.replicateBlocks.clear();
}
+ // pendingCached, cached, and pendingUncached are protected by the
+ // FSN lock.
+ this.pendingCached.clear();
+ this.cached.clear();
+ this.pendingUncached.clear();
}
public int numBlocks() {
@@ -303,11 +394,14 @@ public class DatanodeDescriptor extends
* Updates stats from datanode heartbeat.
*/
public void updateHeartbeat(long capacity, long dfsUsed, long remaining,
- long blockPoolUsed, int xceiverCount, int volFailures) {
+ long blockPoolUsed, long cacheCapacity, long cacheUsed, int xceiverCount,
+ int volFailures) {
setCapacity(capacity);
setRemaining(remaining);
setBlockPoolUsed(blockPoolUsed);
setDfsUsed(dfsUsed);
+ setCacheCapacity(cacheCapacity);
+ setCacheUsed(cacheUsed);
setXceiverCount(xceiverCount);
setLastUpdate(Time.now());
this.volumeFailures = volFailures;
@@ -348,7 +442,7 @@ public class DatanodeDescriptor extends
public Iterator<BlockInfo> getBlockIterator() {
return new BlockIterator(this.blockList, this);
}
-
+
/**
* Store block replication work.
*/
@@ -380,7 +474,7 @@ public class DatanodeDescriptor extends
}
}
}
-
+
/**
* The number of work items that are pending to be replicated
*/
@@ -397,7 +491,7 @@ public class DatanodeDescriptor extends
return invalidateBlocks.size();
}
}
-
+
public List<BlockTargetPair> getReplicationCommand(int maxTransfers) {
return replicateBlocks.poll(maxTransfers);
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1536572&r1=1536571&r2=1536572&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Tue Oct 29 00:49:20 2013
@@ -32,6 +32,8 @@ import org.apache.hadoop.hdfs.HdfsConfig
import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList;
+import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
import org.apache.hadoop.hdfs.server.namenode.HostFileManager;
import org.apache.hadoop.hdfs.server.namenode.HostFileManager.Entry;
import org.apache.hadoop.hdfs.server.namenode.HostFileManager.EntrySet;
@@ -45,6 +47,7 @@ import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.*;
import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException;
import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.IntrusiveCollection;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Time;
@@ -144,6 +147,12 @@ public class DatanodeManager {
private final boolean checkIpHostnameInRegistration;
/**
+ * Whether we should tell datanodes what to cache in replies to
+ * heartbeat messages.
+ */
+ private boolean sendCachingCommands = false;
+
+ /**
* The number of datanodes for each software version. This list should change
* during rolling upgrades.
* Software version -> Number of datanodes with this version
@@ -1215,8 +1224,8 @@ public class DatanodeManager {
public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
final String blockPoolId,
long capacity, long dfsUsed, long remaining, long blockPoolUsed,
- int xceiverCount, int maxTransfers, int failedVolumes
- ) throws IOException {
+ long cacheCapacity, long cacheUsed, int xceiverCount, int maxTransfers,
+ int failedVolumes) throws IOException {
synchronized (heartbeatManager) {
synchronized (datanodeMap) {
DatanodeDescriptor nodeinfo = null;
@@ -1237,7 +1246,8 @@ public class DatanodeManager {
}
heartbeatManager.updateHeartbeat(nodeinfo, capacity, dfsUsed,
- remaining, blockPoolUsed, xceiverCount, failedVolumes);
+ remaining, blockPoolUsed, cacheCapacity, cacheUsed, xceiverCount,
+ failedVolumes);
// If we are in safemode, do not send back any recovery / replication
// requests. Don't even drain the existing queue of work.
@@ -1298,7 +1308,19 @@ public class DatanodeManager {
cmds.add(new BlockCommand(DatanodeProtocol.DNA_INVALIDATE,
blockPoolId, blks));
}
-
+ DatanodeCommand pendingCacheCommand =
+ getCacheCommand(nodeinfo.getPendingCached(), nodeinfo,
+ DatanodeProtocol.DNA_CACHE, blockPoolId);
+ if (pendingCacheCommand != null) {
+ cmds.add(pendingCacheCommand);
+ }
+ DatanodeCommand pendingUncacheCommand =
+ getCacheCommand(nodeinfo.getPendingUncached(), nodeinfo,
+ DatanodeProtocol.DNA_UNCACHE, blockPoolId);
+ if (pendingUncacheCommand != null) {
+ cmds.add(pendingUncacheCommand);
+ }
+
blockManager.addKeyUpdateCommand(cmds, nodeinfo);
// check for balancer bandwidth update
@@ -1318,6 +1340,40 @@ public class DatanodeManager {
}
/**
+ * Convert a CachedBlocksList into a DatanodeCommand with a list of blocks.
+ *
+ * @param list The {@link CachedBlocksList}. This function
+ * clears the list.
+ * @param datanode The datanode.
+ * @param action The action to perform in the command.
+ * @param poolId The block pool id.
+ * @return A DatanodeCommand to be sent back to the DN, or null if
+ * there is nothing to be done.
+ */
+ private DatanodeCommand getCacheCommand(CachedBlocksList list,
+ DatanodeDescriptor datanode, int action, String poolId) {
+ int length = list.size();
+ if (length == 0) {
+ return null;
+ }
+ // Read and clear the existing cache commands.
+ long[] blockIds = new long[length];
+ int i = 0;
+ for (Iterator<CachedBlock> iter = list.iterator();
+ iter.hasNext(); ) {
+ CachedBlock cachedBlock = iter.next();
+ blockIds[i++] = cachedBlock.getBlockId();
+ iter.remove();
+ }
+ if (!sendCachingCommands) {
+ // Do not send caching commands unless the FSNamesystem told us we
+ // should.
+ return null;
+ }
+ return new BlockIdCommand(action, poolId, blockIds);
+ }
+
+ /**
* Tell all datanodes to use a new, non-persistent bandwidth value for
* dfs.balance.bandwidthPerSec.
*
@@ -1365,4 +1421,8 @@ public class DatanodeManager {
public String toString() {
return getClass().getSimpleName() + ": " + host2DatanodeMap;
}
+
+ public void setSendCachingCommands(boolean sendCachingCommands) {
+ this.sendCachingCommands = sendCachingCommands;
+ }
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java?rev=1536572&r1=1536571&r2=1536572&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java Tue Oct 29 00:49:20 2013
@@ -170,7 +170,7 @@ class HeartbeatManager implements Datano
addDatanode(d);
//update its timestamp
- d.updateHeartbeat(0L, 0L, 0L, 0L, 0, 0);
+ d.updateHeartbeat(0L, 0L, 0L, 0L, 0L, 0L, 0, 0);
}
}
@@ -193,10 +193,10 @@ class HeartbeatManager implements Datano
synchronized void updateHeartbeat(final DatanodeDescriptor node,
long capacity, long dfsUsed, long remaining, long blockPoolUsed,
- int xceiverCount, int failedVolumes) {
+ long cacheCapacity, long cacheUsed, int xceiverCount, int failedVolumes) {
stats.subtract(node);
node.updateHeartbeat(capacity, dfsUsed, remaining, blockPoolUsed,
- xceiverCount, failedVolumes);
+ cacheCapacity, cacheUsed, xceiverCount, failedVolumes);
stats.add(node);
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java?rev=1536572&r1=1536571&r2=1536572&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java Tue Oct 29 00:49:20 2013
@@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.protocol.E
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
@@ -518,6 +519,8 @@ class BPOfferService {
return true;
final BlockCommand bcmd =
cmd instanceof BlockCommand? (BlockCommand)cmd: null;
+ final BlockIdCommand blockIdCmd =
+ cmd instanceof BlockIdCommand ? (BlockIdCommand)cmd: null;
switch(cmd.getAction()) {
case DatanodeProtocol.DNA_TRANSFER:
@@ -543,6 +546,16 @@ class BPOfferService {
}
dn.metrics.incrBlocksRemoved(toDelete.length);
break;
+ case DatanodeProtocol.DNA_CACHE:
+ LOG.info("DatanodeCommand action: DNA_CACHE");
+ dn.getFSDataset().cache(blockIdCmd.getBlockPoolId(), blockIdCmd.getBlockIds());
+ dn.metrics.incrBlocksCached(blockIdCmd.getBlockIds().length);
+ break;
+ case DatanodeProtocol.DNA_UNCACHE:
+ LOG.info("DatanodeCommand action: DNA_UNCACHE");
+ dn.getFSDataset().uncache(blockIdCmd.getBlockPoolId(), blockIdCmd.getBlockIds());
+ dn.metrics.incrBlocksUncached(blockIdCmd.getBlockIds().length);
+ break;
case DatanodeProtocol.DNA_SHUTDOWN:
// TODO: DNA_SHUTDOWN appears to be unused - the NN never sends this command
// See HDFS-2987.
@@ -615,6 +628,8 @@ class BPOfferService {
case DatanodeProtocol.DNA_FINALIZE:
case DatanodeProtocol.DNA_RECOVERBLOCK:
case DatanodeProtocol.DNA_BALANCERBANDWIDTHUPDATE:
+ case DatanodeProtocol.DNA_CACHE:
+ case DatanodeProtocol.DNA_UNCACHE:
LOG.warn("Got a command from standby NN - ignoring command:" + cmd.getAction());
break;
default:
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java?rev=1536572&r1=1536571&r2=1536572&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java Tue Oct 29 00:49:20 2013
@@ -24,6 +24,7 @@ import java.net.InetSocketAddress;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.util.Collection;
+import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
@@ -84,6 +85,8 @@ class BPServiceActor implements Runnable
boolean resetBlockReportTime = true;
+ volatile long lastCacheReport = 0;
+
Thread bpThread;
DatanodeProtocolClientSideTranslatorPB bpNamenode;
private volatile long lastHeartbeat = 0;
@@ -430,6 +433,35 @@ class BPServiceActor implements Runnable
return cmd;
}
+  /**
+   * Sends a cache report for this actor's block pool to the NameNode if one
+   * is due.
+   * <p>
+   * No report is sent when the dataset reports a cache capacity of zero, or
+   * when no more than {@code dnConf.cacheReportInterval} has elapsed since
+   * {@code lastCacheReport}. The timer is restarted before the RPC is issued.
+   *
+   * @return the command returned by the NameNode for the report, or null if
+   *         no report was sent
+   * @throws IOException propagated from the calls made while reporting
+   */
+  DatanodeCommand cacheReport() throws IOException {
+    // A cache capacity of zero means caching is disabled: nothing to report.
+    if (dn.getFSDataset().getDnCacheCapacity() == 0) {
+      return null;
+    }
+    final long startTime = Time.monotonicNow();
+    // The report interval has not expired yet.
+    if (startTime - lastCacheReport <= dnConf.cacheReportInterval) {
+      return null;
+    }
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Sending cacheReport from service actor: " + this);
+    }
+    lastCacheReport = startTime;
+
+    final String bpid = bpos.getBlockPoolId();
+    final List<Long> blockIds = dn.getFSDataset().getCacheReport(bpid);
+    final long createTime = Time.monotonicNow();
+
+    final DatanodeCommand cmd =
+        bpNamenode.cacheReport(bpRegistration, bpid, blockIds);
+    final long sendTime = Time.monotonicNow();
+    // Time spent building the report vs. time spent in the RPC.
+    final long createCost = createTime - startTime;
+    final long sendCost = sendTime - createTime;
+    dn.getMetrics().addCacheReport(sendCost);
+    LOG.info("CacheReport of " + blockIds.size()
+        + " blocks took " + createCost + " msec to generate and "
+        + sendCost + " msecs for RPC and NN processing");
+    return cmd;
+  }
HeartbeatResponse sendHeartBeat() throws IOException {
if (LOG.isDebugEnabled()) {
@@ -443,6 +475,8 @@ class BPServiceActor implements Runnable
dn.getFSDataset().getRemaining(),
dn.getFSDataset().getBlockPoolUsed(bpos.getBlockPoolId())) };
return bpNamenode.sendHeartbeat(bpRegistration, report,
+ dn.getFSDataset().getDnCacheCapacity(),
+ dn.getFSDataset().getDnCacheUsed(),
dn.getXmitsInProgress(),
dn.getXceiverCount(),
dn.getFSDataset().getNumFailedVolumes());
@@ -496,11 +530,12 @@ class BPServiceActor implements Runnable
* forever calling remote NameNode functions.
*/
private void offerService() throws Exception {
- LOG.info("For namenode " + nnAddr + " using DELETEREPORT_INTERVAL of "
- + dnConf.deleteReportInterval + " msec " + " BLOCKREPORT_INTERVAL of "
- + dnConf.blockReportInterval + "msec" + " Initial delay: "
- + dnConf.initialBlockReportDelay + "msec" + "; heartBeatInterval="
- + dnConf.heartBeatInterval);
+ LOG.info("For namenode " + nnAddr + " using"
+ + " DELETEREPORT_INTERVAL of " + dnConf.deleteReportInterval + " msec "
+ + " BLOCKREPORT_INTERVAL of " + dnConf.blockReportInterval + "msec"
+ + " CACHEREPORT_INTERVAL of " + dnConf.cacheReportInterval + "msec"
+ + " Initial delay: " + dnConf.initialBlockReportDelay + "msec"
+ + "; heartBeatInterval=" + dnConf.heartBeatInterval);
//
// Now loop for a long time....
@@ -555,6 +590,9 @@ class BPServiceActor implements Runnable
DatanodeCommand cmd = blockReport();
processCommand(new DatanodeCommand[]{ cmd });
+ cmd = cacheReport();
+ processCommand(new DatanodeCommand[]{ cmd });
+
// Now safe to start scanning the block pool.
// If it has already been started, this is a no-op.
if (dn.blockScanner != null) {
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java?rev=1536572&r1=1536571&r2=1536572&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java Tue Oct 29 00:49:20 2013
@@ -18,13 +18,18 @@
package org.apache.hadoop.hdfs.server.datanode;
import org.apache.hadoop.classification.InterfaceAudience;
+
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_KEY;
@@ -39,6 +44,7 @@ import static org.apache.hadoop.hdfs.DFS
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
@@ -66,6 +72,7 @@ public class DNConf {
final long blockReportInterval;
final long deleteReportInterval;
final long initialBlockReportDelay;
+ final long cacheReportInterval;
final int writePacketSize;
final String minimumNameNodeVersion;
@@ -73,6 +80,8 @@ public class DNConf {
final long xceiverStopTimeout;
+ final long maxLockedMemory;
+
public DNConf(Configuration conf) {
socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
HdfsServerConstants.READ_TIMEOUT);
@@ -107,7 +116,9 @@ public class DNConf {
DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME,
DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
this.blockReportInterval = conf.getLong(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
- DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT);
+ DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT);
+ this.cacheReportInterval = conf.getLong(DFS_CACHEREPORT_INTERVAL_MSEC_KEY,
+ DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT);
long initBRDelay = conf.getLong(
DFS_BLOCKREPORT_INITIAL_DELAY_KEY,
@@ -137,6 +148,10 @@ public class DNConf {
this.xceiverStopTimeout = conf.getLong(
DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY,
DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT);
+
+ this.maxLockedMemory = conf.getLong(
+ DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
+ DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT);
}
// We get minimumNameNodeVersion via a method so it can be mocked out in tests.
@@ -147,4 +162,8 @@ public class DNConf {
public long getXceiverStopTimeout() {
return xceiverStopTimeout;
}
+
+  /**
+   * Returns the configured maximum amount of memory the datanode may lock,
+   * as read from {@code DFS_DATANODE_MAX_LOCKED_MEMORY_KEY} at construction.
+   *
+   * @return the configured max locked memory
+   */
+  public long getMaxLockedMemory() {
+    return this.maxLockedMemory;
+  }
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1536572&r1=1536571&r2=1536572&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Tue Oct 29 00:49:20 2013
@@ -67,6 +67,7 @@ import org.apache.hadoop.hdfs.web.resour
import org.apache.hadoop.http.HttpServer;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
@@ -656,6 +657,25 @@ public class DataNode extends Configured
this.conf = conf;
this.dnConf = new DNConf(conf);
+ if (dnConf.maxLockedMemory > 0) {
+ if (!NativeIO.isAvailable()) {
+ throw new RuntimeException(String.format(
+ "Cannot start datanode because the configured max locked memory" +
+ " size (%s) is greater than zero and native code is not available.",
+ DFS_DATANODE_MAX_LOCKED_MEMORY_KEY));
+ }
+ long ulimit = NativeIO.getMemlockLimit();
+ if (dnConf.maxLockedMemory > ulimit) {
+ throw new RuntimeException(String.format(
+ "Cannot start datanode because the configured max locked memory" +
+ " size (%s) of %d bytes is more than the datanode's available" +
+ " RLIMIT_MEMLOCK ulimit of %d bytes.",
+ DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
+ dnConf.maxLockedMemory,
+ ulimit));
+ }
+ }
+
storage = new DataStorage();
// global DN settings
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java?rev=1536572&r1=1536571&r2=1536572&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java Tue Oct 29 00:49:20 2013
@@ -269,6 +269,14 @@ public interface FsDatasetSpi<V extends
*/
public BlockListAsLongs getBlockReport(String bpid);
+ /**
+ * Returns the cache report - the full list of cached block IDs of a
+ * block pool.
+ * @param bpid Block Pool Id
+ * @return the cache report - the full list of cached block IDs.
+ */
+ public List<Long> getCacheReport(String bpid);
+
/** Does the dataset contain the block? */
public boolean contains(ExtendedBlock block);
@@ -294,6 +302,20 @@ public interface FsDatasetSpi<V extends
*/
public void invalidate(String bpid, Block invalidBlks[]) throws IOException;
+ /**
+ * Caches the specified blocks
+ * @param bpid Block pool id
+ * @param blockIds - block ids to cache
+ */
+ public void cache(String bpid, long[] blockIds);
+
+ /**
+ * Uncaches the specified blocks
+ * @param bpid Block pool id
+ * @param blockIds - block ids to uncache
+ */
+ public void uncache(String bpid, long[] blockIds);
+
/**
* Check if all the data directories are healthy
* @throws DiskErrorException
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java?rev=1536572&r1=1536571&r2=1536572&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java Tue Oct 29 00:49:20 2013
@@ -37,6 +37,7 @@ import javax.management.NotCompliantMBea
import javax.management.ObjectName;
import javax.management.StandardMBean;
+import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -170,6 +171,7 @@ class FsDatasetImpl implements FsDataset
final FsVolumeList volumes;
final ReplicaMap volumeMap;
final FsDatasetAsyncDiskService asyncDiskService;
+ final FsDatasetCache cacheManager;
private final int validVolsRequired;
// Used for synchronizing access to usage stats
@@ -228,6 +230,7 @@ class FsDatasetImpl implements FsDataset
roots[idx] = storage.getStorageDir(idx).getCurrentDir();
}
asyncDiskService = new FsDatasetAsyncDiskService(datanode, roots);
+ cacheManager = new FsDatasetCache(this);
registerMBean(storage.getStorageID());
}
@@ -288,6 +291,22 @@ class FsDatasetImpl implements FsDataset
}
/**
+ * Returns the total cache used by the datanode (in bytes).
+ */
+ @Override // FSDatasetMBean
+ public long getDnCacheUsed() {
+ return cacheManager.getDnCacheUsed();
+ }
+
+ /**
+ * Returns the total cache capacity of the datanode (in bytes).
+ */
+ @Override // FSDatasetMBean
+ public long getDnCacheCapacity() {
+ return cacheManager.getDnCacheCapacity();
+ }
+
+ /**
* Find the block's on-disk length
*/
@Override // FsDatasetSpi
@@ -534,6 +553,8 @@ class FsDatasetImpl implements FsDataset
private synchronized ReplicaBeingWritten append(String bpid,
FinalizedReplica replicaInfo, long newGS, long estimateBlockLen)
throws IOException {
+ // uncache the block
+ cacheManager.uncacheBlock(bpid, replicaInfo.getBlockId());
// unlink the finalized replica
replicaInfo.unlinkBlock(1);
@@ -1001,6 +1022,11 @@ class FsDatasetImpl implements FsDataset
}
}
+  /**
+   * Returns the cache report for a block pool by asking the cache manager
+   * for the blocks it currently holds for that pool.
+   *
+   * @param bpid Block Pool Id
+   * @return the list of cached block IDs for the block pool
+   */
+  @Override // FsDatasetSpi
+  public List<Long> getCacheReport(String bpid) {
+    final List<Long> cachedBlockIds = cacheManager.getCachedBlocks(bpid);
+    return cachedBlockIds;
+  }
+
/**
* Get the list of finalized blocks from in-memory blockmap for a block pool.
*/
@@ -1143,6 +1169,8 @@ class FsDatasetImpl implements FsDataset
volumeMap.remove(bpid, invalidBlks[i]);
}
+ // Uncache the block synchronously
+ cacheManager.uncacheBlock(bpid, invalidBlks[i].getBlockId());
// Delete the block asynchronously to make sure we can do it fast enough
asyncDiskService.deleteAsync(v, f,
FsDatasetUtil.getMetaFile(f, invalidBlks[i].getGenerationStamp()),
@@ -1153,6 +1181,82 @@ class FsDatasetImpl implements FsDataset
}
}
+  /**
+   * Checks whether the replica of a block can be cached.
+   * <p>
+   * A block is valid to cache only if its replica is present in the volume
+   * map, the replica has a volume, and the replica is in the
+   * {@link ReplicaState#FINALIZED} state. Each failed check is logged at
+   * warn level.
+   *
+   * @param bpid Block pool id
+   * @param blockId id of the block to check
+   * @return true if all checks pass, false otherwise
+   */
+  synchronized boolean validToCache(String bpid, long blockId) {
+    ReplicaInfo info = volumeMap.get(bpid, blockId);
+    if (info == null) {
+      LOG.warn("Failed to cache replica in block pool " + bpid +
+          " with block id " + blockId + ": ReplicaInfo not found.");
+      return false;
+    }
+    FsVolumeImpl volume = (FsVolumeImpl)info.getVolume();
+    if (volume == null) {
+      LOG.warn("Failed to cache block with id " + blockId +
+          ": Volume not found.");
+      return false;
+    }
+    if (info.getState() != ReplicaState.FINALIZED) {
+      // Message previously read "Failed to block with id"; the verb was
+      // missing, unlike the sibling messages above.
+      LOG.warn("Failed to cache block with id " + blockId +
+          ": Replica is not finalized.");
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Asynchronously attempts to cache a single block via {@link FsDatasetCache}.
+   * <p>
+   * The replica is validated and its block and meta files are opened here;
+   * the open streams are then handed to the cache manager together with the
+   * replica's volume. If the replica is not valid to cache, or either file
+   * cannot be opened as a {@link FileInputStream}, the failure is logged,
+   * any stream that was opened is closed, and nothing is cached.
+   *
+   * @param bpid Block pool id
+   * @param blockId id of the block to cache
+   */
+  private void cacheBlock(String bpid, long blockId) {
+    ReplicaInfo info;
+    FsVolumeImpl volume;
+    // Validate and look up the replica while holding the dataset lock.
+    synchronized (this) {
+      if (!validToCache(bpid, blockId)) {
+        return;
+      }
+      info = volumeMap.get(bpid, blockId);
+      volume = (FsVolumeImpl)info.getVolume();
+    }
+    ExtendedBlock extBlk =
+        new ExtendedBlock(bpid, blockId,
+            info.getBytesOnDisk(), info.getGenerationStamp());
+    // Try to open block and meta streams. The streams are first held by
+    // their declared type: if a cast to FileInputStream fails, the stream
+    // that was already opened can still be closed instead of leaking.
+    java.io.InputStream rawBlockIn = null;
+    java.io.InputStream rawMetaIn = null;
+    FileInputStream blockIn = null;
+    FileInputStream metaIn = null;
+    boolean success = false;
+    try {
+      rawBlockIn = getBlockInputStream(extBlk, 0);
+      blockIn = (FileInputStream)rawBlockIn;
+      rawMetaIn = getMetaDataInputStream(extBlk).getWrappedStream();
+      metaIn = (FileInputStream)rawMetaIn;
+      success = true;
+    } catch (ClassCastException e) {
+      LOG.warn("Failed to cache replica " + extBlk + ": Underlying blocks"
+          + " are not backed by files.", e);
+    } catch (IOException e) {
+      LOG.warn("Failed to cache replica " + extBlk + ": IOException while"
+          + " trying to open block or meta files.", e);
+    }
+    if (!success) {
+      IOUtils.closeQuietly(rawBlockIn);
+      IOUtils.closeQuietly(rawMetaIn);
+      return;
+    }
+    cacheManager.cacheBlock(bpid, extBlk.getLocalBlock(),
+        volume, blockIn, metaIn);
+  }
+
+  /**
+   * Attempts to cache each of the given blocks, one at a time and in array
+   * order.
+   *
+   * @param bpid Block pool id
+   * @param blockIds block ids to cache
+   */
+  @Override // FsDatasetSpi
+  public void cache(String bpid, long[] blockIds) {
+    for (long blockId : blockIds) {
+      cacheBlock(bpid, blockId);
+    }
+  }
+
+  /**
+   * Asks the cache manager to uncache each of the given blocks, one at a
+   * time and in array order.
+   *
+   * @param bpid Block pool id
+   * @param blockIds block ids to uncache
+   */
+  @Override // FsDatasetSpi
+  public void uncache(String bpid, long[] blockIds) {
+    for (long blockId : blockIds) {
+      cacheManager.uncacheBlock(bpid, blockId);
+    }
+  }
+
@Override // FsDatasetSpi
public synchronized boolean contains(final ExtendedBlock block) {
final long blockId = block.getLocalBlock().getBlockId();
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java?rev=1536572&r1=1536571&r2=1536572&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java Tue Oct 29 00:49:20 2013
@@ -18,11 +18,17 @@
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import java.io.File;
+import java.io.FileInputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
@@ -34,6 +40,8 @@ import org.apache.hadoop.hdfs.server.dat
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
/**
* The underlying volume used to store replica.
*
@@ -48,6 +56,13 @@ class FsVolumeImpl implements FsVolumeSp
private final File currentDir; // <StorageDirectory>/current
private final DF usage;
private final long reserved;
+ /**
+ * Per-volume worker pool that processes new blocks to cache.
+ * The maximum number of workers per volume is bounded (configurable via
+ * dfs.datanode.fsdatasetcache.max.threads.per.volume) to limit resource
+ * contention.
+ */
+ private final ThreadPoolExecutor cacheExecutor;
FsVolumeImpl(FsDatasetImpl dataset, String storageID, File currentDir,
Configuration conf) throws IOException {
@@ -59,6 +74,20 @@ class FsVolumeImpl implements FsVolumeSp
this.currentDir = currentDir;
File parent = currentDir.getParentFile();
this.usage = new DF(parent, conf);
+ final int maxNumThreads = dataset.datanode.getConf().getInt(
+ DFSConfigKeys.DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY,
+ DFSConfigKeys.DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_DEFAULT
+ );
+ ThreadFactory workerFactory = new ThreadFactoryBuilder()
+ .setDaemon(true)
+ .setNameFormat("FsVolumeImplWorker-" + parent.toString() + "-%d")
+ .build();
+ cacheExecutor = new ThreadPoolExecutor(
+ 1, maxNumThreads,
+ 60, TimeUnit.SECONDS,
+ new LinkedBlockingQueue<Runnable>(),
+ workerFactory);
+ cacheExecutor.allowCoreThreadTimeOut(true);
}
File getCurrentDir() {
@@ -166,7 +195,11 @@ class FsVolumeImpl implements FsVolumeSp
File addBlock(String bpid, Block b, File f) throws IOException {
return getBlockPoolSlice(bpid).addBlock(b, f);
}
-
+
+  /**
+   * Returns this volume's worker pool for processing new blocks to cache.
+   *
+   * @return the per-volume cache executor
+   */
+  Executor getExecutor() {
+    return this.cacheExecutor;
+  }
+
void checkDirs() throws DiskErrorException {
// TODO:FEDERATION valid synchronization
for(BlockPoolSlice s : bpSlices.values()) {
@@ -210,6 +243,7 @@ class FsVolumeImpl implements FsVolumeSp
}
void shutdown() {
+ cacheExecutor.shutdown();
Set<Entry<String, BlockPoolSlice>> set = bpSlices.entrySet();
for (Entry<String, BlockPoolSlice> entry : set) {
entry.getValue().shutdown();
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java?rev=1536572&r1=1536571&r2=1536572&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java Tue Oct 29 00:49:20 2013
@@ -57,6 +57,8 @@ public class DataNodeMetrics {
@Metric MutableCounterLong blocksRemoved;
@Metric MutableCounterLong blocksVerified;
@Metric MutableCounterLong blockVerificationFailures;
+ @Metric MutableCounterLong blocksCached;
+ @Metric MutableCounterLong blocksUncached;
@Metric MutableCounterLong readsFromLocalClient;
@Metric MutableCounterLong readsFromRemoteClient;
@Metric MutableCounterLong writesFromLocalClient;
@@ -74,6 +76,7 @@ public class DataNodeMetrics {
@Metric MutableRate replaceBlockOp;
@Metric MutableRate heartbeats;
@Metric MutableRate blockReports;
+ @Metric MutableRate cacheReports;
@Metric MutableRate packetAckRoundTripTimeNanos;
MutableQuantiles[] packetAckRoundTripTimeNanosQuantiles;
@@ -151,6 +154,10 @@ public class DataNodeMetrics {
blockReports.add(latency);
}
+  /**
+   * Adds one sample to the {@code cacheReports} rate metric.
+   *
+   * @param latency the latency value to record
+   */
+  public void addCacheReport(long latency) {
+    this.cacheReports.add(latency);
+  }
+
public void incrBlocksReplicated(int delta) {
blocksReplicated.incr(delta);
}
@@ -175,6 +182,15 @@ public class DataNodeMetrics {
blocksVerified.incr();
}
+
+  /**
+   * Increases the {@code blocksCached} counter.
+   *
+   * @param delta amount to add to the counter
+   */
+  public void incrBlocksCached(int delta) {
+    this.blocksCached.incr(delta);
+  }
+
+  /**
+   * Increases the {@code blocksUncached} counter.
+   *
+   * @param delta amount to add to the counter
+   */
+  public void incrBlocksUncached(int delta) {
+    this.blocksUncached.incr(delta);
+  }
+
public void addReadBlockOp(long latency) {
readBlockOp.add(latency);
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/FSDatasetMBean.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/FSDatasetMBean.java?rev=1536572&r1=1536571&r2=1536572&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/FSDatasetMBean.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/FSDatasetMBean.java Tue Oct 29 00:49:20 2013
@@ -77,4 +77,14 @@ public interface FSDatasetMBean {
* @return The number of failed volumes in the datanode.
*/
public int getNumFailedVolumes();
+
+ /**
+ * Returns the total cache used by the datanode (in bytes).
+ */
+ public long getDnCacheUsed();
+
+ /**
+ * Returns the total cache capacity of the datanode (in bytes).
+ */
+ public long getDnCacheCapacity();
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1536572&r1=1536571&r2=1536572&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Tue Oct 29 00:49:20 2013
@@ -53,6 +53,7 @@ import org.apache.hadoop.hdfs.protocol.F
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
@@ -2592,12 +2593,21 @@ public class FSDirectory implements Clos
int childrenNum = node.isDirectory() ?
node.asDirectory().getChildrenNum(snapshot) : 0;
- return new HdfsLocatedFileStatus(size, node.isDirectory(), replication,
- blocksize, node.getModificationTime(snapshot),
- node.getAccessTime(snapshot), node.getFsPermission(snapshot),
- node.getUserName(snapshot), node.getGroupName(snapshot),
- node.isSymlink() ? node.asSymlink().getSymlink() : null, path,
- node.getId(), loc, childrenNum);
+ HdfsLocatedFileStatus status =
+ new HdfsLocatedFileStatus(size, node.isDirectory(), replication,
+ blocksize, node.getModificationTime(snapshot),
+ node.getAccessTime(snapshot), node.getFsPermission(snapshot),
+ node.getUserName(snapshot), node.getGroupName(snapshot),
+ node.isSymlink() ? node.asSymlink().getSymlink() : null, path,
+ node.getId(), loc, childrenNum);
+ // Set caching information for the located blocks.
+ if (loc != null) {
+ CacheManager cacheManager = namesystem.getCacheManager();
+ for (LocatedBlock lb: loc.getLocatedBlocks()) {
+ cacheManager.setCachedLocations(lb);
+ }
+ }
+ return status;
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1536572&r1=1536571&r2=1536572&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Tue Oct 29 00:49:20 2013
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
+import static org.apache.hadoop.util.ExitUtil.terminate;
import static org.apache.hadoop.util.Time.now;
import java.io.IOException;
@@ -35,15 +36,18 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
+import org.apache.hadoop.hdfs.protocol.PathBasedCacheEntry;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
-
-import static org.apache.hadoop.util.ExitUtil.terminate;
-
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.Storage.FormatConfirmable;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCachePoolOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddPathBasedCacheDirectiveOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AllocateBlockIdOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AllowSnapshotOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.CancelDelegationTokenOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.CloseOp;
@@ -55,12 +59,17 @@ import org.apache.hadoop.hdfs.server.nam
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.GetDelegationTokenOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.LogSegmentOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.MkdirOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ModifyCachePoolOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.OpInstanceCache;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ReassignLeaseOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemoveCachePoolOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemovePathBasedCacheDescriptorOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOldOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameSnapshotOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenewDelegationTokenOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampV1Op;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampV2Op;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetOwnerOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetPermissionsOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetQuotaOp;
@@ -69,9 +78,6 @@ import org.apache.hadoop.hdfs.server.nam
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TimesOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateBlocksOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateMasterKeyOp;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AllocateBlockIdOp;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampV1Op;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampV2Op;
import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
@@ -948,6 +954,45 @@ public class FSEditLog implements LogsPu
logEdit(op);
}
+  /**
+   * Writes an {@link AddPathBasedCacheDirectiveOp} to the edit log, carrying
+   * the directive's path (the path component of its URI), replication and
+   * pool.
+   *
+   * @param directive the cache directive being added
+   * @param toLogRpcIds passed through to {@code logRpcIds} for this op
+   */
+  void logAddPathBasedCacheDirective(PathBasedCacheDirective directive,
+      boolean toLogRpcIds) {
+    final String path = directive.getPath().toUri().getPath();
+    final AddPathBasedCacheDirectiveOp op =
+        AddPathBasedCacheDirectiveOp.getInstance(cache.get())
+            .setPath(path)
+            .setReplication(directive.getReplication())
+            .setPool(directive.getPool());
+    logRpcIds(op, toLogRpcIds);
+    logEdit(op);
+  }
+
+  /**
+   * Writes a {@link RemovePathBasedCacheDescriptorOp} for the given id to
+   * the edit log.
+   *
+   * @param id id stored in the op
+   * @param toLogRpcIds passed through to {@code logRpcIds} for this op
+   */
+  void logRemovePathBasedCacheDescriptor(Long id, boolean toLogRpcIds) {
+    final RemovePathBasedCacheDescriptorOp op =
+        RemovePathBasedCacheDescriptorOp
+            .getInstance(cache.get())
+            .setId(id);
+    logRpcIds(op, toLogRpcIds);
+    logEdit(op);
+  }
+
+  /**
+   * Writes an {@link AddCachePoolOp} for the given pool to the edit log.
+   *
+   * @param pool info of the cache pool being added
+   * @param toLogRpcIds passed through to {@code logRpcIds} for this op
+   */
+  void logAddCachePool(CachePoolInfo pool, boolean toLogRpcIds) {
+    final AddCachePoolOp op =
+        AddCachePoolOp
+            .getInstance(cache.get())
+            .setPool(pool);
+    logRpcIds(op, toLogRpcIds);
+    logEdit(op);
+  }
+
+  /**
+   * Writes a {@link ModifyCachePoolOp} carrying the given pool info to the
+   * edit log.
+   *
+   * @param info cache pool info stored in the op
+   * @param toLogRpcIds passed through to {@code logRpcIds} for this op
+   */
+  void logModifyCachePool(CachePoolInfo info, boolean toLogRpcIds) {
+    final ModifyCachePoolOp op =
+        ModifyCachePoolOp
+            .getInstance(cache.get())
+            .setInfo(info);
+    logRpcIds(op, toLogRpcIds);
+    logEdit(op);
+  }
+
+  /**
+   * Writes a {@link RemoveCachePoolOp} for the named pool to the edit log.
+   *
+   * @param poolName name of the cache pool being removed
+   * @param toLogRpcIds passed through to {@code logRpcIds} for this op
+   */
+  void logRemoveCachePool(String poolName, boolean toLogRpcIds) {
+    final RemoveCachePoolOp op =
+        RemoveCachePoolOp
+            .getInstance(cache.get())
+            .setPoolName(poolName);
+    logRpcIds(op, toLogRpcIds);
+    logEdit(op);
+  }
+
/**
* Get all the journals this edit log is currently operating on.
*/
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1536572&r1=1536571&r2=1536572&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Tue Oct 29 00:49:20 2013
@@ -30,16 +30,21 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.PathBasedCacheDescriptor;
+import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCachePoolOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCloseOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddPathBasedCacheDirectiveOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AllocateBlockIdOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AllowSnapshotOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.BlockListUpdatingOp;
@@ -52,7 +57,10 @@ import org.apache.hadoop.hdfs.server.nam
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.DisallowSnapshotOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.GetDelegationTokenOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.MkdirOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ModifyCachePoolOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ReassignLeaseOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemoveCachePoolOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemovePathBasedCacheDescriptorOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOldOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameSnapshotOp;
@@ -76,6 +84,7 @@ import org.apache.hadoop.hdfs.server.nam
import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step;
import org.apache.hadoop.hdfs.util.ChunkedArrayList;
import org.apache.hadoop.hdfs.util.Holder;
+import org.apache.jasper.tagplugins.jstl.core.Remove;
import com.google.common.base.Joiner;
@@ -631,6 +640,54 @@ public class FSEditLogLoader {
fsNamesys.setLastAllocatedBlockId(allocateBlockIdOp.blockId);
break;
}
+ case OP_ADD_PATH_BASED_CACHE_DIRECTIVE: {
+ AddPathBasedCacheDirectiveOp addOp = (AddPathBasedCacheDirectiveOp) op;
+ PathBasedCacheDirective d = new PathBasedCacheDirective.Builder().
+ setPath(new Path(addOp.path)).
+ setReplication(addOp.replication).
+ setPool(addOp.pool).
+ build();
+ PathBasedCacheDescriptor descriptor =
+ fsNamesys.getCacheManager().addDirective(d, null);
+ if (toAddRetryCache) {
+ fsNamesys.addCacheEntryWithPayload(op.rpcClientId, op.rpcCallId,
+ descriptor);
+ }
+ break;
+ }
+ case OP_REMOVE_PATH_BASED_CACHE_DESCRIPTOR: {
+ RemovePathBasedCacheDescriptorOp removeOp =
+ (RemovePathBasedCacheDescriptorOp) op;
+ fsNamesys.getCacheManager().removeDescriptor(removeOp.id, null);
+ if (toAddRetryCache) {
+ fsNamesys.addCacheEntry(op.rpcClientId, op.rpcCallId);
+ }
+ break;
+ }
+ case OP_ADD_CACHE_POOL: {
+ AddCachePoolOp addOp = (AddCachePoolOp) op;
+ fsNamesys.getCacheManager().addCachePool(addOp.info);
+ if (toAddRetryCache) {
+ fsNamesys.addCacheEntry(op.rpcClientId, op.rpcCallId);
+ }
+ break;
+ }
+ case OP_MODIFY_CACHE_POOL: {
+ ModifyCachePoolOp modifyOp = (ModifyCachePoolOp) op;
+ fsNamesys.getCacheManager().modifyCachePool(modifyOp.info);
+ if (toAddRetryCache) {
+ fsNamesys.addCacheEntry(op.rpcClientId, op.rpcCallId);
+ }
+ break;
+ }
+ case OP_REMOVE_CACHE_POOL: {
+ RemoveCachePoolOp removeOp = (RemoveCachePoolOp) op;
+ fsNamesys.getCacheManager().removeCachePool(removeOp.poolName);
+ if (toAddRetryCache) {
+ fsNamesys.addCacheEntry(op.rpcClientId, op.rpcCallId);
+ }
+ break;
+ }
default:
throw new IOException("Invalid operation read " + op.opCode);
}