Posted to hdfs-commits@hadoop.apache.org by cm...@apache.org on 2013/10/29 01:49:23 UTC

svn commit: r1536572 [2/4] - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ dev-support/ src/main/bin/ src/main/java/ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/client/ src/main/java/org/apache/hadoop/hdfs/p...

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java?rev=1536572&r1=1536571&r2=1536572&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java Tue Oct 29 00:49:20 2013
@@ -27,6 +27,8 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReceivedAndDeletedResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CacheReportRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CacheReportResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CommitBlockSynchronizationRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CommitBlockSynchronizationResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ErrorReportRequestProto;
@@ -55,6 +57,7 @@ import org.apache.hadoop.hdfs.server.pro
 import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
 import org.apache.hadoop.hdfs.server.protocol.StorageReport;
 
+import com.google.common.primitives.Longs;
 import com.google.protobuf.RpcController;
 import com.google.protobuf.ServiceException;
 
@@ -109,8 +112,9 @@ public class DatanodeProtocolServerSideT
             p.getBlockPoolUsed());
       }
       response = impl.sendHeartbeat(PBHelper.convert(request.getRegistration()),
-          report, request.getXmitsInProgress(), request.getXceiverCount(),
-          request.getFailedVolumes());
+          report, request.getDnCacheCapacity(), request.getDnCacheUsed(),
+          request.getXmitsInProgress(),
+          request.getXceiverCount(), request.getFailedVolumes());
     } catch (IOException e) {
       throw new ServiceException(e);
     }
@@ -160,6 +164,27 @@ public class DatanodeProtocolServerSideT
   }
 
   @Override
+  public CacheReportResponseProto cacheReport(RpcController controller,
+      CacheReportRequestProto request) throws ServiceException {
+    DatanodeCommand cmd = null;
+    try {
+      cmd = impl.cacheReport(
+          PBHelper.convert(request.getRegistration()),
+          request.getBlockPoolId(),
+          request.getBlocksList());
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+    CacheReportResponseProto.Builder builder =
+        CacheReportResponseProto.newBuilder();
+    if (cmd != null) {
+      builder.setCmd(PBHelper.convert(cmd));
+    }
+    return builder.build();
+  }
+
+
+  @Override
   public BlockReceivedAndDeletedResponseProto blockReceivedAndDeleted(
       RpcController controller, BlockReceivedAndDeletedRequestProto request)
       throws ServiceException {

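The translator above unwraps a CacheReportRequestProto, calls impl.cacheReport() with the registration, block pool id and cached block ids, and wraps any resulting DatanodeCommand in the response. As a minimal sketch of the inverse (client-side) direction, assuming the generated builder mirrors the getters used above (setRegistration/setBlockPoolId/addAllBlocks) and that PBHelper offers the inverse registration conversion:

    import java.util.List;

    import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CacheReportRequestProto;
    import org.apache.hadoop.hdfs.protocolPB.PBHelper;
    import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;

    class CacheReportRequestSketch {
      // Builds the request that the server-side translator above will unwrap.
      static CacheReportRequestProto build(DatanodeRegistration registration,
          String blockPoolId, List<Long> blockIds) {
        return CacheReportRequestProto.newBuilder()
            .setRegistration(PBHelper.convert(registration)) // mirror of getRegistration()
            .setBlockPoolId(blockPoolId)                     // mirror of getBlockPoolId()
            .addAllBlocks(blockIds)                          // mirror of getBlocksList()
            .build();
      }
    }
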
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java?rev=1536572&r1=1536571&r2=1536572&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java Tue Oct 29 00:49:20 2013
@@ -57,6 +57,7 @@ import org.apache.hadoop.hdfs.protocol.p
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SafeModeActionProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BalancerBandwidthCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto;
+import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockIdCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockRecoveryCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeCommandProto;
 import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeRegistrationProto;
@@ -118,6 +119,7 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.INodeId;
 import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
@@ -148,6 +150,7 @@ import org.apache.hadoop.security.proto.
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
 
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 import com.google.protobuf.ByteString;
 import com.google.protobuf.CodedInputStream;
@@ -472,7 +475,8 @@ public class PBHelper {
         PBHelper.convert(di.getId()),
         di.hasLocation() ? di.getLocation() : null , 
         di.getCapacity(),  di.getDfsUsed(),  di.getRemaining(),
-        di.getBlockPoolUsed()  ,  di.getLastUpdate() , di.getXceiverCount() ,
+        di.getBlockPoolUsed(), di.getCacheCapacity(), di.getCacheUsed(),
+        di.getLastUpdate(), di.getXceiverCount(),
         PBHelper.convert(di.getAdminState())); 
   }
   
@@ -565,9 +569,21 @@ public class PBHelper {
     if (b == null) return null;
     Builder builder = LocatedBlockProto.newBuilder();
     DatanodeInfo[] locs = b.getLocations();
+    List<DatanodeInfo> cachedLocs =
+        Lists.newLinkedList(Arrays.asList(b.getCachedLocations()));
     for (int i = 0; i < locs.length; i++) {
-      builder.addLocs(i, PBHelper.convert(locs[i]));
+      DatanodeInfo loc = locs[i];
+      builder.addLocs(i, PBHelper.convert(loc));
+      boolean locIsCached = cachedLocs.contains(loc);
+      builder.addIsCached(locIsCached);
+      if (locIsCached) {
+        cachedLocs.remove(loc);
+      }
     }
+    Preconditions.checkArgument(cachedLocs.size() == 0,
+        "Found additional cached replica locations that are not in the set of"
+        + " storage-backed locations!");
+
     return builder.setB(PBHelper.convert(b.getBlock()))
         .setBlockToken(PBHelper.convert(b.getBlockToken()))
         .setCorrupt(b.isCorrupt()).setOffset(b.getStartOffset()).build();
@@ -580,9 +596,20 @@ public class PBHelper {
     for (int i = 0; i < locs.size(); i++) {
       targets[i] = PBHelper.convert(locs.get(i));
     }
+    // Set values from the isCached list, re-using references from loc
+    List<DatanodeInfo> cachedLocs = new ArrayList<DatanodeInfo>(locs.size());
+    List<Boolean> isCachedList = proto.getIsCachedList();
+    for (int i=0; i<isCachedList.size(); i++) {
+      if (isCachedList.get(i)) {
+        cachedLocs.add(targets[i]);
+      }
+    }
+
     LocatedBlock lb = new LocatedBlock(PBHelper.convert(proto.getB()), targets,
-        proto.getOffset(), proto.getCorrupt());
+        proto.getOffset(), proto.getCorrupt(),
+        cachedLocs.toArray(new DatanodeInfo[0]));
     lb.setBlockToken(PBHelper.convert(proto.getBlockToken()));
+
     return lb;
   }
 
@@ -671,6 +698,8 @@ public class PBHelper {
       return PBHelper.convert(proto.getKeyUpdateCmd());
     case RegisterCommand:
       return REG_CMD;
+    case BlockIdCommand:
+      return PBHelper.convert(proto.getBlkIdCmd());
     }
     return null;
   }
@@ -723,6 +752,26 @@ public class PBHelper {
     builder.addAllTargets(PBHelper.convert(cmd.getTargets()));
     return builder.build();
   }
+  
+  public static BlockIdCommandProto convert(BlockIdCommand cmd) {
+    BlockIdCommandProto.Builder builder = BlockIdCommandProto.newBuilder()
+        .setBlockPoolId(cmd.getBlockPoolId());
+    switch (cmd.getAction()) {
+    case DatanodeProtocol.DNA_CACHE:
+      builder.setAction(BlockIdCommandProto.Action.CACHE);
+      break;
+    case DatanodeProtocol.DNA_UNCACHE:
+      builder.setAction(BlockIdCommandProto.Action.UNCACHE);
+      break;
+    default:
+      throw new AssertionError("Invalid action");
+    }
+    long[] blockIds = cmd.getBlockIds();
+    for (int i = 0; i < blockIds.length; i++) {
+      builder.addBlockIds(blockIds[i]);
+    }
+    return builder.build();
+  }
 
   private static List<DatanodeInfosProto> convert(DatanodeInfo[][] targets) {
     DatanodeInfosProto[] ret = new DatanodeInfosProto[targets.length];
@@ -766,8 +815,13 @@ public class PBHelper {
     case DatanodeProtocol.DNA_TRANSFER:
     case DatanodeProtocol.DNA_INVALIDATE:
     case DatanodeProtocol.DNA_SHUTDOWN:
-      builder.setCmdType(DatanodeCommandProto.Type.BlockCommand).setBlkCmd(
-          PBHelper.convert((BlockCommand) datanodeCommand));
+      builder.setCmdType(DatanodeCommandProto.Type.BlockCommand).
+        setBlkCmd(PBHelper.convert((BlockCommand) datanodeCommand));
+      break;
+    case DatanodeProtocol.DNA_CACHE:
+    case DatanodeProtocol.DNA_UNCACHE:
+      builder.setCmdType(DatanodeCommandProto.Type.BlockIdCommand).
+        setBlkIdCmd(PBHelper.convert((BlockIdCommand) datanodeCommand));
       break;
     case DatanodeProtocol.DNA_UNKNOWN: //Not expected
     default:
@@ -818,10 +872,32 @@ public class PBHelper {
     case SHUTDOWN:
       action = DatanodeProtocol.DNA_SHUTDOWN;
       break;
+    default:
+      throw new AssertionError("Unknown action type: " + blkCmd.getAction());
     }
     return new BlockCommand(action, blkCmd.getBlockPoolId(), blocks, targets);
   }
 
+  public static BlockIdCommand convert(BlockIdCommandProto blkIdCmd) {
+    int numBlockIds = blkIdCmd.getBlockIdsCount();
+    long blockIds[] = new long[numBlockIds];
+    for (int i = 0; i < numBlockIds; i++) {
+      blockIds[i] = blkIdCmd.getBlockIds(i);
+    }
+    int action = DatanodeProtocol.DNA_UNKNOWN;
+    switch (blkIdCmd.getAction()) {
+    case CACHE:
+      action = DatanodeProtocol.DNA_CACHE;
+      break;
+    case UNCACHE:
+      action = DatanodeProtocol.DNA_UNCACHE;
+      break;
+    default:
+      throw new AssertionError("Unknown action type: " + blkIdCmd.getAction());
+    }
+    return new BlockIdCommand(action, blkIdCmd.getBlockPoolId(), blockIds);
+  }
+
   public static DatanodeInfo[] convert(DatanodeInfosProto datanodeInfosProto) {
     List<DatanodeInfoProto> proto = datanodeInfosProto.getDatanodesList();
     DatanodeInfo[] infos = new DatanodeInfo[proto.size()];
@@ -1358,10 +1434,11 @@ public class PBHelper {
   }
 
   public static StorageReportProto convert(StorageReport r) {
-    return StorageReportProto.newBuilder()
+    StorageReportProto.Builder builder = StorageReportProto.newBuilder()
         .setBlockPoolUsed(r.getBlockPoolUsed()).setCapacity(r.getCapacity())
         .setDfsUsed(r.getDfsUsed()).setRemaining(r.getRemaining())
-        .setStorageID(r.getStorageID()).build();
+        .setStorageID(r.getStorageID());
+    return builder.build();
   }
 
   public static JournalInfo convert(JournalInfoProto info) {

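The two new BlockIdCommand converters are meant to be inverses of each other. A quick round-trip sketch, using only the APIs visible in the hunks above (the BlockIdCommand(action, blockPoolId, blockIds) constructor, DatanodeProtocol.DNA_CACHE, and the two PBHelper.convert overloads); the block pool id and ids below are made-up example values:

    import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockIdCommandProto;
    import org.apache.hadoop.hdfs.protocolPB.PBHelper;
    import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
    import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;

    class BlockIdCommandRoundTrip {
      public static void main(String[] args) {
        BlockIdCommand cmd = new BlockIdCommand(DatanodeProtocol.DNA_CACHE,
            "BP-example-pool", new long[] { 1L, 2L, 3L });
        BlockIdCommandProto proto = PBHelper.convert(cmd);  // command -> protobuf
        BlockIdCommand back = PBHelper.convert(proto);      // protobuf -> command
        assert back.getAction() == DatanodeProtocol.DNA_CACHE;
        assert back.getBlockIds().length == 3;
      }
    }
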
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java?rev=1536572&r1=1536571&r2=1536572&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfo.java Tue Oct 29 00:49:20 2013
@@ -85,7 +85,7 @@ public class BlockInfo extends Block imp
     this.bc = bc;
   }
 
-  DatanodeDescriptor getDatanode(int index) {
+  public DatanodeDescriptor getDatanode(int index) {
     assert this.triplets != null : "BlockInfo is not initialized";
     assert index >= 0 && index*3 < triplets.length : "Index is out of bound";
     return (DatanodeDescriptor)triplets[index*3];
@@ -153,7 +153,7 @@ public class BlockInfo extends Block imp
     return info;
   }
 
-  int getCapacity() {
+  public int getCapacity() {
     assert this.triplets != null : "BlockInfo is not initialized";
     assert triplets.length % 3 == 0 : "Malformed BlockInfo";
     return triplets.length / 3;

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java?rev=1536572&r1=1536571&r2=1536572&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java Tue Oct 29 00:49:20 2013
@@ -3138,6 +3138,13 @@ assert storedBlock.findDatanode(dn) < 0 
         UnderReplicatedBlocks.QUEUE_WITH_CORRUPT_BLOCKS);
   }
 
+  /**
+   * Get the replicas which are corrupt for a given block.
+   */
+  public Collection<DatanodeDescriptor> getCorruptReplicas(Block block) {
+    return corruptReplicas.getNodes(block);
+  }
+
   /** @return the size of UnderReplicatedBlocks */
   public int numOfUnderReplicatedBlocks() {
     return neededReplications.size();

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java?rev=1536572&r1=1536571&r2=1536572&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java Tue Oct 29 00:49:20 2013
@@ -22,14 +22,20 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Queue;
+
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
 import org.apache.hadoop.hdfs.util.LightWeightHashSet;
+import org.apache.hadoop.util.IntrusiveCollection;
 import org.apache.hadoop.util.Time;
 
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+
 /**
  * This class extends the DatanodeInfo class with ephemeral information (eg
  * health, capacity, what blocks are associated with the Datanode) that is
@@ -94,8 +100,74 @@ public class DatanodeDescriptor extends 
     }
   }
 
+  /**
+   * A list of CachedBlock objects on this datanode.
+   */
+  public static class CachedBlocksList extends IntrusiveCollection<CachedBlock> {
+    public enum Type {
+      PENDING_CACHED,
+      CACHED,
+      PENDING_UNCACHED
+    }
+
+    private final DatanodeDescriptor datanode;
+
+    private final Type type;
+
+    CachedBlocksList(DatanodeDescriptor datanode, Type type) {
+      this.datanode = datanode;
+      this.type = type;
+    }
+
+    public DatanodeDescriptor getDatanode() {
+      return datanode;
+    }
+
+    public Type getType() {
+      return type;
+    }
+  }
+
+  /**
+   * The blocks which we want to cache on this DataNode.
+   */
+  private final CachedBlocksList pendingCached = 
+      new CachedBlocksList(this, CachedBlocksList.Type.PENDING_CACHED);
+
+  /**
+   * The blocks which we know are cached on this datanode.
+   * This list is updated by periodic cache reports.
+   */
+  private final CachedBlocksList cached = 
+      new CachedBlocksList(this, CachedBlocksList.Type.CACHED);
+
+  /**
+   * The blocks which we want to uncache on this DataNode.
+   */
+  private final CachedBlocksList pendingUncached = 
+      new CachedBlocksList(this, CachedBlocksList.Type.PENDING_UNCACHED);
+
+  public CachedBlocksList getPendingCached() {
+    return pendingCached;
+  }
+
+  public CachedBlocksList getCached() {
+    return cached;
+  }
+
+  public CachedBlocksList getPendingUncached() {
+    return pendingUncached;
+  }
+
+  /**
+   * Head of the list of blocks on the datanode
+   */
   private volatile BlockInfo blockList = null;
+  /**
+   * Number of blocks on the datanode
+   */
   private int numBlocks = 0;
+
   // isAlive == heartbeats.contains(this)
   // This is an optimization, because contains takes O(n) time on Arraylist
   public boolean isAlive = false;
@@ -160,7 +232,7 @@ public class DatanodeDescriptor extends 
    * @param nodeID id of the data node
    */
   public DatanodeDescriptor(DatanodeID nodeID) {
-    this(nodeID, 0L, 0L, 0L, 0L, 0, 0);
+    this(nodeID, 0L, 0L, 0L, 0L, 0L, 0L, 0, 0);
   }
 
   /**
@@ -170,7 +242,7 @@ public class DatanodeDescriptor extends 
    */
   public DatanodeDescriptor(DatanodeID nodeID, 
                             String networkLocation) {
-    this(nodeID, networkLocation, 0L, 0L, 0L, 0L, 0, 0);
+    this(nodeID, networkLocation, 0L, 0L, 0L, 0L, 0L, 0L, 0, 0);
   }
   
   /**
@@ -180,6 +252,8 @@ public class DatanodeDescriptor extends 
    * @param dfsUsed space used by the data node
    * @param remaining remaining capacity of the data node
    * @param bpused space used by the block pool corresponding to this namenode
+   * @param cacheCapacity cache capacity of the data node
+   * @param cacheUsed cache used on the data node
    * @param xceiverCount # of data transfers at the data node
    */
   public DatanodeDescriptor(DatanodeID nodeID, 
@@ -187,11 +261,13 @@ public class DatanodeDescriptor extends 
                             long dfsUsed,
                             long remaining,
                             long bpused,
+                            long cacheCapacity,
+                            long cacheUsed,
                             int xceiverCount,
                             int failedVolumes) {
     super(nodeID);
-    updateHeartbeat(capacity, dfsUsed, remaining, bpused, xceiverCount, 
-        failedVolumes);
+    updateHeartbeat(capacity, dfsUsed, remaining, bpused, cacheCapacity,
+        cacheUsed, xceiverCount, failedVolumes);
   }
 
   /**
@@ -202,6 +278,8 @@ public class DatanodeDescriptor extends 
    * @param dfsUsed the used space by dfs datanode
    * @param remaining remaining capacity of the data node
    * @param bpused space used by the block pool corresponding to this namenode
+   * @param cacheCapacity cache capacity of the data node
+   * @param cacheUsed cache used on the data node
    * @param xceiverCount # of data transfers at the data node
    */
   public DatanodeDescriptor(DatanodeID nodeID,
@@ -210,11 +288,13 @@ public class DatanodeDescriptor extends 
                             long dfsUsed,
                             long remaining,
                             long bpused,
+                            long cacheCapacity,
+                            long cacheUsed,
                             int xceiverCount,
                             int failedVolumes) {
     super(nodeID, networkLocation);
-    updateHeartbeat(capacity, dfsUsed, remaining, bpused, xceiverCount, 
-        failedVolumes);
+    updateHeartbeat(capacity, dfsUsed, remaining, bpused, cacheCapacity,
+        cacheUsed, xceiverCount, failedVolumes);
   }
 
   /**
@@ -257,6 +337,7 @@ public class DatanodeDescriptor extends 
    * Used for testing only
    * @return the head of the blockList
    */
+  @VisibleForTesting
   protected BlockInfo getHead(){
     return blockList;
   }
@@ -285,6 +366,11 @@ public class DatanodeDescriptor extends 
     this.blockList = null;
     this.invalidateBlocks.clear();
     this.volumeFailures = 0;
+    // pendingCached, cached, and pendingUncached are protected by the
+    // FSN lock.
+    this.pendingCached.clear();
+    this.cached.clear();
+    this.pendingUncached.clear();
   }
   
   public void clearBlockQueues() {
@@ -293,6 +379,11 @@ public class DatanodeDescriptor extends 
       this.recoverBlocks.clear();
       this.replicateBlocks.clear();
     }
+    // pendingCached, cached, and pendingUncached are protected by the
+    // FSN lock.
+    this.pendingCached.clear();
+    this.cached.clear();
+    this.pendingUncached.clear();
   }
 
   public int numBlocks() {
@@ -303,11 +394,14 @@ public class DatanodeDescriptor extends 
    * Updates stats from datanode heartbeat.
    */
   public void updateHeartbeat(long capacity, long dfsUsed, long remaining,
-      long blockPoolUsed, int xceiverCount, int volFailures) {
+      long blockPoolUsed, long cacheCapacity, long cacheUsed, int xceiverCount,
+      int volFailures) {
     setCapacity(capacity);
     setRemaining(remaining);
     setBlockPoolUsed(blockPoolUsed);
     setDfsUsed(dfsUsed);
+    setCacheCapacity(cacheCapacity);
+    setCacheUsed(cacheUsed);
     setXceiverCount(xceiverCount);
     setLastUpdate(Time.now());    
     this.volumeFailures = volFailures;
@@ -348,7 +442,7 @@ public class DatanodeDescriptor extends 
   public Iterator<BlockInfo> getBlockIterator() {
     return new BlockIterator(this.blockList, this);
   }
-  
+
   /**
    * Store block replication work.
    */
@@ -380,7 +474,7 @@ public class DatanodeDescriptor extends 
       }
     }
   }
-
+  
   /**
    * The number of work items that are pending to be replicated
    */
@@ -397,7 +491,7 @@ public class DatanodeDescriptor extends 
       return invalidateBlocks.size();
     }
   }
-  
+
   public List<BlockTargetPair> getReplicationCommand(int maxTransfers) {
     return replicateBlocks.poll(maxTransfers);
   }

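The three CachedBlocksList fields form the per-datanode caching state machine: pendingCached holds blocks the NameNode wants cached, cached holds blocks the last cache report confirmed, and pendingUncached holds blocks queued for uncaching. A small read-only sketch of inspecting them, assuming nothing beyond the accessors introduced in this hunk (the descriptor itself would come from the block/datanode manager):

    import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
    import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList;

    class CacheListsSketch {
      // Prints the type, owning datanode, and size of each caching list.
      static void describe(DatanodeDescriptor dn) {
        for (CachedBlocksList list : new CachedBlocksList[] {
            dn.getPendingCached(), dn.getCached(), dn.getPendingUncached() }) {
          System.out.println(list.getType() + " on " + list.getDatanode()
              + ": " + list.size() + " block(s)");
        }
      }
    }
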
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java?rev=1536572&r1=1536571&r2=1536572&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java Tue Oct 29 00:49:20 2013
@@ -32,6 +32,8 @@ import org.apache.hadoop.hdfs.HdfsConfig
 import org.apache.hadoop.hdfs.protocol.*;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList;
+import org.apache.hadoop.hdfs.server.namenode.CachedBlock;
 import org.apache.hadoop.hdfs.server.namenode.HostFileManager;
 import org.apache.hadoop.hdfs.server.namenode.HostFileManager.Entry;
 import org.apache.hadoop.hdfs.server.namenode.HostFileManager.EntrySet;
@@ -45,6 +47,7 @@ import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.*;
 import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException;
 import org.apache.hadoop.util.Daemon;
+import org.apache.hadoop.util.IntrusiveCollection;
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.Time;
 
@@ -144,6 +147,12 @@ public class DatanodeManager {
 
   private final boolean checkIpHostnameInRegistration;
   /**
+   * Whether we should tell datanodes what to cache in replies to
+   * heartbeat messages.
+   */
+  private boolean sendCachingCommands = false;
+
+  /**
    * The number of datanodes for each software version. This list should change
    * during rolling upgrades.
    * Software version -> Number of datanodes with this version
@@ -1215,8 +1224,8 @@ public class DatanodeManager {
   public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
       final String blockPoolId,
       long capacity, long dfsUsed, long remaining, long blockPoolUsed,
-      int xceiverCount, int maxTransfers, int failedVolumes
-      ) throws IOException {
+      long cacheCapacity, long cacheUsed, int xceiverCount, int maxTransfers,
+      int failedVolumes) throws IOException {
     synchronized (heartbeatManager) {
       synchronized (datanodeMap) {
         DatanodeDescriptor nodeinfo = null;
@@ -1237,7 +1246,8 @@ public class DatanodeManager {
         }
 
         heartbeatManager.updateHeartbeat(nodeinfo, capacity, dfsUsed,
-            remaining, blockPoolUsed, xceiverCount, failedVolumes);
+            remaining, blockPoolUsed, cacheCapacity, cacheUsed, xceiverCount,
+            failedVolumes);
 
         // If we are in safemode, do not send back any recovery / replication
         // requests. Don't even drain the existing queue of work.
@@ -1298,7 +1308,19 @@ public class DatanodeManager {
           cmds.add(new BlockCommand(DatanodeProtocol.DNA_INVALIDATE,
               blockPoolId, blks));
         }
-        
+        DatanodeCommand pendingCacheCommand =
+            getCacheCommand(nodeinfo.getPendingCached(), nodeinfo,
+              DatanodeProtocol.DNA_CACHE, blockPoolId);
+        if (pendingCacheCommand != null) {
+          cmds.add(pendingCacheCommand);
+        }
+        DatanodeCommand pendingUncacheCommand =
+            getCacheCommand(nodeinfo.getPendingUncached(), nodeinfo,
+              DatanodeProtocol.DNA_UNCACHE, blockPoolId);
+        if (pendingUncacheCommand != null) {
+          cmds.add(pendingUncacheCommand);
+        }
+
         blockManager.addKeyUpdateCommand(cmds, nodeinfo);
 
         // check for balancer bandwidth update
@@ -1318,6 +1340,40 @@ public class DatanodeManager {
   }
 
   /**
+   * Convert a CachedBlocksList into a DatanodeCommand with a list of blocks.
+   *
+   * @param list       The {@link CachedBlocksList}.  This function 
+   *                   clears the list.
+   * @param datanode   The datanode.
+   * @param action     The action to perform in the command.
+   * @param poolId     The block pool id.
+   * @return           A DatanodeCommand to be sent back to the DN, or null if
+   *                   there is nothing to be done.
+   */
+  private DatanodeCommand getCacheCommand(CachedBlocksList list,
+      DatanodeDescriptor datanode, int action, String poolId) {
+    int length = list.size();
+    if (length == 0) {
+      return null;
+    }
+    // Read and clear the existing cache commands.
+    long[] blockIds = new long[length];
+    int i = 0;
+    for (Iterator<CachedBlock> iter = list.iterator();
+            iter.hasNext(); ) {
+      CachedBlock cachedBlock = iter.next();
+      blockIds[i++] = cachedBlock.getBlockId();
+      iter.remove();
+    }
+    if (!sendCachingCommands) {
+      // Do not send caching commands unless the FSNamesystem told us we
+      // should.
+      return null;
+    }
+    return new BlockIdCommand(action, poolId, blockIds);
+  }
+
+  /**
    * Tell all datanodes to use a new, non-persistent bandwidth value for
    * dfs.balance.bandwidthPerSec.
    *
@@ -1365,4 +1421,8 @@ public class DatanodeManager {
   public String toString() {
     return getClass().getSimpleName() + ": " + host2DatanodeMap;
   }
+
+  public void setSendCachingCommands(boolean sendCachingCommands) {
+    this.sendCachingCommands = sendCachingCommands;
+  }
 }

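getCacheCommand() drains a CachedBlocksList into a BlockIdCommand, but the command is only handed back once setSendCachingCommands(true) has been called; note that the list is drained before the flag check, so entries queued while commands are suppressed are dropped rather than resent. A minimal sketch of the enabling call, assuming only the setter added in this hunk and that the caller already holds a DatanodeManager reference:

    import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager;

    class EnableCachingCommandsSketch {
      static void enable(DatanodeManager dm) {
        // After this, heartbeat replies may carry DNA_CACHE / DNA_UNCACHE
        // BlockIdCommands built by getCacheCommand().
        dm.setSendCachingCommands(true);
      }
    }
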
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java?rev=1536572&r1=1536571&r2=1536572&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java Tue Oct 29 00:49:20 2013
@@ -170,7 +170,7 @@ class HeartbeatManager implements Datano
       addDatanode(d);
 
       //update its timestamp
-      d.updateHeartbeat(0L, 0L, 0L, 0L, 0, 0);
+      d.updateHeartbeat(0L, 0L, 0L, 0L, 0L, 0L, 0, 0);
     }
   }
 
@@ -193,10 +193,10 @@ class HeartbeatManager implements Datano
 
   synchronized void updateHeartbeat(final DatanodeDescriptor node,
       long capacity, long dfsUsed, long remaining, long blockPoolUsed,
-      int xceiverCount, int failedVolumes) {
+      long cacheCapacity, long cacheUsed, int xceiverCount, int failedVolumes) {
     stats.subtract(node);
     node.updateHeartbeat(capacity, dfsUsed, remaining, blockPoolUsed,
-        xceiverCount, failedVolumes);
+        cacheCapacity, cacheUsed, xceiverCount, failedVolumes);
     stats.add(node);
   }
 

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java?rev=1536572&r1=1536571&r2=1536572&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPOfferService.java Tue Oct 29 00:49:20 2013
@@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.protocol.E
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.protocol.BalancerBandwidthCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
+import org.apache.hadoop.hdfs.server.protocol.BlockIdCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
@@ -518,6 +519,8 @@ class BPOfferService {
       return true;
     final BlockCommand bcmd = 
       cmd instanceof BlockCommand? (BlockCommand)cmd: null;
+    final BlockIdCommand blockIdCmd = 
+      cmd instanceof BlockIdCommand ? (BlockIdCommand)cmd: null;
 
     switch(cmd.getAction()) {
     case DatanodeProtocol.DNA_TRANSFER:
@@ -543,6 +546,16 @@ class BPOfferService {
       }
       dn.metrics.incrBlocksRemoved(toDelete.length);
       break;
+    case DatanodeProtocol.DNA_CACHE:
+      LOG.info("DatanodeCommand action: DNA_CACHE");
+      dn.getFSDataset().cache(blockIdCmd.getBlockPoolId(), blockIdCmd.getBlockIds());
+      dn.metrics.incrBlocksCached(blockIdCmd.getBlockIds().length);
+      break;
+    case DatanodeProtocol.DNA_UNCACHE:
+      LOG.info("DatanodeCommand action: DNA_UNCACHE");
+      dn.getFSDataset().uncache(blockIdCmd.getBlockPoolId(), blockIdCmd.getBlockIds());
+      dn.metrics.incrBlocksUncached(blockIdCmd.getBlockIds().length);
+      break;
     case DatanodeProtocol.DNA_SHUTDOWN:
       // TODO: DNA_SHUTDOWN appears to be unused - the NN never sends this command
       // See HDFS-2987.
@@ -615,6 +628,8 @@ class BPOfferService {
     case DatanodeProtocol.DNA_FINALIZE:
     case DatanodeProtocol.DNA_RECOVERBLOCK:
     case DatanodeProtocol.DNA_BALANCERBANDWIDTHUPDATE:
+    case DatanodeProtocol.DNA_CACHE:
+    case DatanodeProtocol.DNA_UNCACHE:
       LOG.warn("Got a command from standby NN - ignoring command:" + cmd.getAction());
       break;
     default:

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java?rev=1536572&r1=1536571&r2=1536572&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java Tue Oct 29 00:49:20 2013
@@ -24,6 +24,7 @@ import java.net.InetSocketAddress;
 import java.net.SocketTimeoutException;
 import java.net.URI;
 import java.util.Collection;
+import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.logging.Log;
@@ -84,6 +85,8 @@ class BPServiceActor implements Runnable
 
   boolean resetBlockReportTime = true;
 
+  volatile long lastCacheReport = 0;
+
   Thread bpThread;
   DatanodeProtocolClientSideTranslatorPB bpNamenode;
   private volatile long lastHeartbeat = 0;
@@ -430,6 +433,35 @@ class BPServiceActor implements Runnable
     return cmd;
   }
   
+  DatanodeCommand cacheReport() throws IOException {
+    // If caching is disabled, do not send a cache report
+    if (dn.getFSDataset().getDnCacheCapacity() == 0) {
+      return null;
+    }
+    // send cache report if timer has expired.
+    DatanodeCommand cmd = null;
+    long startTime = Time.monotonicNow();
+    if (startTime - lastCacheReport > dnConf.cacheReportInterval) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Sending cacheReport from service actor: " + this);
+      }
+      lastCacheReport = startTime;
+
+      String bpid = bpos.getBlockPoolId();
+      List<Long> blockIds = dn.getFSDataset().getCacheReport(bpid);
+      long createTime = Time.monotonicNow();
+
+      cmd = bpNamenode.cacheReport(bpRegistration, bpid, blockIds);
+      long sendTime = Time.monotonicNow();
+      long createCost = createTime - startTime;
+      long sendCost = sendTime - createTime;
+      dn.getMetrics().addCacheReport(sendCost);
+      LOG.info("CacheReport of " + blockIds.size()
+          + " blocks took " + createCost + " msec to generate and "
+          + sendCost + " msecs for RPC and NN processing");
+    }
+    return cmd;
+  }
   
   HeartbeatResponse sendHeartBeat() throws IOException {
     if (LOG.isDebugEnabled()) {
@@ -443,6 +475,8 @@ class BPServiceActor implements Runnable
         dn.getFSDataset().getRemaining(),
         dn.getFSDataset().getBlockPoolUsed(bpos.getBlockPoolId())) };
     return bpNamenode.sendHeartbeat(bpRegistration, report,
+        dn.getFSDataset().getDnCacheCapacity(),
+        dn.getFSDataset().getDnCacheUsed(),
         dn.getXmitsInProgress(),
         dn.getXceiverCount(),
         dn.getFSDataset().getNumFailedVolumes());
@@ -496,11 +530,12 @@ class BPServiceActor implements Runnable
    * forever calling remote NameNode functions.
    */
   private void offerService() throws Exception {
-    LOG.info("For namenode " + nnAddr + " using DELETEREPORT_INTERVAL of "
-        + dnConf.deleteReportInterval + " msec " + " BLOCKREPORT_INTERVAL of "
-        + dnConf.blockReportInterval + "msec" + " Initial delay: "
-        + dnConf.initialBlockReportDelay + "msec" + "; heartBeatInterval="
-        + dnConf.heartBeatInterval);
+    LOG.info("For namenode " + nnAddr + " using"
+        + " DELETEREPORT_INTERVAL of " + dnConf.deleteReportInterval + " msec "
+        + " BLOCKREPORT_INTERVAL of " + dnConf.blockReportInterval + "msec"
+        + " CACHEREPORT_INTERVAL of " + dnConf.cacheReportInterval + "msec"
+        + " Initial delay: " + dnConf.initialBlockReportDelay + "msec"
+        + "; heartBeatInterval=" + dnConf.heartBeatInterval);
 
     //
     // Now loop for a long time....
@@ -555,6 +590,9 @@ class BPServiceActor implements Runnable
         DatanodeCommand cmd = blockReport();
         processCommand(new DatanodeCommand[]{ cmd });
 
+        cmd = cacheReport();
+        processCommand(new DatanodeCommand[]{ cmd });
+
         // Now safe to start scanning the block pool.
         // If it has already been started, this is a no-op.
         if (dn.blockScanner != null) {

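cacheReport() follows the same pattern as the block report: skip entirely when the cache capacity is zero, otherwise send at most once per dnConf.cacheReportInterval and record the generate/send latencies. The interval gate in isolation, as a sketch that assumes only Time.monotonicNow() from the hunk above:

    import org.apache.hadoop.util.Time;

    class IntervalGateSketch {
      private volatile long lastRun = 0;   // analogous to lastCacheReport
      private final long intervalMs;       // analogous to dnConf.cacheReportInterval

      IntervalGateSketch(long intervalMs) {
        this.intervalMs = intervalMs;
      }

      // Runs the work only when the monotonic clock says the interval elapsed.
      boolean runIfDue(Runnable work) {
        long now = Time.monotonicNow();
        if (now - lastRun <= intervalMs) {
          return false;                    // too soon; skip this cycle
        }
        lastRun = now;
        work.run();
        return true;
      }
    }
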
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java?rev=1536572&r1=1536571&r2=1536572&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java Tue Oct 29 00:49:20 2013
@@ -18,13 +18,18 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INITIAL_DELAY_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCKREPORT_INTERVAL_MSEC_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SYNCONCLOSE_KEY;
@@ -39,6 +44,7 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
@@ -66,6 +72,7 @@ public class DNConf {
   final long blockReportInterval;
   final long deleteReportInterval;
   final long initialBlockReportDelay;
+  final long cacheReportInterval;
   final int writePacketSize;
   
   final String minimumNameNodeVersion;
@@ -73,6 +80,8 @@ public class DNConf {
   
   final long xceiverStopTimeout;
 
+  final long maxLockedMemory;
+
   public DNConf(Configuration conf) {
     socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
         HdfsServerConstants.READ_TIMEOUT);
@@ -107,7 +116,9 @@ public class DNConf {
         DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME,
         DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
     this.blockReportInterval = conf.getLong(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
-    DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT);
+        DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT);
+    this.cacheReportInterval = conf.getLong(DFS_CACHEREPORT_INTERVAL_MSEC_KEY,
+        DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT);
     
     long initBRDelay = conf.getLong(
         DFS_BLOCKREPORT_INITIAL_DELAY_KEY,
@@ -137,6 +148,10 @@ public class DNConf {
     this.xceiverStopTimeout = conf.getLong(
         DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY,
         DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_DEFAULT);
+
+    this.maxLockedMemory = conf.getLong(
+        DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
+        DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT);
   }
   
   // We get minimumNameNodeVersion via a method so it can be mocked out in tests.
@@ -147,4 +162,8 @@ public class DNConf {
   public long getXceiverStopTimeout() {
     return xceiverStopTimeout;
   }
+
+  public long getMaxLockedMemory() {
+    return maxLockedMemory;
+  }
 }

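DNConf now also reads the cache report interval and the maximum amount of memory the datanode may lock for cached blocks. A hedged configuration sketch using only the DFSConfigKeys constants imported above (the concrete key strings and defaults are deliberately not repeated here, and the values below are arbitrary examples):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;

    class CachingConfSketch {
      static Configuration build() {
        Configuration conf = new Configuration();
        // Example: send cache reports every 10 seconds.
        conf.setLong(DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 10 * 1000L);
        // Example: allow up to 64 MB of locked (cached) block data.
        conf.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
            64L * 1024 * 1024);
        return conf;
      }
    }
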
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1536572&r1=1536571&r2=1536572&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Tue Oct 29 00:49:20 2013
@@ -67,6 +67,7 @@ import org.apache.hadoop.hdfs.web.resour
 import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.ReadaheadPool;
+import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.ipc.ProtobufRpcEngine;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
@@ -656,6 +657,25 @@ public class DataNode extends Configured
     this.conf = conf;
     this.dnConf = new DNConf(conf);
 
+    if (dnConf.maxLockedMemory > 0) {
+      if (!NativeIO.isAvailable()) {
+        throw new RuntimeException(String.format(
+            "Cannot start datanode because the configured max locked memory" +
+            " size (%s) is greater than zero and native code is not available.",
+            DFS_DATANODE_MAX_LOCKED_MEMORY_KEY));
+      }
+      long ulimit = NativeIO.getMemlockLimit();
+      if (dnConf.maxLockedMemory > ulimit) {
+      throw new RuntimeException(String.format(
+          "Cannot start datanode because the configured max locked memory" +
+          " size (%s) of %d bytes is more than the datanode's available" +
+          " RLIMIT_MEMLOCK ulimit of %d bytes.",
+          DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
+          dnConf.maxLockedMemory,
+          ulimit));
+      }
+    }
+
     storage = new DataStorage();
     
     // global DN settings

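The DataNode constructor now refuses to start when a non-zero maxLockedMemory is configured but native code is missing or the value exceeds the process RLIMIT_MEMLOCK. The same pre-flight check as a standalone sketch, assuming only the two NativeIO calls used above:

    import org.apache.hadoop.io.nativeio.NativeIO;

    class MemlockPreflightSketch {
      // Returns true if a dataset cache of the requested size could be locked.
      static boolean canLock(long requestedBytes) {
        if (requestedBytes <= 0) {
          return true;                              // caching disabled
        }
        if (!NativeIO.isAvailable()) {
          return false;                             // mlock requires native code
        }
        return requestedBytes <= NativeIO.getMemlockLimit();
      }
    }
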
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java?rev=1536572&r1=1536571&r2=1536572&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsDatasetSpi.java Tue Oct 29 00:49:20 2013
@@ -269,6 +269,14 @@ public interface FsDatasetSpi<V extends 
    */
   public BlockListAsLongs getBlockReport(String bpid);
 
+  /**
+   * Returns the cache report - the full list of cached block IDs of a
+   * block pool.
+   * @param   bpid Block Pool Id
+   * @return  the cache report - the full list of cached block IDs.
+   */
+  public List<Long> getCacheReport(String bpid);
+
   /** Does the dataset contain the block? */
   public boolean contains(ExtendedBlock block);
 
@@ -294,6 +302,20 @@ public interface FsDatasetSpi<V extends 
    */
   public void invalidate(String bpid, Block invalidBlks[]) throws IOException;
 
+  /**
+   * Caches the specified blocks
+   * @param bpid Block pool id
+   * @param blockIds - block ids to cache
+   */
+  public void cache(String bpid, long[] blockIds);
+
+  /**
+   * Uncaches the specified blocks
+   * @param bpid Block pool id
+   * @param blockIds - block ids to uncache
+   */
+  public void uncache(String bpid, long[] blockIds);
+
     /**
      * Check if all the data directories are healthy
      * @throws DiskErrorException

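FsDatasetSpi gains three caching hooks: getCacheReport(), cache() and uncache(). A minimal no-op stub of the kind a simulated dataset or test double might provide, assuming nothing beyond the signatures declared above:

    import java.util.Collections;
    import java.util.List;

    abstract class NoOpCachingDataset {
      // Never reports anything as cached.
      public List<Long> getCacheReport(String bpid) {
        return Collections.emptyList();
      }

      // Ignores caching requests.
      public void cache(String bpid, long[] blockIds) {
      }

      // Ignores uncaching requests.
      public void uncache(String bpid, long[] blockIds) {
      }
    }
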
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java?rev=1536572&r1=1536571&r2=1536572&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java Tue Oct 29 00:49:20 2013
@@ -37,6 +37,7 @@ import javax.management.NotCompliantMBea
 import javax.management.ObjectName;
 import javax.management.StandardMBean;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -170,6 +171,7 @@ class FsDatasetImpl implements FsDataset
   final FsVolumeList volumes;
   final ReplicaMap volumeMap;
   final FsDatasetAsyncDiskService asyncDiskService;
+  final FsDatasetCache cacheManager;
   private final int validVolsRequired;
 
   // Used for synchronizing access to usage stats
@@ -228,6 +230,7 @@ class FsDatasetImpl implements FsDataset
       roots[idx] = storage.getStorageDir(idx).getCurrentDir();
     }
     asyncDiskService = new FsDatasetAsyncDiskService(datanode, roots);
+    cacheManager = new FsDatasetCache(this);
     registerMBean(storage.getStorageID());
   }
 
@@ -288,6 +291,22 @@ class FsDatasetImpl implements FsDataset
   }
 
   /**
+   * Returns the total cache used by the datanode (in bytes).
+   */
+  @Override // FSDatasetMBean
+  public long getDnCacheUsed() {
+    return cacheManager.getDnCacheUsed();
+  }
+
+  /**
+   * Returns the total cache capacity of the datanode (in bytes).
+   */
+  @Override // FSDatasetMBean
+  public long getDnCacheCapacity() {
+    return cacheManager.getDnCacheCapacity();
+  }
+
+  /**
    * Find the block's on-disk length
    */
   @Override // FsDatasetSpi
@@ -534,6 +553,8 @@ class FsDatasetImpl implements FsDataset
   private synchronized ReplicaBeingWritten append(String bpid,
       FinalizedReplica replicaInfo, long newGS, long estimateBlockLen)
       throws IOException {
+    // uncache the block
+    cacheManager.uncacheBlock(bpid, replicaInfo.getBlockId());
     // unlink the finalized replica
     replicaInfo.unlinkBlock(1);
     
@@ -1001,6 +1022,11 @@ class FsDatasetImpl implements FsDataset
     }
   }
 
+  @Override // FsDatasetSpi
+  public List<Long> getCacheReport(String bpid) {
+    return cacheManager.getCachedBlocks(bpid);
+  }
+
   /**
    * Get the list of finalized blocks from in-memory blockmap for a block pool.
    */
@@ -1143,6 +1169,8 @@ class FsDatasetImpl implements FsDataset
         volumeMap.remove(bpid, invalidBlks[i]);
       }
 
+      // Uncache the block synchronously
+      cacheManager.uncacheBlock(bpid, invalidBlks[i].getBlockId());
       // Delete the block asynchronously to make sure we can do it fast enough
       asyncDiskService.deleteAsync(v, f,
           FsDatasetUtil.getMetaFile(f, invalidBlks[i].getGenerationStamp()),
@@ -1153,6 +1181,82 @@ class FsDatasetImpl implements FsDataset
     }
   }
 
+  synchronized boolean validToCache(String bpid, long blockId) {
+    ReplicaInfo info = volumeMap.get(bpid, blockId);
+    if (info == null) {
+      LOG.warn("Failed to cache replica in block pool " + bpid +
+          " with block id " + blockId + ": ReplicaInfo not found.");
+      return false;
+    }
+    FsVolumeImpl volume = (FsVolumeImpl)info.getVolume();
+    if (volume == null) {
+      LOG.warn("Failed to cache block with id " + blockId +
+          ": Volume not found.");
+      return false;
+    }
+    if (info.getState() != ReplicaState.FINALIZED) {
+      LOG.warn("Failed to block with id " + blockId + 
+          ": Replica is not finalized.");
+      return false;
+    }
+    return true;
+  }
+
+  /**
+   * Asynchronously attempts to cache a single block via {@link FsDatasetCache}.
+   */
+  private void cacheBlock(String bpid, long blockId) {
+    ReplicaInfo info;
+    FsVolumeImpl volume;
+    synchronized (this) {
+      if (!validToCache(bpid, blockId)) {
+        return;
+      }
+      info = volumeMap.get(bpid, blockId);
+      volume = (FsVolumeImpl)info.getVolume();
+    }
+    // Try to open block and meta streams
+    FileInputStream blockIn = null;
+    FileInputStream metaIn = null;
+    boolean success = false;
+    ExtendedBlock extBlk =
+        new ExtendedBlock(bpid, blockId,
+            info.getBytesOnDisk(), info.getGenerationStamp());
+    try {
+      blockIn = (FileInputStream)getBlockInputStream(extBlk, 0);
+      metaIn = (FileInputStream)getMetaDataInputStream(extBlk)
+          .getWrappedStream();
+      success = true;
+    } catch (ClassCastException e) {
+      LOG.warn("Failed to cache replica " + extBlk + ": Underlying blocks"
+          + " are not backed by files.", e);
+    } catch (IOException e) {
+      LOG.warn("Failed to cache replica " + extBlk + ": IOException while"
+          + " trying to open block or meta files.", e);
+    }
+    if (!success) {
+      IOUtils.closeQuietly(blockIn);
+      IOUtils.closeQuietly(metaIn);
+      return;
+    }
+    cacheManager.cacheBlock(bpid, extBlk.getLocalBlock(),
+        volume, blockIn, metaIn);
+  }
+
+  @Override // FsDatasetSpi
+  public void cache(String bpid, long[] blockIds) {
+    for (int i=0; i < blockIds.length; i++) {
+      cacheBlock(bpid, blockIds[i]);
+    }
+  }
+
+  @Override // FsDatasetSpi
+  public void uncache(String bpid, long[] blockIds) {
+    for (int i=0; i < blockIds.length; i++) {
+      cacheManager.uncacheBlock(bpid, blockIds[i]);
+    }
+  }
+
   @Override // FsDatasetSpi
   public synchronized boolean contains(final ExtendedBlock block) {
     final long blockId = block.getLocalBlock().getBlockId();

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java?rev=1536572&r1=1536571&r2=1536572&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeImpl.java Tue Oct 29 00:49:20 2013
@@ -18,11 +18,17 @@
 package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
 
 import java.io.File;
+import java.io.FileInputStream;
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
@@ -34,6 +40,8 @@ import org.apache.hadoop.hdfs.server.dat
 import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
 
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
 /**
  * The underlying volume used to store replica.
  * 
@@ -48,6 +56,13 @@ class FsVolumeImpl implements FsVolumeSp
   private final File currentDir;    // <StorageDirectory>/current
   private final DF usage;           
   private final long reserved;
+  /**
+   * Per-volume worker pool that processes new blocks to cache.
+   * The maximum number of workers per volume is bounded (configurable via
+   * dfs.datanode.fsdatasetcache.max.threads.per.volume) to limit resource
+   * contention.
+   */
+  private final ThreadPoolExecutor cacheExecutor;
   
   FsVolumeImpl(FsDatasetImpl dataset, String storageID, File currentDir,
       Configuration conf) throws IOException {
@@ -59,6 +74,20 @@ class FsVolumeImpl implements FsVolumeSp
     this.currentDir = currentDir; 
     File parent = currentDir.getParentFile();
     this.usage = new DF(parent, conf);
+    final int maxNumThreads = dataset.datanode.getConf().getInt(
+        DFSConfigKeys.DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY,
+        DFSConfigKeys.DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_DEFAULT
+        );
+    ThreadFactory workerFactory = new ThreadFactoryBuilder()
+        .setDaemon(true)
+        .setNameFormat("FsVolumeImplWorker-" + parent.toString() + "-%d")
+        .build();
+    cacheExecutor = new ThreadPoolExecutor(
+        1, maxNumThreads,
+        60, TimeUnit.SECONDS,
+        new LinkedBlockingQueue<Runnable>(),
+        workerFactory);
+    cacheExecutor.allowCoreThreadTimeOut(true);
   }
   
   File getCurrentDir() {
@@ -166,7 +195,11 @@ class FsVolumeImpl implements FsVolumeSp
   File addBlock(String bpid, Block b, File f) throws IOException {
     return getBlockPoolSlice(bpid).addBlock(b, f);
   }
-    
+
+  Executor getExecutor() {
+    return cacheExecutor;
+  }
+
   void checkDirs() throws DiskErrorException {
     // TODO:FEDERATION valid synchronization
     for(BlockPoolSlice s : bpSlices.values()) {
@@ -210,6 +243,7 @@ class FsVolumeImpl implements FsVolumeSp
   }
 
   void shutdown() {
+    cacheExecutor.shutdown();
     Set<Entry<String, BlockPoolSlice>> set = bpSlices.entrySet();
     for (Entry<String, BlockPoolSlice> entry : set) {
       entry.getValue().shutdown();

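Each volume now owns a bounded, daemon-threaded pool for caching work, exposed through getExecutor() and shut down with the volume. A sketch of handing a task to that pool; the Runnable body is a hypothetical placeholder, not the real FsDatasetCache logic:

    import java.util.concurrent.Executor;

    class VolumeCacheTaskSketch {
      static void submit(Executor volumeExecutor, final long blockId) {
        volumeExecutor.execute(new Runnable() {
          @Override
          public void run() {
            // Real code would mmap/mlock the block and meta files here.
            System.out.println("caching block " + blockId);
          }
        });
      }
    }
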
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java?rev=1536572&r1=1536571&r2=1536572&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java Tue Oct 29 00:49:20 2013
@@ -57,6 +57,8 @@ public class DataNodeMetrics {
   @Metric MutableCounterLong blocksRemoved;
   @Metric MutableCounterLong blocksVerified;
   @Metric MutableCounterLong blockVerificationFailures;
+  @Metric MutableCounterLong blocksCached;
+  @Metric MutableCounterLong blocksUncached;
   @Metric MutableCounterLong readsFromLocalClient;
   @Metric MutableCounterLong readsFromRemoteClient;
   @Metric MutableCounterLong writesFromLocalClient;
@@ -74,6 +76,7 @@ public class DataNodeMetrics {
   @Metric MutableRate replaceBlockOp;
   @Metric MutableRate heartbeats;
   @Metric MutableRate blockReports;
+  @Metric MutableRate cacheReports;
   @Metric MutableRate packetAckRoundTripTimeNanos;
   MutableQuantiles[] packetAckRoundTripTimeNanosQuantiles;
   
@@ -151,6 +154,10 @@ public class DataNodeMetrics {
     blockReports.add(latency);
   }
 
+  public void addCacheReport(long latency) {
+    cacheReports.add(latency);
+  }
+
   public void incrBlocksReplicated(int delta) {
     blocksReplicated.incr(delta);
   }
@@ -175,6 +182,15 @@ public class DataNodeMetrics {
     blocksVerified.incr();
   }
 
+
+  public void incrBlocksCached(int delta) {
+    blocksCached.incr(delta);
+  }
+
+  public void incrBlocksUncached(int delta) {
+    blocksUncached.incr(delta);
+  }
+
   public void addReadBlockOp(long latency) {
     readBlockOp.add(latency);
   }
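
A hedged sketch of how the new counters and the cacheReports rate are meant to be driven; the caller below is invented for illustration (the real call sites are the datanode's caching code and its service actor, which are not part of this hunk), while DataNodeMetrics.create(Configuration, String) and Time.monotonicNow() are existing Hadoop APIs:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
    import org.apache.hadoop.util.Time;

    class CacheMetricsExample {
      static void reportOnce(DataNodeMetrics metrics) {
        long start = Time.monotonicNow();
        // ... build and send the cache report over DatanodeProtocol ...
        metrics.addCacheReport(Time.monotonicNow() - start);
        metrics.incrBlocksCached(1);   // a replica was successfully cached
        metrics.incrBlocksUncached(1); // a replica was dropped from the cache
      }

      public static void main(String[] args) {
        reportOnce(DataNodeMetrics.create(new Configuration(), "example-dn"));
      }
    }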

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/FSDatasetMBean.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/FSDatasetMBean.java?rev=1536572&r1=1536571&r2=1536572&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/FSDatasetMBean.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/FSDatasetMBean.java Tue Oct 29 00:49:20 2013
@@ -77,4 +77,14 @@ public interface FSDatasetMBean {
    * @return The number of failed volumes in the datanode.
    */
   public int getNumFailedVolumes();
+
+  /**
+   * Returns the total cache used by the datanode (in bytes).
+   */
+  public long getDnCacheUsed();
+
+  /**
+   * Returns the total cache capacity of the datanode (in bytes).
+   */
+  public long getDnCacheCapacity();
 }
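
These getters surface as attributes on the datanode's dataset MBean. A hedged JMX probe; the Hadoop:service=DataNode,name=FSDatasetState* name pattern is an assumption about how the bean is registered, and the sketch reads the platform MBean server for brevity (a remote datanode would need a JMXConnector):

    import java.lang.management.ManagementFactory;
    import java.util.Set;
    import javax.management.MBeanServer;
    import javax.management.ObjectName;

    public class DnCacheProbe {
      public static void main(String[] args) throws Exception {
        MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
        // Assumed registration pattern; adjust to what the datanode exposes.
        Set<ObjectName> names = mbs.queryNames(
            new ObjectName("Hadoop:service=DataNode,name=FSDatasetState*"), null);
        for (ObjectName name : names) {
          long used = (Long) mbs.getAttribute(name, "DnCacheUsed");
          long capacity = (Long) mbs.getAttribute(name, "DnCacheCapacity");
          System.out.println(name + ": " + used + " of " + capacity + " bytes cached");
        }
      }
    }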

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1536572&r1=1536571&r2=1536572&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Tue Oct 29 00:49:20 2013
@@ -53,6 +53,7 @@ import org.apache.hadoop.hdfs.protocol.F
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.SnapshotAccessControlException;
@@ -2592,12 +2593,21 @@ public class FSDirectory implements Clos
     int childrenNum = node.isDirectory() ? 
         node.asDirectory().getChildrenNum(snapshot) : 0;
         
-    return new HdfsLocatedFileStatus(size, node.isDirectory(), replication,
-        blocksize, node.getModificationTime(snapshot),
-        node.getAccessTime(snapshot), node.getFsPermission(snapshot),
-        node.getUserName(snapshot), node.getGroupName(snapshot),
-        node.isSymlink() ? node.asSymlink().getSymlink() : null, path,
-        node.getId(), loc, childrenNum);
+    HdfsLocatedFileStatus status =
+        new HdfsLocatedFileStatus(size, node.isDirectory(), replication,
+          blocksize, node.getModificationTime(snapshot),
+          node.getAccessTime(snapshot), node.getFsPermission(snapshot),
+          node.getUserName(snapshot), node.getGroupName(snapshot),
+          node.isSymlink() ? node.asSymlink().getSymlink() : null, path,
+          node.getId(), loc, childrenNum);
+    // Set caching information for the located blocks.
+    if (loc != null) {
+      CacheManager cacheManager = namesystem.getCacheManager();
+      for (LocatedBlock lb: loc.getLocatedBlocks()) {
+        cacheManager.setCachedLocations(lb);
+      }
+    }
+    return status;
   }
 
     

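
With the change above, located listings carry cached-replica information end to end. A hedged client-side sketch; the namenode URI and path are placeholders, and LocatedBlock#getCachedLocations is assumed to be the accessor this branch exposes for the locations that CacheManager.setCachedLocations fills in:

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSClient;
    import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
    import org.apache.hadoop.hdfs.protocol.LocatedBlock;

    public class CachedLocationsExample {
      public static void main(String[] args) throws Exception {
        DFSClient client =
            new DFSClient(new URI("hdfs://namenode:8020"), new Configuration());
        for (LocatedBlock lb : client.getLocatedBlocks("/warm/dataset", 0, Long.MAX_VALUE)
            .getLocatedBlocks()) {
          for (DatanodeInfo dn : lb.getCachedLocations()) {  // assumed accessor
            System.out.println(lb.getBlock() + " cached on " + dn.getHostName());
          }
        }
      }
    }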
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1536572&r1=1536571&r2=1536572&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Tue Oct 29 00:49:20 2013
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import static org.apache.hadoop.util.ExitUtil.terminate;
 import static org.apache.hadoop.util.Time.now;
 
 import java.io.IOException;
@@ -35,15 +36,18 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
+import org.apache.hadoop.hdfs.protocol.PathBasedCacheEntry;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
-
-import static org.apache.hadoop.util.ExitUtil.terminate;
-
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.Storage.FormatConfirmable;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCachePoolOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddPathBasedCacheDirectiveOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AllocateBlockIdOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AllowSnapshotOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.CancelDelegationTokenOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.CloseOp;
@@ -55,12 +59,17 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.GetDelegationTokenOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.LogSegmentOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.MkdirOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ModifyCachePoolOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.OpInstanceCache;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ReassignLeaseOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemoveCachePoolOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemovePathBasedCacheDescriptorOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOldOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameSnapshotOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenewDelegationTokenOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampV1Op;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampV2Op;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetOwnerOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetPermissionsOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetQuotaOp;
@@ -69,9 +78,6 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TimesOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateBlocksOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateMasterKeyOp;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AllocateBlockIdOp;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampV1Op;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampV2Op;
 import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
@@ -948,6 +954,45 @@ public class FSEditLog implements LogsPu
     logEdit(op);
   }
   
+  void logAddPathBasedCacheDirective(PathBasedCacheDirective directive,
+      boolean toLogRpcIds) {
+    AddPathBasedCacheDirectiveOp op = AddPathBasedCacheDirectiveOp.getInstance(
+        cache.get())
+        .setPath(directive.getPath().toUri().getPath())
+        .setReplication(directive.getReplication())
+        .setPool(directive.getPool());
+    logRpcIds(op, toLogRpcIds);
+    logEdit(op);
+  }
+
+  void logRemovePathBasedCacheDescriptor(Long id, boolean toLogRpcIds) {
+    RemovePathBasedCacheDescriptorOp op =
+        RemovePathBasedCacheDescriptorOp.getInstance(cache.get()).setId(id);
+    logRpcIds(op, toLogRpcIds);
+    logEdit(op);
+  }
+
+  void logAddCachePool(CachePoolInfo pool, boolean toLogRpcIds) {
+    AddCachePoolOp op =
+        AddCachePoolOp.getInstance(cache.get()).setPool(pool);
+    logRpcIds(op, toLogRpcIds);
+    logEdit(op);
+  }
+
+  void logModifyCachePool(CachePoolInfo info, boolean toLogRpcIds) {
+    ModifyCachePoolOp op =
+        ModifyCachePoolOp.getInstance(cache.get()).setInfo(info);
+    logRpcIds(op, toLogRpcIds);
+    logEdit(op);
+  }
+
+  void logRemoveCachePool(String poolName, boolean toLogRpcIds) {
+    RemoveCachePoolOp op =
+        RemoveCachePoolOp.getInstance(cache.get()).setPoolName(poolName);
+    logRpcIds(op, toLogRpcIds);
+    logEdit(op);
+  }
+
   /**
    * Get all the journals this edit log is currently operating on.
    */

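
A hedged sketch of how a namesystem-side caller would drive the new directive op; the editLog field and the surrounding call site are invented for illustration (the real callers live in FSNamesystem, outside this hunk), and the builder usage mirrors what FSEditLogLoader reconstructs on replay below (assumes PathBasedCacheDirective and org.apache.hadoop.fs.Path are imported):

    // Build a directive, log it, and force it to stable storage.
    PathBasedCacheDirective directive = new PathBasedCacheDirective.Builder()
        .setPath(new Path("/warm/dataset"))
        .setReplication((short) 1)
        .setPool("research")
        .build();
    editLog.logAddPathBasedCacheDirective(directive, true); // true: record RPC ids for the retry cache
    editLog.logSync();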
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1536572&r1=1536571&r2=1536572&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Tue Oct 29 00:49:20 2013
@@ -30,16 +30,21 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.PathBasedCacheDescriptor;
+import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCachePoolOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCloseOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddPathBasedCacheDirectiveOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AllocateBlockIdOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AllowSnapshotOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.BlockListUpdatingOp;
@@ -52,7 +57,10 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.DisallowSnapshotOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.GetDelegationTokenOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.MkdirOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ModifyCachePoolOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ReassignLeaseOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemoveCachePoolOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemovePathBasedCacheDescriptorOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOldOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameSnapshotOp;
@@ -76,6 +84,6 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step;
 import org.apache.hadoop.hdfs.util.ChunkedArrayList;
 import org.apache.hadoop.hdfs.util.Holder;
 
 import com.google.common.base.Joiner;
 
@@ -631,6 +639,54 @@ public class FSEditLogLoader {
       fsNamesys.setLastAllocatedBlockId(allocateBlockIdOp.blockId);
       break;
     }
+    case OP_ADD_PATH_BASED_CACHE_DIRECTIVE: {
+      AddPathBasedCacheDirectiveOp addOp = (AddPathBasedCacheDirectiveOp) op;
+      PathBasedCacheDirective d = new PathBasedCacheDirective.Builder().
+          setPath(new Path(addOp.path)).
+          setReplication(addOp.replication).
+          setPool(addOp.pool).
+          build();
+      PathBasedCacheDescriptor descriptor =
+          fsNamesys.getCacheManager().addDirective(d, null);
+      if (toAddRetryCache) {
+        fsNamesys.addCacheEntryWithPayload(op.rpcClientId, op.rpcCallId,
+            descriptor);
+      }
+      break;
+    }
+    case OP_REMOVE_PATH_BASED_CACHE_DESCRIPTOR: {
+      RemovePathBasedCacheDescriptorOp removeOp =
+          (RemovePathBasedCacheDescriptorOp) op;
+      fsNamesys.getCacheManager().removeDescriptor(removeOp.id, null);
+      if (toAddRetryCache) {
+        fsNamesys.addCacheEntry(op.rpcClientId, op.rpcCallId);
+      }
+      break;
+    }
+    case OP_ADD_CACHE_POOL: {
+      AddCachePoolOp addOp = (AddCachePoolOp) op;
+      fsNamesys.getCacheManager().addCachePool(addOp.info);
+      if (toAddRetryCache) {
+        fsNamesys.addCacheEntry(op.rpcClientId, op.rpcCallId);
+      }
+      break;
+    }
+    case OP_MODIFY_CACHE_POOL: {
+      ModifyCachePoolOp modifyOp = (ModifyCachePoolOp) op;
+      fsNamesys.getCacheManager().modifyCachePool(modifyOp.info);
+      if (toAddRetryCache) {
+        fsNamesys.addCacheEntry(op.rpcClientId, op.rpcCallId);
+      }
+      break;
+    }
+    case OP_REMOVE_CACHE_POOL: {
+      RemoveCachePoolOp removeOp = (RemoveCachePoolOp) op;
+      fsNamesys.getCacheManager().removeCachePool(removeOp.poolName);
+      if (toAddRetryCache) {
+        fsNamesys.addCacheEntry(op.rpcClientId, op.rpcCallId);
+      }
+      break;
+    }
     default:
       throw new IOException("Invalid operation read " + op.opCode);
     }