Posted to hdfs-commits@hadoop.apache.org by ar...@apache.org on 2013/10/29 22:04:35 UTC

svn commit: r1536889 [3/4] - in /hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs: ./ dev-support/ src/main/bin/ src/main/java/ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/client/ src/main/java/org/apache/...

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java?rev=1536889&r1=1536888&r2=1536889&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java Tue Oct 29 21:04:31 2013
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD_CACHE_POOL;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD_PATH_BASED_CACHE_DIRECTIVE;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ALLOCATE_BLOCK_ID;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ALLOW_SNAPSHOT;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_CANCEL_DELEGATION_TOKEN;
@@ -32,7 +34,10 @@ import static org.apache.hadoop.hdfs.ser
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_GET_DELEGATION_TOKEN;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_INVALID;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_MKDIR;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_MODIFY_CACHE_POOL;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REASSIGN_LEASE;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REMOVE_CACHE_POOL;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REMOVE_PATH_BASED_CACHE_DESCRIPTOR;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME_OLD;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME_SNAPSHOT;
@@ -56,6 +61,7 @@ import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.EOFException;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.EnumMap;
 import java.util.List;
@@ -73,6 +79,7 @@ import org.apache.hadoop.fs.permission.P
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DeprecatedUTF8;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
@@ -97,7 +104,9 @@ import org.xml.sax.ContentHandler;
 import org.xml.sax.SAXException;
 import org.xml.sax.helpers.AttributesImpl;
 
+import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 
 /**
  * Helper classes for reading the ops from an InputStream.
@@ -153,6 +162,13 @@ public abstract class FSEditLogOp {
       inst.put(OP_RENAME_SNAPSHOT, new RenameSnapshotOp());
       inst.put(OP_SET_GENSTAMP_V2, new SetGenstampV2Op());
       inst.put(OP_ALLOCATE_BLOCK_ID, new AllocateBlockIdOp());
+      inst.put(OP_ADD_PATH_BASED_CACHE_DIRECTIVE,
+          new AddPathBasedCacheDirectiveOp());
+      inst.put(OP_REMOVE_PATH_BASED_CACHE_DESCRIPTOR,
+          new RemovePathBasedCacheDescriptorOp());
+      inst.put(OP_ADD_CACHE_POOL, new AddCachePoolOp());
+      inst.put(OP_MODIFY_CACHE_POOL, new ModifyCachePoolOp());
+      inst.put(OP_REMOVE_CACHE_POOL, new RemoveCachePoolOp());
     }
     
     public FSEditLogOp get(FSEditLogOpCodes opcode) {
@@ -528,8 +544,7 @@ public abstract class FSEditLogOp {
       } else {
         this.blocks = new Block[0];
       }
-      this.permissions =
-          permissionStatusFromXml(st.getChildren("PERMISSION_STATUS").get(0));
+      this.permissions = permissionStatusFromXml(st);
       readRpcIdsFromXml(st);
     }
   }
@@ -1208,8 +1223,7 @@ public abstract class FSEditLogOp {
       this.inodeId = Long.valueOf(st.getValue("INODEID"));
       this.path = st.getValue("PATH");
       this.timestamp = Long.valueOf(st.getValue("TIMESTAMP"));
-      this.permissions =
-          permissionStatusFromXml(st.getChildren("PERMISSION_STATUS").get(0));
+      this.permissions = permissionStatusFromXml(st);
     }
   }
 
@@ -1940,8 +1954,7 @@ public abstract class FSEditLogOp {
       this.value = st.getValue("VALUE");
       this.mtime = Long.valueOf(st.getValue("MTIME"));
       this.atime = Long.valueOf(st.getValue("ATIME"));
-      this.permissionStatus =
-          permissionStatusFromXml(st.getChildren("PERMISSION_STATUS").get(0));
+      this.permissionStatus = permissionStatusFromXml(st);
       
       readRpcIdsFromXml(st);
     }
@@ -2848,6 +2861,317 @@ public abstract class FSEditLogOp {
     }
   }
 
+  /**
+   * {@literal @AtMostOnce} for
+   * {@link ClientProtocol#addPathBasedCacheDirective}
+   */
+  static class AddPathBasedCacheDirectiveOp extends FSEditLogOp {
+    String path;
+    short replication;
+    String pool;
+
+    public AddPathBasedCacheDirectiveOp() {
+      super(OP_ADD_PATH_BASED_CACHE_DIRECTIVE);
+    }
+
+    static AddPathBasedCacheDirectiveOp getInstance(OpInstanceCache cache) {
+      return (AddPathBasedCacheDirectiveOp) cache
+          .get(OP_ADD_PATH_BASED_CACHE_DIRECTIVE);
+    }
+
+    public AddPathBasedCacheDirectiveOp setPath(String path) {
+      this.path = path;
+      return this;
+    }
+
+    public AddPathBasedCacheDirectiveOp setReplication(short replication) {
+      this.replication = replication;
+      return this;
+    }
+
+    public AddPathBasedCacheDirectiveOp setPool(String pool) {
+      this.pool = pool;
+      return this;
+    }
+
+    @Override
+    void readFields(DataInputStream in, int logVersion) throws IOException {
+      this.path = FSImageSerialization.readString(in);
+      this.replication = FSImageSerialization.readShort(in);
+      this.pool = FSImageSerialization.readString(in);
+      readRpcIds(in, logVersion);
+    }
+
+    @Override
+    public void writeFields(DataOutputStream out) throws IOException {
+      FSImageSerialization.writeString(path, out);
+      FSImageSerialization.writeShort(replication, out);
+      FSImageSerialization.writeString(pool, out);
+      writeRpcIds(rpcClientId, rpcCallId, out);
+    }
+
+    @Override
+    protected void toXml(ContentHandler contentHandler) throws SAXException {
+      XMLUtils.addSaxString(contentHandler, "PATH", path);
+      XMLUtils.addSaxString(contentHandler, "REPLICATION",
+          Short.toString(replication));
+      XMLUtils.addSaxString(contentHandler, "POOL", pool);
+      appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
+    }
+
+    @Override
+    void fromXml(Stanza st) throws InvalidXmlException {
+      path = st.getValue("PATH");
+      replication = Short.parseShort(st.getValue("REPLICATION"));
+      pool = st.getValue("POOL");
+      readRpcIdsFromXml(st);
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder builder = new StringBuilder();
+      builder.append("AddPathBasedCacheDirective [");
+      builder.append("path=" + path + ",");
+      builder.append("replication=" + replication + ",");
+      builder.append("pool=" + pool);
+      appendRpcIdsToString(builder, rpcClientId, rpcCallId);
+      builder.append("]");
+      return builder.toString();
+    }
+  }
+
+  /**
+   * {@literal @AtMostOnce} for
+   * {@link ClientProtocol#removePathBasedCacheDescriptor}
+   */
+  static class RemovePathBasedCacheDescriptorOp extends FSEditLogOp {
+    long id;
+
+    public RemovePathBasedCacheDescriptorOp() {
+      super(OP_REMOVE_PATH_BASED_CACHE_DESCRIPTOR);
+    }
+
+    static RemovePathBasedCacheDescriptorOp getInstance(OpInstanceCache cache) {
+      return (RemovePathBasedCacheDescriptorOp) cache
+          .get(OP_REMOVE_PATH_BASED_CACHE_DESCRIPTOR);
+    }
+
+    public RemovePathBasedCacheDescriptorOp setId(long id) {
+      this.id = id;
+      return this;
+    }
+
+    @Override
+    void readFields(DataInputStream in, int logVersion) throws IOException {
+      this.id = FSImageSerialization.readLong(in);
+      readRpcIds(in, logVersion);
+    }
+
+    @Override
+    public void writeFields(DataOutputStream out) throws IOException {
+      FSImageSerialization.writeLong(id, out);
+      writeRpcIds(rpcClientId, rpcCallId, out);
+    }
+
+    @Override
+    protected void toXml(ContentHandler contentHandler) throws SAXException {
+      XMLUtils.addSaxString(contentHandler, "ID", Long.toString(id));
+      appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
+    }
+
+    @Override
+    void fromXml(Stanza st) throws InvalidXmlException {
+      this.id = Long.parseLong(st.getValue("ID"));
+      readRpcIdsFromXml(st);
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder builder = new StringBuilder();
+      builder.append("RemovePathBasedCacheDescriptor [");
+      builder.append("id=" + Long.toString(id));
+      appendRpcIdsToString(builder, rpcClientId, rpcCallId);
+      builder.append("]");
+      return builder.toString();
+    }
+  }
+
+  /** {@literal @AtMostOnce} for {@link ClientProtocol#addCachePool} */
+  static class AddCachePoolOp extends FSEditLogOp {
+    CachePoolInfo info;
+
+    public AddCachePoolOp() {
+      super(OP_ADD_CACHE_POOL);
+    }
+
+    static AddCachePoolOp getInstance(OpInstanceCache cache) {
+      return (AddCachePoolOp) cache.get(OP_ADD_CACHE_POOL);
+    }
+
+    public AddCachePoolOp setPool(CachePoolInfo info) {
+      this.info = info;
+      return this;
+    }
+
+    @Override
+    void readFields(DataInputStream in, int logVersion) throws IOException {
+      info = CachePoolInfo.readFrom(in);
+      readRpcIds(in, logVersion);
+    }
+
+    @Override
+    public void writeFields(DataOutputStream out) throws IOException {
+      info.writeTo(out);
+      writeRpcIds(rpcClientId, rpcCallId, out);
+    }
+
+    @Override
+    protected void toXml(ContentHandler contentHandler) throws SAXException {
+      info.writeXmlTo(contentHandler);
+      appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
+    }
+
+    @Override
+    void fromXml(Stanza st) throws InvalidXmlException {
+      this.info = CachePoolInfo.readXmlFrom(st);
+      readRpcIdsFromXml(st);
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder builder = new StringBuilder();
+      builder.append("AddCachePoolOp [");
+      builder.append("poolName=" + info.getPoolName() + ",");
+      builder.append("ownerName=" + info.getOwnerName() + ",");
+      builder.append("groupName=" + info.getGroupName() + ",");
+      builder.append("mode=" + Short.toString(info.getMode().toShort()) + ",");
+      builder.append("weight=" + Integer.toString(info.getWeight()));
+      appendRpcIdsToString(builder, rpcClientId, rpcCallId);
+      builder.append("]");
+      return builder.toString();
+    }
+  }
+
+  /** {@literal @AtMostOnce} for {@link ClientProtocol#modifyCachePool} */
+  static class ModifyCachePoolOp extends FSEditLogOp {
+    CachePoolInfo info;
+
+    public ModifyCachePoolOp() {
+      super(OP_MODIFY_CACHE_POOL);
+    }
+
+    static ModifyCachePoolOp getInstance(OpInstanceCache cache) {
+      return (ModifyCachePoolOp) cache.get(OP_MODIFY_CACHE_POOL);
+    }
+
+    public ModifyCachePoolOp setInfo(CachePoolInfo info) {
+      this.info = info;
+      return this;
+    }
+
+    @Override
+    void readFields(DataInputStream in, int logVersion) throws IOException {
+      info = CachePoolInfo.readFrom(in);
+      readRpcIds(in, logVersion);
+    }
+
+    @Override
+    public void writeFields(DataOutputStream out) throws IOException {
+      info.writeTo(out);
+      writeRpcIds(rpcClientId, rpcCallId, out);
+    }
+
+    @Override
+    protected void toXml(ContentHandler contentHandler) throws SAXException {
+      cachePoolInfoToXml(contentHandler, info);
+      appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
+    }
+
+    @Override
+    void fromXml(Stanza st) throws InvalidXmlException {
+      this.info = cachePoolInfoFromXml(st);
+      readRpcIdsFromXml(st);
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder builder = new StringBuilder();
+      builder.append("ModifyCachePoolOp [");
+      ArrayList<String> fields = new ArrayList<String>(5);
+      if (info.getPoolName() != null) {
+        fields.add("poolName=" + info.getPoolName());
+      }
+      if (info.getOwnerName() != null) {
+        fields.add("ownerName=" + info.getOwnerName());
+      }
+      if (info.getGroupName() != null) {
+        fields.add("groupName=" + info.getGroupName());
+      }
+      if (info.getMode() != null) {
+        fields.add("mode=" + info.getMode().toString());
+      }
+      if (info.getWeight() != null) {
+        fields.add("weight=" + info.getWeight());
+      }
+      builder.append(Joiner.on(",").join(fields));
+      appendRpcIdsToString(builder, rpcClientId, rpcCallId);
+      builder.append("]");
+      return builder.toString();
+    }
+  }
+
+  /** {@literal @AtMostOnce} for {@link ClientProtocol#removeCachePool} */
+  static class RemoveCachePoolOp extends FSEditLogOp {
+    String poolName;
+
+    public RemoveCachePoolOp() {
+      super(OP_REMOVE_CACHE_POOL);
+    }
+
+    static RemoveCachePoolOp getInstance(OpInstanceCache cache) {
+      return (RemoveCachePoolOp) cache.get(OP_REMOVE_CACHE_POOL);
+    }
+
+    public RemoveCachePoolOp setPoolName(String poolName) {
+      this.poolName = poolName;
+      return this;
+    }
+
+    @Override
+    void readFields(DataInputStream in, int logVersion) throws IOException {
+      poolName = FSImageSerialization.readString(in);
+      readRpcIds(in, logVersion);
+    }
+
+    @Override
+    public void writeFields(DataOutputStream out) throws IOException {
+      FSImageSerialization.writeString(poolName, out);
+      writeRpcIds(rpcClientId, rpcCallId, out);
+    }
+
+    @Override
+    protected void toXml(ContentHandler contentHandler) throws SAXException {
+      XMLUtils.addSaxString(contentHandler, "POOLNAME", poolName);
+      appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
+    }
+
+    @Override
+    void fromXml(Stanza st) throws InvalidXmlException {
+      this.poolName = st.getValue("POOLNAME");
+      readRpcIdsFromXml(st);
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder builder = new StringBuilder();
+      builder.append("RemoveCachePoolOp [");
+      builder.append("poolName=" + poolName);
+      appendRpcIdsToString(builder, rpcClientId, rpcCallId);
+      builder.append("]");
+      return builder.toString();
+    }
+  }
+
   static private short readShort(DataInputStream in) throws IOException {
     return Short.parseShort(FSImageSerialization.readString(in));
   }
@@ -3235,16 +3559,65 @@ public abstract class FSEditLogOp {
     contentHandler.startElement("", "", "PERMISSION_STATUS", new AttributesImpl());
     XMLUtils.addSaxString(contentHandler, "USERNAME", perm.getUserName());
     XMLUtils.addSaxString(contentHandler, "GROUPNAME", perm.getGroupName());
-    XMLUtils.addSaxString(contentHandler, "MODE",
-        Short.valueOf(perm.getPermission().toShort()).toString());
+    fsPermissionToXml(contentHandler, perm.getPermission());
     contentHandler.endElement("", "", "PERMISSION_STATUS");
   }
 
   public static PermissionStatus permissionStatusFromXml(Stanza st)
       throws InvalidXmlException {
-    String username = st.getValue("USERNAME");
-    String groupname = st.getValue("GROUPNAME");
+    Stanza status = st.getChildren("PERMISSION_STATUS").get(0);
+    String username = status.getValue("USERNAME");
+    String groupname = status.getValue("GROUPNAME");
+    FsPermission mode = fsPermissionFromXml(status);
+    return new PermissionStatus(username, groupname, mode);
+  }
+
+  public static void fsPermissionToXml(ContentHandler contentHandler,
+      FsPermission mode) throws SAXException {
+    XMLUtils.addSaxString(contentHandler, "MODE", Short.valueOf(mode.toShort())
+        .toString());
+  }
+
+  public static FsPermission fsPermissionFromXml(Stanza st)
+      throws InvalidXmlException {
     short mode = Short.valueOf(st.getValue("MODE"));
-    return new PermissionStatus(username, groupname, new FsPermission(mode));
+    return new FsPermission(mode);
+  }
+
+  public static void cachePoolInfoToXml(ContentHandler contentHandler,
+      CachePoolInfo info) throws SAXException {
+    XMLUtils.addSaxString(contentHandler, "POOLNAME", info.getPoolName());
+    if (info.getOwnerName() != null) {
+      XMLUtils.addSaxString(contentHandler, "OWNERNAME", info.getOwnerName());
+    }
+    if (info.getGroupName() != null) {
+      XMLUtils.addSaxString(contentHandler, "GROUPNAME", info.getGroupName());
+    }
+    if (info.getMode() != null) {
+      fsPermissionToXml(contentHandler, info.getMode());
+    }
+    if (info.getWeight() != null) {
+      XMLUtils.addSaxString(contentHandler, "WEIGHT",
+          Integer.toString(info.getWeight()));
+    }
+  }
+
+  public static CachePoolInfo cachePoolInfoFromXml(Stanza st)
+      throws InvalidXmlException {
+    String poolName = st.getValue("POOLNAME");
+    CachePoolInfo info = new CachePoolInfo(poolName);
+    if (st.hasChildren("OWNERNAME")) {
+      info.setOwnerName(st.getValue("OWNERNAME"));
+    }
+    if (st.hasChildren("GROUPNAME")) {
+      info.setGroupName(st.getValue("GROUPNAME"));
+    }
+    if (st.hasChildren("MODE")) {
+      info.setMode(FSEditLogOp.fsPermissionFromXml(st));
+    }
+    if (st.hasChildren("WEIGHT")) {
+      info.setWeight(Integer.parseInt(st.getValue("WEIGHT")));
+    }
+    return info;
   }
 }
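Editor's note: the new op classes follow this file's existing builder pattern, where FSEditLog obtains a pooled instance from the OpInstanceCache and fills it with chained setters before appending it to the log. A hedged sketch of the logAddPathBasedCacheDirective helper that the FSNamesystem hunk below calls; the cache.get() handling, the logRpcIds/logEdit helpers, and the PathBasedCacheDirective accessors are assumptions about the surrounding FSEditLog code, not part of this diff:

  void logAddPathBasedCacheDirective(PathBasedCacheDirective directive,
      boolean toLogRpcIds) {
    AddPathBasedCacheDirectiveOp op =
        AddPathBasedCacheDirectiveOp.getInstance(cache.get())
            .setPath(directive.getPath())
            .setReplication(directive.getReplication())
            .setPool(directive.getPool());
    logRpcIds(op, toLogRpcIds);  // stamps rpcClientId/rpcCallId so retried RPCs can be detected
    logEdit(op);
  }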

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java?rev=1536889&r1=1536888&r2=1536889&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java Tue Oct 29 21:04:31 2013
@@ -63,7 +63,13 @@ public enum FSEditLogOpCodes {
   OP_ALLOW_SNAPSHOT             ((byte) 29),
   OP_DISALLOW_SNAPSHOT          ((byte) 30),
   OP_SET_GENSTAMP_V2            ((byte) 31),
-  OP_ALLOCATE_BLOCK_ID          ((byte) 32);
+  OP_ALLOCATE_BLOCK_ID          ((byte) 32),
+  OP_ADD_PATH_BASED_CACHE_DIRECTIVE        ((byte) 33),
+  OP_REMOVE_PATH_BASED_CACHE_DESCRIPTOR    ((byte) 34),
+  OP_ADD_CACHE_POOL             ((byte) 35),
+  OP_MODIFY_CACHE_POOL          ((byte) 36),
+  OP_REMOVE_CACHE_POOL          ((byte) 37);
+
   private byte opCode;
 
   /**
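Editor's note: the byte values are the on-disk encoding of each opcode in the edit log, so the five caching ops take the next free codes (33 through 37) rather than reusing retired ones. A rough sketch of how a reader maps the stored byte back to an op using the OpInstanceCache registered in FSEditLogOp.java above; the stream handling and the fromByte call are assumptions about the surrounding reader code:

  byte opCodeByte = in.readByte();
  FSEditLogOpCodes opCode = FSEditLogOpCodes.fromByte(opCodeByte);
  if (opCode == FSEditLogOpCodes.OP_INVALID) {
    return null;                        // reader treats OP_INVALID as end of the segment
  }
  FSEditLogOp op = cache.get(opCode);   // pooled instance via OpInstanceCache.get(), shown above
  op.readFields(in, logVersion);        // e.g. AddCachePoolOp.readFields from the hunk above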

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java?rev=1536889&r1=1536888&r2=1536889&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java Tue Oct 29 21:04:31 2013
@@ -351,6 +351,8 @@ public class FSImageFormat {
 
         loadSecretManagerState(in);
 
+        loadCacheManagerState(in);
+
         // make sure to read to the end of file
         boolean eof = (in.read() == -1);
         assert eof : "Should have reached the end of image file " + curFile;
@@ -843,6 +845,14 @@ public class FSImageFormat {
       namesystem.loadSecretManagerState(in);
     }
 
+    private void loadCacheManagerState(DataInput in) throws IOException {
+      int imgVersion = getLayoutVersion();
+      if (!LayoutVersion.supports(Feature.CACHING, imgVersion)) {
+        return;
+      }
+      namesystem.getCacheManager().loadState(in);
+    }
+
     private int getLayoutVersion() {
       return namesystem.getFSImage().getStorage().getLayoutVersion();
     }
@@ -985,6 +995,8 @@ public class FSImageFormat {
         context.checkCancelled();
         sourceNamesystem.saveSecretManagerState(out, sdPath);
         context.checkCancelled();
+        sourceNamesystem.getCacheManager().saveState(out, sdPath);
+        context.checkCancelled();
         out.flush();
         context.checkCancelled();
         fout.getChannel().force(true);

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1536889&r1=1536888&r2=1536889&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Tue Oct 29 21:04:31 2013
@@ -121,6 +121,7 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
 import org.apache.hadoop.fs.DirectoryListingStartAfterNotFoundException;
@@ -145,6 +146,8 @@ import org.apache.hadoop.hdfs.HdfsConfig
 import org.apache.hadoop.hdfs.StorageType;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
+import org.apache.hadoop.hdfs.protocol.PathBasedCacheDescriptor;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -156,6 +159,7 @@ import org.apache.hadoop.hdfs.protocol.H
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
@@ -374,6 +378,7 @@ public class FSNamesystem implements Nam
   FSDirectory dir;
   private final BlockManager blockManager;
   private final SnapshotManager snapshotManager;
+  private final CacheManager cacheManager;
   private final DatanodeStatistics datanodeStatistics;
 
   // Block pool ID used by this namenode
@@ -709,6 +714,12 @@ public class FSNamesystem implements Nam
       this.dtSecretManager = createDelegationTokenSecretManager(conf);
       this.dir = new FSDirectory(fsImage, this, conf);
       this.snapshotManager = new SnapshotManager(dir);
+      writeLock();
+      try {
+        this.cacheManager = new CacheManager(this, conf, blockManager);
+      } finally {
+        writeUnlock();
+      }
       this.safeMode = new SafeModeInfo(conf);
       this.auditLoggers = initAuditLoggers(conf);
       this.isDefaultAuditLogger = auditLoggers.size() == 1 &&
@@ -901,6 +912,7 @@ public class FSNamesystem implements Nam
     writeLock();
     try {
       if (blockManager != null) blockManager.close();
+      cacheManager.deactivate();
     } finally {
       writeUnlock();
     }
@@ -932,7 +944,7 @@ public class FSNamesystem implements Nam
         blockManager.getDatanodeManager().markAllDatanodesStale();
         blockManager.clearQueues();
         blockManager.processAllPendingDNMessages();
-        
+
         if (!isInSafeMode() ||
             (isInSafeMode() && safeMode.isPopulatingReplQueues())) {
           LOG.info("Reprocessing replication and invalidation queues");
@@ -965,6 +977,8 @@ public class FSNamesystem implements Nam
       //ResourceMonitor required only at ActiveNN. See HDFS-2914
       this.nnrmthread = new Daemon(new NameNodeResourceMonitor());
       nnrmthread.start();
+      cacheManager.activate();
+      blockManager.getDatanodeManager().setSendCachingCommands(true);
     } finally {
       writeUnlock();
       startingActiveService = false;
@@ -1010,6 +1024,8 @@ public class FSNamesystem implements Nam
         // so that the tailer starts from the right spot.
         dir.fsImage.updateLastAppliedTxIdFromWritten();
       }
+      cacheManager.deactivate();
+      blockManager.getDatanodeManager().setSendCachingCommands(false);
     } finally {
       writeUnlock();
     }
@@ -1583,8 +1599,14 @@ public class FSNamesystem implements Nam
           length = Math.min(length, fileSize - offset);
           isUc = false;
         }
-        return blockManager.createLocatedBlocks(inode.getBlocks(), fileSize,
+        LocatedBlocks blocks =
+          blockManager.createLocatedBlocks(inode.getBlocks(), fileSize,
             isUc, offset, length, needBlockToken, iip.isSnapshot());
+        // Set caching information for the located blocks.
+        for (LocatedBlock lb: blocks.getLocatedBlocks()) {
+          cacheManager.setCachedLocations(lb);
+        }
+        return blocks;
       } finally {
         if (isReadOp) {
           readUnlock();
@@ -4057,15 +4079,16 @@ public class FSNamesystem implements Nam
    * @throws IOException
    */
   HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg,
-      StorageReport[] reports, int xceiverCount, int xmitsInProgress,
-      int failedVolumes)
+      StorageReport[] reports, long cacheCapacity, long cacheUsed,
+      int xceiverCount, int xmitsInProgress, int failedVolumes)
         throws IOException {
     readLock();
     try {
       final int maxTransfer = blockManager.getMaxReplicationStreams()
           - xmitsInProgress;
       DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(
-          nodeReg, reports, blockPoolId, xceiverCount, maxTransfer, failedVolumes);
+          nodeReg, reports, blockPoolId, cacheCapacity, cacheUsed,
+          xceiverCount, maxTransfer, failedVolumes);
       return new HeartbeatResponse(cmds, createHaStatusHeartbeat());
     } finally {
       readUnlock();
@@ -6517,6 +6540,10 @@ public class FSNamesystem implements Nam
   public FSDirectory getFSDirectory() {
     return dir;
   }
+  /** @return the cache manager. */
+  public CacheManager getCacheManager() {
+    return cacheManager;
+  }
 
   @Override  // NameNodeMXBean
   public String getCorruptFiles() {
@@ -6893,6 +6920,215 @@ public class FSNamesystem implements Nam
     }
   }
 
+  PathBasedCacheDescriptor addPathBasedCacheDirective(
+      PathBasedCacheDirective directive) throws IOException {
+    checkOperation(OperationCategory.WRITE);
+    final FSPermissionChecker pc = isPermissionEnabled ?
+        getPermissionChecker() : null;
+    CacheEntryWithPayload cacheEntry =
+        RetryCache.waitForCompletion(retryCache, null);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return (PathBasedCacheDescriptor) cacheEntry.getPayload();
+    }
+    boolean success = false;
+    PathBasedCacheDescriptor result = null;
+    writeLock();
+    try {
+      checkOperation(OperationCategory.WRITE);
+      if (isInSafeMode()) {
+        throw new SafeModeException(
+            "Cannot add PathBasedCache directive", safeMode);
+      }
+      result = cacheManager.addDirective(directive, pc);
+      getEditLog().logAddPathBasedCacheDirective(directive,
+          cacheEntry != null);
+      success = true;
+    } finally {
+      writeUnlock();
+      if (success) {
+        getEditLog().logSync();
+      }
+      if (isAuditEnabled() && isExternalInvocation()) {
+        logAuditEvent(success, "addPathBasedCacheDirective", null, null, null);
+      }
+      RetryCache.setState(cacheEntry, success, result);
+    }
+    return result;
+  }
+
+  void removePathBasedCacheDescriptor(Long id) throws IOException {
+    checkOperation(OperationCategory.WRITE);
+    final FSPermissionChecker pc = isPermissionEnabled ?
+        getPermissionChecker() : null;
+    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return;
+    }
+    boolean success = false;
+    writeLock();
+    try {
+      checkOperation(OperationCategory.WRITE);
+      if (isInSafeMode()) {
+        throw new SafeModeException(
+            "Cannot remove PathBasedCache directives", safeMode);
+      }
+      cacheManager.removeDescriptor(id, pc);
+      getEditLog().logRemovePathBasedCacheDescriptor(id, cacheEntry != null);
+      success = true;
+    } finally {
+      writeUnlock();
+      if (isAuditEnabled() && isExternalInvocation()) {
+        logAuditEvent(success, "removePathBasedCacheDescriptor", null, null,
+            null);
+      }
+      RetryCache.setState(cacheEntry, success);
+    }
+    getEditLog().logSync();
+  }
+
+  BatchedListEntries<PathBasedCacheDescriptor> listPathBasedCacheDescriptors(
+      long startId, String pool, String path) throws IOException {
+    checkOperation(OperationCategory.READ);
+    final FSPermissionChecker pc = isPermissionEnabled ?
+        getPermissionChecker() : null;
+    BatchedListEntries<PathBasedCacheDescriptor> results;
+    readLock();
+    boolean success = false;
+    try {
+      checkOperation(OperationCategory.READ);
+      results =
+          cacheManager.listPathBasedCacheDescriptors(startId, pool, path, pc);
+      success = true;
+    } finally {
+      readUnlock();
+      if (isAuditEnabled() && isExternalInvocation()) {
+        logAuditEvent(success, "listPathBasedCacheDescriptors", null, null,
+            null);
+      }
+    }
+    return results;
+  }
+
+  public void addCachePool(CachePoolInfo req) throws IOException {
+    checkOperation(OperationCategory.WRITE);
+    final FSPermissionChecker pc = isPermissionEnabled ?
+        getPermissionChecker() : null;
+    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return; // Return previous response
+    }
+    writeLock();
+    boolean success = false;
+    try {
+      checkOperation(OperationCategory.WRITE);
+      if (isInSafeMode()) {
+        throw new SafeModeException(
+            "Cannot add cache pool " + req.getPoolName(), safeMode);
+      }
+      if (pc != null) {
+        pc.checkSuperuserPrivilege();
+      }
+      CachePoolInfo info = cacheManager.addCachePool(req);
+      getEditLog().logAddCachePool(info, cacheEntry != null);
+      success = true;
+    } finally {
+      writeUnlock();
+      if (isAuditEnabled() && isExternalInvocation()) {
+        logAuditEvent(success, "addCachePool", req.getPoolName(), null, null);
+      }
+      RetryCache.setState(cacheEntry, success);
+    }
+    
+    getEditLog().logSync();
+  }
+
+  public void modifyCachePool(CachePoolInfo req) throws IOException {
+    checkOperation(OperationCategory.WRITE);
+    final FSPermissionChecker pc =
+        isPermissionEnabled ? getPermissionChecker() : null;
+    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return; // Return previous response
+    }
+    writeLock();
+    boolean success = false;
+    try {
+      checkOperation(OperationCategory.WRITE);
+      if (isInSafeMode()) {
+        throw new SafeModeException(
+            "Cannot modify cache pool " + req.getPoolName(), safeMode);
+      }
+      if (pc != null) {
+        pc.checkSuperuserPrivilege();
+      }
+      cacheManager.modifyCachePool(req);
+      getEditLog().logModifyCachePool(req, cacheEntry != null);
+      success = true;
+    } finally {
+      writeUnlock();
+      if (isAuditEnabled() && isExternalInvocation()) {
+        logAuditEvent(success, "modifyCachePool", req.getPoolName(), null, null);
+      }
+      RetryCache.setState(cacheEntry, success);
+    }
+
+    getEditLog().logSync();
+  }
+
+  public void removeCachePool(String cachePoolName) throws IOException {
+    checkOperation(OperationCategory.WRITE);
+    final FSPermissionChecker pc =
+        isPermissionEnabled ? getPermissionChecker() : null;
+    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return; // Return previous response
+    }
+    writeLock();
+    boolean success = false;
+    try {
+      checkOperation(OperationCategory.WRITE);
+      if (isInSafeMode()) {
+        throw new SafeModeException(
+            "Cannot remove cache pool " + cachePoolName, safeMode);
+      }
+      if (pc != null) {
+        pc.checkSuperuserPrivilege();
+      }
+      cacheManager.removeCachePool(cachePoolName);
+      getEditLog().logRemoveCachePool(cachePoolName, cacheEntry != null);
+      success = true;
+    } finally {
+      writeUnlock();
+      if (isAuditEnabled() && isExternalInvocation()) {
+        logAuditEvent(success, "removeCachePool", cachePoolName, null, null);
+      }
+      RetryCache.setState(cacheEntry, success);
+    }
+    
+    getEditLog().logSync();
+  }
+
+  public BatchedListEntries<CachePoolInfo> listCachePools(String prevKey)
+      throws IOException {
+    final FSPermissionChecker pc =
+        isPermissionEnabled ? getPermissionChecker() : null;
+    BatchedListEntries<CachePoolInfo> results;
+    checkOperation(OperationCategory.READ);
+    boolean success = false;
+    readLock();
+    try {
+      checkOperation(OperationCategory.READ);
+      results = cacheManager.listCachePools(pc, prevKey);
+      success = true;
+    } finally {
+      readUnlock();
+      if (isAuditEnabled() && isExternalInvocation()) {
+        logAuditEvent(success, "listCachePools", null, null, null);
+      }
+    }
+    return results;
+  }
+
   /**
    * Default AuditLogger implementation; used when no access logger is
    * defined in the config file. It can also be explicitly listed in the
@@ -6950,10 +7186,9 @@ public class FSNamesystem implements Nam
         logAuditMessage(sb.toString());
       }
     }
-
     public void logAuditMessage(String message) {
       auditLog.info(message);
     }
   }
-
 }
+
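Editor's note: the five mutating cache methods above (addPathBasedCacheDirective, removePathBasedCacheDescriptor, addCachePool, modifyCachePool, removeCachePool) all share one shape. A distilled skeleton of that pattern, not actual code from the commit; the field and helper names are taken from the hunk above:

  void cacheMutationSkeleton() throws IOException {
    checkOperation(OperationCategory.WRITE);          // fail fast on a standby NameNode
    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
    if (cacheEntry != null && cacheEntry.isSuccess()) {
      return;                                         // retried RPC: replay the cached outcome
    }
    boolean success = false;
    writeLock();
    try {
      checkOperation(OperationCategory.WRITE);        // re-check after taking the lock
      if (isInSafeMode()) {
        throw new SafeModeException("Cannot modify cache state", safeMode);
      }
      // mutate CacheManager state, then append the matching edit-log op
      success = true;
    } finally {
      writeUnlock();
      // audit-log the result and record it for future retries of the same RPC
      RetryCache.setState(cacheEntry, success);
    }
    getEditLog().logSync();                           // force the edit to disk outside the lock
  }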

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java?rev=1536889&r1=1536888&r2=1536889&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java Tue Oct 29 21:04:31 2013
@@ -255,4 +255,30 @@ class FSPermissionChecker {
     throw new AccessControlException("Permission denied by sticky bit setting:" +
       " user=" + user + ", inode=" + inode);
   }
+
+  /**
+   * Whether a cache pool can be accessed by the current context
+   *
+   * @param pool CachePool being accessed
+   * @param access type of action being performed on the cache pool
+   * @return true if the pool can be accessed; false otherwise
+   */
+  public boolean checkPermission(CachePool pool, FsAction access) {
+    FsPermission mode = pool.getMode();
+    if (isSuperUser()) {
+      return true;
+    }
+    if (user.equals(pool.getOwnerName())
+        && mode.getUserAction().implies(access)) {
+      return true;
+    }
+    if (groups.contains(pool.getGroupName())
+        && mode.getGroupAction().implies(access)) {
+      return true;
+    }
+    if (mode.getOtherAction().implies(access)) {
+      return true;
+    }
+    return false;
+  }
 }
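Editor's note: checkPermission returns a boolean rather than throwing, so callers decide how to react. A hypothetical call site inside CacheManager; only checkPermission itself comes from this diff, the surrounding names are assumptions:

  if (pc != null && !pc.checkPermission(pool, FsAction.WRITE)) {
    throw new AccessControlException("Permission denied while accessing pool "
        + pool.getPoolName());
  }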

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1536889&r1=1536888&r2=1536889&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Tue Oct 29 21:04:31 2013
@@ -686,8 +686,13 @@ public class NameNode implements NameNod
     try {
       initializeGenericKeys(conf, nsId, namenodeId);
       initialize(conf);
-      state.prepareToEnterState(haContext);
-      state.enterState(haContext);
+      try {
+        haContext.writeLock();
+        state.prepareToEnterState(haContext);
+        state.enterState(haContext);
+      } finally {
+        haContext.writeUnlock();
+      }
     } catch (IOException e) {
       this.stop();
       throw e;

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java?rev=1536889&r1=1536888&r2=1536889&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java Tue Oct 29 21:04:31 2013
@@ -36,6 +36,7 @@ import java.util.Set;
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BatchedRemoteIterator;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.fs.ContentSummary;
 import org.apache.hadoop.fs.CreateFlag;
@@ -45,6 +46,7 @@ import org.apache.hadoop.fs.InvalidPathE
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.ParentNotDirectoryException;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.UnresolvedLinkException;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.fs.permission.PermissionStatus;
@@ -59,6 +61,9 @@ import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HDFSPolicyProvider;
 import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
+import org.apache.hadoop.hdfs.protocol.PathBasedCacheDescriptor;
+import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
 import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
@@ -193,9 +198,9 @@ class NameNodeRpcServer implements Namen
 
     NamenodeProtocolServerSideTranslatorPB namenodeProtocolXlator = 
         new NamenodeProtocolServerSideTranslatorPB(this);
-	  BlockingService NNPbService = NamenodeProtocolService
+    BlockingService NNPbService = NamenodeProtocolService
           .newReflectiveBlockingService(namenodeProtocolXlator);
-	  
+    
     RefreshAuthorizationPolicyProtocolServerSideTranslatorPB refreshAuthPolicyXlator = 
         new RefreshAuthorizationPolicyProtocolServerSideTranslatorPB(this);
     BlockingService refreshAuthService = RefreshAuthorizationPolicyProtocolService
@@ -215,7 +220,7 @@ class NameNodeRpcServer implements Namen
         new HAServiceProtocolServerSideTranslatorPB(this);
     BlockingService haPbService = HAServiceProtocolService
         .newReflectiveBlockingService(haServiceProtocolXlator);
-	  
+    
     WritableRpcEngine.ensureInitialized();
 
     InetSocketAddress serviceRpcAddr = nn.getServiceRpcServerAddress(conf);
@@ -957,11 +962,13 @@ class NameNodeRpcServer implements Namen
 
   @Override // DatanodeProtocol
   public HeartbeatResponse sendHeartbeat(DatanodeRegistration nodeReg,
-      StorageReport[] report, int xmitsInProgress, int xceiverCount,
+      StorageReport[] report, long dnCacheCapacity, long dnCacheUsed,
+      int xmitsInProgress, int xceiverCount,
       int failedVolumes) throws IOException {
     verifyRequest(nodeReg);
-    return namesystem.handleHeartbeat(nodeReg, report, xceiverCount,
-        xmitsInProgress, failedVolumes);
+    return namesystem.handleHeartbeat(nodeReg, report,
+        dnCacheCapacity, dnCacheUsed, xceiverCount, xmitsInProgress,
+        failedVolumes);
   }
 
   @Override // DatanodeProtocol
@@ -983,6 +990,18 @@ class NameNodeRpcServer implements Namen
     return null;
   }
 
+  @Override
+  public DatanodeCommand cacheReport(DatanodeRegistration nodeReg,
+      String poolId, List<Long> blockIds) throws IOException {
+    verifyRequest(nodeReg);
+    if (blockStateChangeLog.isDebugEnabled()) {
+      blockStateChangeLog.debug("*BLOCK* NameNode.cacheReport: "
+           + "from " + nodeReg + " " + blockIds.size() + " blocks");
+    }
+    namesystem.getCacheManager().processCacheReport(nodeReg, blockIds);
+    return null;
+  }
+
   @Override // DatanodeProtocol
   public void blockReceivedAndDeleted(DatanodeRegistration nodeReg, String poolId,
       StorageReceivedDeletedBlocks[] receivedAndDeletedBlocks) throws IOException {
@@ -1216,4 +1235,88 @@ class NameNodeRpcServer implements Namen
     metrics.incrSnapshotDiffReportOps();
     return report;
   }
+
+  @Override
+  public PathBasedCacheDescriptor addPathBasedCacheDirective(
+      PathBasedCacheDirective path) throws IOException {
+    return namesystem.addPathBasedCacheDirective(path);
+  }
+
+  @Override
+  public void removePathBasedCacheDescriptor(Long id) throws IOException {
+    namesystem.removePathBasedCacheDescriptor(id);
+  }
+
+  private class ServerSidePathBasedCacheEntriesIterator
+      extends BatchedRemoteIterator<Long, PathBasedCacheDescriptor> {
+
+    private final String pool;
+
+    private final String path;
+
+    public ServerSidePathBasedCacheEntriesIterator(Long firstKey, String pool,
+        String path) {
+      super(firstKey);
+      this.pool = pool;
+      this.path = path;
+    }
+
+    @Override
+    public BatchedEntries<PathBasedCacheDescriptor> makeRequest(
+        Long nextKey) throws IOException {
+      return namesystem.listPathBasedCacheDescriptors(nextKey, pool, path);
+    }
+
+    @Override
+    public Long elementToPrevKey(PathBasedCacheDescriptor entry) {
+      return entry.getEntryId();
+    }
+  }
+  
+  @Override
+  public RemoteIterator<PathBasedCacheDescriptor> listPathBasedCacheDescriptors(long prevId,
+      String pool, String path) throws IOException {
+    return new ServerSidePathBasedCacheEntriesIterator(prevId, pool, path);
+  }
+
+  @Override
+  public void addCachePool(CachePoolInfo info) throws IOException {
+    namesystem.addCachePool(info);
+  }
+
+  @Override
+  public void modifyCachePool(CachePoolInfo info) throws IOException {
+    namesystem.modifyCachePool(info);
+  }
+
+  @Override
+  public void removeCachePool(String cachePoolName) throws IOException {
+    namesystem.removeCachePool(cachePoolName);
+  }
+
+  private class ServerSideCachePoolIterator 
+      extends BatchedRemoteIterator<String, CachePoolInfo> {
+
+    public ServerSideCachePoolIterator(String prevKey) {
+      super(prevKey);
+    }
+
+    @Override
+    public BatchedEntries<CachePoolInfo> makeRequest(String prevKey)
+        throws IOException {
+      return namesystem.listCachePools(prevKey);
+    }
+
+    @Override
+    public String elementToPrevKey(CachePoolInfo element) {
+      return element.getPoolName();
+    }
+  }
+
+  @Override
+  public RemoteIterator<CachePoolInfo> listCachePools(String prevKey)
+      throws IOException {
+    return new ServerSideCachePoolIterator(prevKey);
+  }
 }
+
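Editor's note: the two BatchedRemoteIterator subclasses let a client walk an arbitrarily long listing in batches; makeRequest fetches the next batch and elementToPrevKey tells the iterator where to resume. A hedged client-side sketch against the ClientProtocol methods above; the empty-string start key and the proxy variable are assumptions:

  RemoteIterator<CachePoolInfo> it = namenode.listCachePools("");
  while (it.hasNext()) {
    CachePoolInfo pool = it.next();   // transparently issues follow-up RPCs batch by batch
    System.out.println(pool.getPoolName() + " weight=" + pool.getWeight());
  }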

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java?rev=1536889&r1=1536888&r2=1536889&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java Tue Oct 29 21:04:31 2013
@@ -79,6 +79,8 @@ public class NameNodeMetrics {
   MutableCounterLong transactionsBatchedInSync;
   @Metric("Block report") MutableRate blockReport;
   MutableQuantiles[] blockReportQuantiles;
+  @Metric("Cache report") MutableRate cacheReport;
+  MutableQuantiles[] cacheReportQuantiles;
 
   @Metric("Duration in SafeMode at startup in msec")
   MutableGaugeInt safeModeTime;
@@ -91,6 +93,7 @@ public class NameNodeMetrics {
     final int len = intervals.length;
     syncsQuantiles = new MutableQuantiles[len];
     blockReportQuantiles = new MutableQuantiles[len];
+    cacheReportQuantiles = new MutableQuantiles[len];
     
     for (int i = 0; i < len; i++) {
       int interval = intervals[i];
@@ -100,6 +103,9 @@ public class NameNodeMetrics {
       blockReportQuantiles[i] = registry.newQuantiles(
           "blockReport" + interval + "s", 
           "Block report", "ops", "latency", interval);
+      cacheReportQuantiles[i] = registry.newQuantiles(
+          "cacheReport" + interval + "s",
+          "Cache report", "ops", "latency", interval);
     }
   }
 
@@ -229,6 +235,13 @@ public class NameNodeMetrics {
     }
   }
 
+  public void addCacheBlockReport(long latency) {
+    cacheReport.add(latency);
+    for (MutableQuantiles q : cacheReportQuantiles) {
+      q.add(latency);
+    }
+  }
+
   public void setSafeModeTime(long elapsed) {
     safeModeTime.set((int) elapsed);
   }
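Editor's note: addCacheBlockReport mirrors the existing block-report metric, one MutableRate plus per-interval quantiles. A hedged sketch of where the server side might time a cache report and record it; Time.monotonicNow and the call site are assumptions, not part of this diff:

  final long startTime = Time.monotonicNow();
  namesystem.getCacheManager().processCacheReport(nodeReg, blockIds);
  metrics.addCacheBlockReport(Time.monotonicNow() - startTime);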

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/startupprogress/StepType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/startupprogress/StepType.java?rev=1536889&r1=1536888&r2=1536889&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/startupprogress/StepType.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/startupprogress/StepType.java Tue Oct 29 21:04:31 2013
@@ -42,7 +42,17 @@ public enum StepType {
   /**
    * The namenode is performing an operation related to inodes.
    */
-  INODES("Inodes", "inodes");
+  INODES("Inodes", "inodes"),
+
+  /**
+   * The namenode is performing an operation related to cache pools.
+   */
+  CACHE_POOLS("CachePools", "cache pools"),
+
+  /**
+   * The namenode is performing an operation related to cache entries.
+   */
+  CACHE_ENTRIES("CacheEntries", "cache entries");
 
   private final String name, description;
 

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java?rev=1536889&r1=1536888&r2=1536889&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java Tue Oct 29 21:04:31 2013
@@ -19,13 +19,14 @@
 package org.apache.hadoop.hdfs.server.protocol;
 
 import java.io.*;
+import java.util.List;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.io.retry.AtMostOnce;
 import org.apache.hadoop.io.retry.Idempotent;
 import org.apache.hadoop.security.KerberosInfo;
 
@@ -74,6 +75,8 @@ public interface DatanodeProtocol {
   final static int DNA_RECOVERBLOCK = 6;  // request a block recovery
   final static int DNA_ACCESSKEYUPDATE = 7;  // update access key
   final static int DNA_BALANCERBANDWIDTHUPDATE = 8; // update balancer bandwidth
+  final static int DNA_CACHE = 9;      // cache blocks
+  final static int DNA_UNCACHE = 10;   // uncache blocks
 
   /** 
    * Register Datanode.
@@ -104,6 +107,8 @@ public interface DatanodeProtocol {
   @Idempotent
   public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration,
                                        StorageReport[] reports,
+                                       long dnCacheCapacity,
+                                       long dnCacheUsed,
                                        int xmitsInProgress,
                                        int xceiverCount,
                                        int failedVolumes) throws IOException;
@@ -128,6 +133,24 @@ public interface DatanodeProtocol {
   public DatanodeCommand blockReport(DatanodeRegistration registration,
       String poolId, StorageBlockReport[] reports) throws IOException;
     
+
+  /**
+   * Communicates the complete list of locally cached blocks to the NameNode.
+   * 
+   * This method is similar to
+   * {@link #blockReport(DatanodeRegistration, String, StorageBlockReport[])},
+   * which is used to communicate blocks stored on disk.
+   *
+   * @param registration The datanode registration.
+   * @param poolId     The block pool ID for the blocks.
+   * @param blockIds   A list of block IDs.
+   * @return           The DatanodeCommand.
+   * @throws IOException
+   */
+  @Idempotent
+  public DatanodeCommand cacheReport(DatanodeRegistration registration,
+      String poolId, List<Long> blockIds) throws IOException;
+
   /**
    * blockReceivedAndDeleted() allows the DataNode to tell the NameNode about
    * recently-received and -deleted block data. 
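Editor's note: a cache report is shaped like a block report but carries only block IDs, since the blocks themselves were already reported via blockReport. A hedged sketch of a datanode-side caller; only cacheReport and the DNA_CACHE/DNA_UNCACHE action codes come from this diff, the dataset accessor and surrounding names are assumptions:

  List<Long> blockIds = dataset.getCacheReport(blockPoolId);   // assumed accessor
  DatanodeCommand cmd = namenode.cacheReport(bpRegistration, blockPoolId, blockIds);
  if (cmd != null) {
    processCommand(new DatanodeCommand[] { cmd });   // may carry DNA_CACHE / DNA_UNCACHE work
  }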

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java?rev=1536889&r1=1536888&r2=1536889&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java Tue Oct 29 21:04:31 2013
@@ -126,7 +126,7 @@ class ImageLoaderCurrent implements Imag
                                       new SimpleDateFormat("yyyy-MM-dd HH:mm");
   private static int[] versions = { -16, -17, -18, -19, -20, -21, -22, -23,
       -24, -25, -26, -27, -28, -30, -31, -32, -33, -34, -35, -36, -37, -38, -39,
-      -40, -41, -42, -43, -44, -45, -46, -47 };
+      -40, -41, -42, -43, -44, -45, -46, -47, -48 };
   private int imageVersion = 0;
   
   private final Map<Long, String> subtreeMap = new HashMap<Long, String>();
@@ -216,6 +216,9 @@ class ImageLoaderCurrent implements Imag
         processDelegationTokens(in, v);
       }
       
+      if (LayoutVersion.supports(Feature.CACHING, imageVersion)) {
+        processCacheManagerState(in, v);
+      }
       v.leaveEnclosingElement(); // FSImage
       done = true;
     } finally {
@@ -228,6 +231,25 @@ class ImageLoaderCurrent implements Imag
   }
 
   /**
+   * Process CacheManager state from the fsimage.
+   */
+  private void processCacheManagerState(DataInputStream in, ImageVisitor v)
+      throws IOException {
+    v.visit(ImageElement.CACHE_NEXT_ENTRY_ID, in.readLong());
+    final int numPools = in.readInt();
+    for (int i=0; i<numPools; i++) {
+      v.visit(ImageElement.CACHE_POOL_NAME, Text.readString(in));
+      processCachePoolPermission(in, v);
+      v.visit(ImageElement.CACHE_POOL_WEIGHT, in.readInt());
+    }
+    final int numEntries = in.readInt();
+    for (int i=0; i<numEntries; i++) {
+      v.visit(ImageElement.CACHE_ENTRY_PATH, Text.readString(in));
+      v.visit(ImageElement.CACHE_ENTRY_REPLICATION, in.readShort());
+      v.visit(ImageElement.CACHE_ENTRY_POOL_NAME, Text.readString(in));
+    }
+  }
+  /**
    * Process the Delegation Token related section in fsimage.
    * 
    * @param in DataInputStream to process
@@ -385,6 +407,22 @@ class ImageLoaderCurrent implements Imag
   }
 
   /**
+   * Extract CachePool permissions stored in the fsimage file.
+   *
+   * @param in Datastream to process
+   * @param v Visitor to walk over the cache pool permission fields
+   */
+  private void processCachePoolPermission(DataInputStream in, ImageVisitor v)
+      throws IOException {
+    v.visitEnclosingElement(ImageElement.PERMISSIONS);
+    v.visit(ImageElement.CACHE_POOL_OWNER_NAME, Text.readString(in));
+    v.visit(ImageElement.CACHE_POOL_GROUP_NAME, Text.readString(in));
+    FsPermission fsp = new FsPermission(in.readShort());
+    v.visit(ImageElement.CACHE_POOL_PERMISSION_STRING, fsp.toString());
+    v.leaveEnclosingElement(); // Permissions
+  }
+
+  /**
    * Process the INode records stored in the fsimage.
    *
    * @param in Datastream to process
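Editor's note: processCacheManagerState above pins down the layout of the new fsimage section, which CacheManager.saveState (called from FSImageFormat above) must produce in the same order. A hedged sketch of the writer side; only the field order and types are taken from the loader, every name here is an assumption:

  out.writeLong(nextEntryId);                         // CACHE_NEXT_ENTRY_ID
  out.writeInt(cachePools.size());
  for (CachePool pool : cachePools.values()) {
    Text.writeString(out, pool.getPoolName());
    Text.writeString(out, pool.getOwnerName());       // read back by processCachePoolPermission
    Text.writeString(out, pool.getGroupName());
    out.writeShort(pool.getMode().toShort());
    out.writeInt(pool.getWeight());                   // CACHE_POOL_WEIGHT
  }
  out.writeInt(entries.size());
  for (PathBasedCacheEntry entry : entries.values()) {
    Text.writeString(out, entry.getPath());           // CACHE_ENTRY_PATH
    out.writeShort(entry.getReplication());           // CACHE_ENTRY_REPLICATION
    Text.writeString(out, entry.getPoolName());       // CACHE_ENTRY_POOL_NAME
  }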

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageVisitor.java?rev=1536889&r1=1536888&r2=1536889&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageVisitor.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageVisitor.java Tue Oct 29 21:04:31 2013
@@ -117,7 +117,19 @@ abstract class ImageVisitor {
     SNAPSHOT_DST_SNAPSHOT_ID,
     SNAPSHOT_LAST_SNAPSHOT_ID,
     SNAPSHOT_REF_INODE_ID,
-    SNAPSHOT_REF_INODE
+    SNAPSHOT_REF_INODE,
+
+    CACHE_NEXT_ENTRY_ID,
+    CACHE_NUM_POOLS,
+    CACHE_POOL_NAME,
+    CACHE_POOL_OWNER_NAME,
+    CACHE_POOL_GROUP_NAME,
+    CACHE_POOL_PERMISSION_STRING,
+    CACHE_POOL_WEIGHT,
+    CACHE_NUM_ENTRIES,
+    CACHE_ENTRY_PATH,
+    CACHE_ENTRY_REPLICATION,
+    CACHE_ENTRY_POOL_NAME
   }
   
   /**

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java?rev=1536889&r1=1536888&r2=1536889&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java Tue Oct 29 21:04:31 2013
@@ -290,6 +290,8 @@ public class JsonUtil {
     m.put("dfsUsed", datanodeinfo.getDfsUsed());
     m.put("remaining", datanodeinfo.getRemaining());
     m.put("blockPoolUsed", datanodeinfo.getBlockPoolUsed());
+    m.put("cacheCapacity", datanodeinfo.getCacheCapacity());
+    m.put("cacheUsed", datanodeinfo.getCacheUsed());
     m.put("lastUpdate", datanodeinfo.getLastUpdate());
     m.put("xceiverCount", datanodeinfo.getXceiverCount());
     m.put("networkLocation", datanodeinfo.getNetworkLocation());
@@ -297,16 +299,36 @@ public class JsonUtil {
     return m;
   }
 
+  private static int getInt(Map<?, ?> m, String key, final int defaultValue) {
+    Object value = m.get(key);
+    if (value == null) {
+      return defaultValue;
+    }
+    return (int) (long) (Long) value;
+  }
+
+  private static long getLong(Map<?, ?> m, String key, final long defaultValue) {
+    Object value = m.get(key);
+    if (value == null) {
+      return defaultValue;
+    }
+    return (long) (Long) value;
+  }
+
+  private static String getString(Map<?, ?> m, String key,
+      final String defaultValue) {
+    Object value = m.get(key);
+    if (value == null) {
+      return defaultValue;
+    }
+    return (String) value;
+  }
+
   /** Convert a Json map to an DatanodeInfo object. */
   static DatanodeInfo toDatanodeInfo(final Map<?, ?> m) {
     if (m == null) {
       return null;
     }
-    
-    Object infoSecurePort = m.get("infoSecurePort");
-    if (infoSecurePort == null) {
-      infoSecurePort = 0l; // same as the default value in hdfs.proto
-    }
 
     // TODO: Fix storageID
     return new DatanodeInfo(
@@ -315,17 +337,19 @@ public class JsonUtil {
         (String)m.get("storageID"),
         (int)(long)(Long)m.get("xferPort"),
         (int)(long)(Long)m.get("infoPort"),
-        (int)(long)(Long)infoSecurePort,
+        getInt(m, "infoSecurePort", 0),
         (int)(long)(Long)m.get("ipcPort"),
 
-        (Long)m.get("capacity"),
-        (Long)m.get("dfsUsed"),
-        (Long)m.get("remaining"),
-        (Long)m.get("blockPoolUsed"),
-        (Long)m.get("lastUpdate"),
-        (int)(long)(Long)m.get("xceiverCount"),
-        (String)m.get("networkLocation"),
-        AdminStates.valueOf((String)m.get("adminState")));
+        getLong(m, "capacity", 0l),
+        getLong(m, "dfsUsed", 0l),
+        getLong(m, "remaining", 0l),
+        getLong(m, "blockPoolUsed", 0l),
+        getLong(m, "cacheCapacity", 0l),
+        getLong(m, "cacheUsed", 0l),
+        getLong(m, "lastUpdate", 0l),
+        getInt(m, "xceiverCount", 0),
+        getString(m, "networkLocation", ""),
+        AdminStates.valueOf(getString(m, "adminState", "NORMAL")));
   }
 
   /** Convert a DatanodeInfo[] to a Json array. */

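The new getInt/getLong/getString helpers centralize the pattern previously handled ad hoc for infoSecurePort: a key missing from the JSON map falls back to the same default the protobuf declares, so responses from older servers that omit the new cache fields still parse. A standalone sketch of the same pattern is below; JsonDefaultsSketch is a hypothetical class, and it casts through Number rather than Long, a slight variation on the code above.

import java.util.HashMap;
import java.util.Map;

public class JsonDefaultsSketch {
  // Missing keys fall back to the defaults declared in hdfs.proto, so
  // old servers that omit the new cache fields still deserialize cleanly.
  static long getLong(Map<?, ?> m, String key, long defaultValue) {
    Object value = m.get(key);
    return value == null ? defaultValue : ((Number) value).longValue();
  }

  static int getInt(Map<?, ?> m, String key, int defaultValue) {
    Object value = m.get(key);
    return value == null ? defaultValue : ((Number) value).intValue();
  }

  static String getString(Map<?, ?> m, String key, String defaultValue) {
    Object value = m.get(key);
    return value == null ? defaultValue : (String) value;
  }

  public static void main(String[] args) {
    Map<String, Object> m = new HashMap<>();
    m.put("capacity", 1024L);                              // present
    // "cacheCapacity" intentionally absent, as an old server would send it
    System.out.println(getLong(m, "capacity", 0L));        // 1024
    System.out.println(getLong(m, "cacheCapacity", 0L));   // 0
    System.out.println(getString(m, "adminState", "NORMAL"));
  }
}
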
Propchange: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/native/
------------------------------------------------------------------------------
  Merged /hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native:r1509426-1536569
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native:r1536572

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto?rev=1536889&r1=1536888&r2=1536889&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto Tue Oct 29 21:04:31 2013
@@ -364,6 +364,91 @@ message IsFileClosedResponseProto {
   required bool result = 1;
 }
 
+message PathBasedCacheDirectiveProto {
+  required string path = 1;
+  required uint32 replication = 2;
+  required string pool = 3;
+}
+
+message AddPathBasedCacheDirectiveRequestProto {
+  required PathBasedCacheDirectiveProto directive = 1;
+}
+
+message AddPathBasedCacheDirectiveResponseProto {
+  required int64 descriptorId = 1;
+}
+
+message RemovePathBasedCacheDescriptorRequestProto {
+  required int64 descriptorId = 1;
+}
+
+message RemovePathBasedCacheDescriptorResponseProto {
+}
+
+message ListPathBasedCacheDescriptorsRequestProto {
+  required int64 prevId = 1;
+  optional string pool = 2;
+  optional string path = 3;
+}
+
+message ListPathBasedCacheDescriptorsElementProto {
+  required int64 id = 1;
+  required string pool = 2;
+  required uint32 replication = 3;
+  required string path = 4;
+}
+
+message ListPathBasedCacheDescriptorsResponseProto {
+  repeated ListPathBasedCacheDescriptorsElementProto elements = 1;
+  required bool hasMore = 2;
+}
+
+message AddCachePoolRequestProto {
+  required string poolName = 1;
+  optional string ownerName = 2;
+  optional string groupName = 3;
+  optional int32 mode = 4;
+  optional int32 weight = 5;
+}
+
+message AddCachePoolResponseProto { // void response
+}
+
+message ModifyCachePoolRequestProto {
+  required string poolName = 1;
+  optional string ownerName = 2;
+  optional string groupName = 3;
+  optional int32 mode = 4;
+  optional int32 weight = 5;
+}
+
+message ModifyCachePoolResponseProto { // void response
+}
+
+message RemoveCachePoolRequestProto {
+  required string poolName = 1;
+}
+
+message RemoveCachePoolResponseProto { // void response
+}
+
+message ListCachePoolsRequestProto {
+  required string prevPoolName = 1;
+}
+
+message ListCachePoolsResponseProto {
+  repeated ListCachePoolsResponseElementProto elements = 1;
+  required bool hasMore = 2;
+}
+
+message ListCachePoolsResponseElementProto {
+  required string poolName = 1;
+  required string ownerName = 2;
+  required string groupName = 3;
+  required int32 mode = 4;
+  required int32 weight = 5;
+}
+
 message GetFileLinkInfoRequestProto {
   required string src = 1;
 }
@@ -546,6 +631,20 @@ service ClientNamenodeProtocol {
       returns(ListCorruptFileBlocksResponseProto);
   rpc metaSave(MetaSaveRequestProto) returns(MetaSaveResponseProto);
   rpc getFileInfo(GetFileInfoRequestProto) returns(GetFileInfoResponseProto);
+  rpc addPathBasedCacheDirective(AddPathBasedCacheDirectiveRequestProto)
+      returns (AddPathBasedCacheDirectiveResponseProto);
+  rpc removePathBasedCacheDescriptor(RemovePathBasedCacheDescriptorRequestProto)
+      returns (RemovePathBasedCacheDescriptorResponseProto);
+  rpc listPathBasedCacheDescriptors(ListPathBasedCacheDescriptorsRequestProto)
+      returns (ListPathBasedCacheDescriptorsResponseProto);
+  rpc addCachePool(AddCachePoolRequestProto)
+      returns(AddCachePoolResponseProto);
+  rpc modifyCachePool(ModifyCachePoolRequestProto)
+      returns(ModifyCachePoolResponseProto);
+  rpc removeCachePool(RemoveCachePoolRequestProto)
+      returns(RemoveCachePoolResponseProto);
+  rpc listCachePools(ListCachePoolsRequestProto)
+      returns(ListCachePoolsResponseProto);
   rpc getFileLinkInfo(GetFileLinkInfoRequestProto)
       returns(GetFileLinkInfoResponseProto);
   rpc getContentSummary(GetContentSummaryRequestProto)

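Both listing RPCs page their results: ListPathBasedCacheDescriptorsRequestProto carries a prevId cursor and ListCachePoolsRequestProto a prevPoolName cursor, while each response reports hasMore. A hedged sketch of draining such a paged listing follows; Page, CacheRpc, and listAll are hypothetical stand-ins, not the generated protobuf classes or the real ClientProtocol API.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class PagedListingSketch {
  // Hypothetical view of one page: the returned ids plus the hasMore flag.
  static class Page {
    final List<Long> ids;
    final boolean hasMore;
    Page(List<Long> ids, boolean hasMore) { this.ids = ids; this.hasMore = hasMore; }
  }

  // Hypothetical RPC stub: returns descriptors with id > prevId.
  interface CacheRpc {
    Page listPathBasedCacheDescriptors(long prevId);
  }

  // Keep asking for the next page until the server reports hasMore == false.
  static List<Long> listAll(CacheRpc rpc) {
    List<Long> all = new ArrayList<>();
    long prevId = 0;                       // 0 means "start from the beginning"
    while (true) {
      Page page = rpc.listPathBasedCacheDescriptors(prevId);
      all.addAll(page.ids);
      if (!page.hasMore || page.ids.isEmpty()) {
        return all;
      }
      prevId = page.ids.get(page.ids.size() - 1);  // cursor = last id seen
    }
  }

  public static void main(String[] args) {
    // Fake server holding ids 1..5, serving at most 2 per page.
    List<Long> server = Arrays.asList(1L, 2L, 3L, 4L, 5L);
    CacheRpc fake = prevId -> {
      List<Long> out = new ArrayList<>();
      for (long id : server) {
        if (id > prevId && out.size() < 2) out.add(id);
      }
      boolean more = !out.isEmpty() && out.get(out.size() - 1) < 5L;
      return new Page(out, more);
    };
    System.out.println(listAll(fake));   // [1, 2, 3, 4, 5]
  }
}
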
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto?rev=1536889&r1=1536888&r2=1536889&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto Tue Oct 29 21:04:31 2013
@@ -71,6 +71,7 @@ message DatanodeCommandProto {
     RegisterCommand = 5;
     UnusedUpgradeCommand = 6;
     NullDatanodeCommand = 7;
+    BlockIdCommand = 8;
   }
 
   required Type cmdType = 1;    // Type of the command
@@ -83,6 +84,7 @@ message DatanodeCommandProto {
   optional FinalizeCommandProto finalizeCmd = 5;
   optional KeyUpdateCommandProto keyUpdateCmd = 6;
   optional RegisterCommandProto registerCmd = 7;
+  optional BlockIdCommandProto blkIdCmd = 8;
 }
 
 /**
@@ -103,7 +105,7 @@ message BlockCommandProto {
   enum Action {  
     TRANSFER = 1;   // Transfer blocks to another datanode
     INVALIDATE = 2; // Invalidate blocks
-    SHUTDOWN = 3; // Shutdown the datanode
+    SHUTDOWN = 3;   // Shutdown the datanode
   }
 
   required Action action = 1;
@@ -114,6 +116,20 @@ message BlockCommandProto {
 }
 
 /**
+ * Command to instruct datanodes to perform a given action
+ * on the specified set of block IDs.
+ */
+message BlockIdCommandProto {
+  enum Action {
+    CACHE = 1;
+    UNCACHE = 2;
+  }
+  required Action action = 1;
+  required string blockPoolId = 2;
+  repeated uint64 blockIds = 3 [packed=true];
+}
+
+/**
  * List of blocks to be recovered by the datanode
  */
 message BlockRecoveryCommandProto {
@@ -166,6 +182,8 @@ message RegisterDatanodeResponseProto {
  * xmitsInProgress - number of transfers from this datanode to others
  * xceiverCount - number of active transceiver threads
  * failedVolumes - number of failed volumes
+ * cacheCapacity - total cache capacity available at the datanode
+ * cacheUsed - amount of cache used
  */
 message HeartbeatRequestProto {
   required DatanodeRegistrationProto registration = 1; // Datanode info
@@ -173,6 +191,8 @@ message HeartbeatRequestProto {
   optional uint32 xmitsInProgress = 3 [ default = 0 ];
   optional uint32 xceiverCount = 4 [ default = 0 ];
   optional uint32 failedVolumes = 5 [ default = 0 ];
+  optional uint64 dnCacheCapacity = 6 [ default = 0 ];
+  optional uint64 dnCacheUsed = 7 [ default = 0 ];
 }
 
 message StorageReportProto {
@@ -205,9 +225,11 @@ message HeartbeatResponseProto {
 /**
  * registration - datanode registration information
  * blockPoolID  - block pool ID of the reported blocks
- * blocks       - each block is represented as two longs in the array.
+ * blocks       - each block is represented as multiple longs in the array.
  *                first long represents block ID
  *                second long represents length
+ *                third long represents gen stamp
+ *                fourth long (if under construction) represents replica state
  */
 message BlockReportRequestProto {
   required DatanodeRegistrationProto registration = 1;
@@ -231,6 +253,21 @@ message BlockReportResponseProto {
 } 
 
 /**
+ * registration - datanode registration information
+ * blockPoolId  - block pool ID of the reported blocks
+ * blocks       - IDs of the cached blocks, represented as longs for efficiency
+ */
+message CacheReportRequestProto {
+  required DatanodeRegistrationProto registration = 1;
+  required string blockPoolId = 2;
+  repeated uint64 blocks = 3 [packed=true];
+}
+
+message CacheReportResponseProto {
+  optional DatanodeCommandProto cmd = 1;
+}
+
+/**
  * Data structure to send received or deleted block information
  * from datanode to namenode.
  */
@@ -348,6 +385,11 @@ service DatanodeProtocolService {
   rpc blockReport(BlockReportRequestProto) returns(BlockReportResponseProto);
 
   /**
+   * Report cached blocks at a datanode to the namenode
+   */
+  rpc cacheReport(CacheReportRequestProto) returns(CacheReportResponseProto);
+
+  /**
    * Incremental block report from the DN. This contains info about recently
    * received and deleted blocks, as well as when blocks start being
    * received.

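The revised BlockReportRequestProto comment documents that each reported block is flattened into consecutive longs (ID, length, generation stamp, and a replica state when under construction) instead of one message per block. The sketch below illustrates that packing for finalized blocks only (three longs each); BlockTriple is a made-up type, and the real BlockListAsLongs encoding also carries header counts that are omitted here.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BlockReportLongsSketch {
  // Minimal stand-in for a finalized replica: id, length, generation stamp.
  static class BlockTriple {
    final long id, length, genStamp;
    BlockTriple(long id, long length, long genStamp) {
      this.id = id; this.length = length; this.genStamp = genStamp;
    }
    @Override public String toString() {
      return "blk_" + id + " len=" + length + " gs=" + genStamp;
    }
  }

  // Flatten finalized blocks into 3 longs apiece, as described in the comment.
  static long[] encode(List<BlockTriple> blocks) {
    long[] out = new long[blocks.size() * 3];
    int i = 0;
    for (BlockTriple b : blocks) {
      out[i++] = b.id;
      out[i++] = b.length;
      out[i++] = b.genStamp;
    }
    return out;
  }

  static List<BlockTriple> decode(long[] longs) {
    List<BlockTriple> out = new ArrayList<>();
    for (int i = 0; i + 2 < longs.length; i += 3) {
      out.add(new BlockTriple(longs[i], longs[i + 1], longs[i + 2]));
    }
    return out;
  }

  public static void main(String[] args) {
    List<BlockTriple> report = Arrays.asList(
        new BlockTriple(1001L, 134217728L, 1L),
        new BlockTriple(1002L, 4096L, 2L));
    long[] wire = encode(report);
    System.out.println(Arrays.toString(wire));
    System.out.println(decode(wire));
  }
}
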
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto?rev=1536889&r1=1536888&r2=1536889&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto Tue Oct 29 21:04:31 2013
@@ -86,6 +86,8 @@ message DatanodeInfoProto {
   }
 
   optional AdminState adminState = 10 [default = NORMAL];
+  optional uint64 cacheCapacity = 11 [default = 0];
+  optional uint64 cacheUsed = 12 [default = 0];
 }
 
 /**
@@ -144,8 +146,9 @@ message LocatedBlockProto {
                                         // their locations are not part of this object
 
   required hadoop.common.TokenProto blockToken = 5;
-  repeated StorageTypeProto storageTypes = 6;
-  repeated string storageIDs = 7;
+  repeated bool isCached = 6 [packed=true]; // if a location in locs is cached
+  repeated StorageTypeProto storageTypes = 7;
+  repeated string storageIDs = 8;
 }
 
 message DataEncryptionKeyProto {
@@ -455,3 +458,4 @@ message SnapshotInfoProto {
   // TODO: do we need access time?
 }
 
+

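In LocatedBlockProto the new repeated isCached flags run parallel to the block's locations, letting a client see which replicas are already resident in a DataNode's cache. A toy sketch of consuming such parallel arrays is below; the names and the preference logic are hypothetical, not the generated protobuf classes or the DFSClient's actual selection policy.

import java.util.Arrays;
import java.util.List;

public class CachedLocationsSketch {
  // Pick a replica to read from, preferring locations flagged as cached.
  static String chooseLocation(List<String> locs, boolean[] isCached) {
    for (int i = 0; i < locs.size() && i < isCached.length; i++) {
      if (isCached[i]) {
        return locs.get(i);       // cached replica: served from memory
      }
    }
    return locs.isEmpty() ? null : locs.get(0);  // fall back to the first replica
  }

  public static void main(String[] args) {
    List<String> locs = Arrays.asList("dn1:50010", "dn2:50010", "dn3:50010");
    boolean[] isCached = { false, true, false };
    System.out.println(chooseLocation(locs, isCached));  // dn2:50010
  }
}
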
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml?rev=1536889&r1=1536888&r2=1536889&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml Tue Oct 29 21:04:31 2013
@@ -1459,4 +1459,70 @@
   </description>
 </property>
 
+<property>
+  <name>dfs.namenode.caching.enabled</name>
+  <value>false</value>
+  <description>
+    Set to true to enable block caching.  This flag enables the NameNode to
+    maintain a mapping of cached blocks to DataNodes by processing DataNode
+    cache reports.  Based on these reports and on the addition and removal of
+    caching directives, the NameNode will schedule caching and uncaching work.
+  </description>
+</property>
+
+<property>
+  <name>dfs.datanode.max.locked.memory</name>
+  <value>0</value>
+  <description>
+    The amount of memory in bytes to use for caching block replicas in
+    memory on the datanode. The datanode's maximum locked memory soft ulimit
+    (RLIMIT_MEMLOCK) must be set to at least this value, else the datanode
+    will abort on startup.
+
+    By default, this parameter is set to 0, which disables in-memory caching.
+
+    If the native libraries are not available to the DataNode, this
+    configuration has no effect.
+  </description>
+</property>
+
+<property>
+  <name>dfs.namenode.path.based.cache.refresh.interval.ms</name>
+  <value>300000</value>
+  <description>
+    The number of milliseconds between successive path cache rescans.  During
+    a rescan, the NameNode recalculates which blocks should be cached and on
+    which datanodes.
+
+    By default, this parameter is set to 300000, which is five minutes.
+  </description>
+</property>
+
+<property>
+  <name>dfs.datanode.fsdatasetcache.max.threads.per.volume</name>
+  <value>4</value>
+  <description>
+    The maximum number of threads per volume to use for caching new data
+    on the datanode. These threads consume both I/O and CPU. This can affect
+    normal datanode operations.
+  </description>
+</property>
+
+<property>
+  <name>dfs.cachereport.intervalMsec</name>
+  <value>10000</value>
+  <description>
+    Determines the cache reporting interval in milliseconds.  After this amount of
+    time, the DataNode sends a full report of its cache state to the NameNode.
+    The NameNode uses the cache report to update its map of cached blocks to
+    DataNode locations.
+
+    This configuration has no effect if in-memory caching has been disabled by
+    setting dfs.datanode.max.locked.memory to 0 (which is the default).
+
+    If the native libraries are not available to the DataNode, this
+    configuration has no effect.
+  </description>
+</property>
+
 </configuration>

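To exercise these settings end to end, a test or client would typically set them on a Configuration before starting the daemons. A small sketch using the property names defined above follows; the 64 MB and interval values are illustrative only.

import org.apache.hadoop.conf.Configuration;

public class CachingConfSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Enable NameNode-side scheduling of caching and uncaching work.
    conf.setBoolean("dfs.namenode.caching.enabled", true);
    // Allow each DataNode to lock up to 64 MB of block data in memory;
    // the process RLIMIT_MEMLOCK must be at least this large.
    conf.setLong("dfs.datanode.max.locked.memory", 64L * 1024 * 1024);
    // Report cache state every 10 seconds and rescan directives every minute.
    conf.setLong("dfs.cachereport.intervalMsec", 10000L);
    conf.setLong("dfs.namenode.path.based.cache.refresh.interval.ms", 60000L);
    System.out.println("max locked memory = "
        + conf.getLong("dfs.datanode.max.locked.memory", 0L));
  }
}
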
Propchange: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode:r1536572
  Merged /hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode:r1509426-1536569

Propchange: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs:r1536572
  Merged /hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs:r1509426-1536569

Propchange: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/secondary/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/secondary:r1536572
  Merged /hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/secondary:r1509426-1536569

Propchange: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/hdfs/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/hdfs:r1536572
  Merged /hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/hdfs:r1509426-1536569

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java?rev=1536889&r1=1536888&r2=1536889&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java Tue Oct 29 21:04:31 2013
@@ -823,7 +823,7 @@ public class DFSTestUtil {
         DFSConfigKeys.DFS_DATANODE_HTTP_DEFAULT_PORT,
         DFSConfigKeys.DFS_DATANODE_HTTPS_DEFAULT_PORT,
         DFSConfigKeys.DFS_DATANODE_IPC_DEFAULT_PORT,
-        1, 2, 3, 4, 5, 6, "local", adminState);
+        1l, 2l, 3l, 4l, 0l, 0l, 5, 6, "local", adminState);
   }
 
   public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr,
@@ -1033,6 +1033,20 @@ public class DFSTestUtil {
       locatedBlocks = DFSClientAdapter.callGetBlockLocations(
           cluster.getNameNodeRpc(nnIndex), filePath, 0L, bytes.length);
     } while (locatedBlocks.isUnderConstruction());
+    // OP_ADD_CACHE_POOL 35
+    filesystem.addCachePool(new CachePoolInfo("pool1"));
+    // OP_MODIFY_CACHE_POOL 36
+    filesystem.modifyCachePool(new CachePoolInfo("pool1").setWeight(99));
+    // OP_ADD_PATH_BASED_CACHE_DIRECTIVE 33
+    PathBasedCacheDescriptor pbcd = filesystem.addPathBasedCacheDirective(
+        new PathBasedCacheDirective.Builder().
+            setPath(new Path("/path")).
+            setPool("pool1").
+            build());
+    // OP_REMOVE_PATH_BASED_CACHE_DESCRIPTOR 34
+    filesystem.removePathBasedCacheDescriptor(pbcd);
+    // OP_REMOVE_CACHE_POOL 37
+    filesystem.removeCachePool("pool1");
   }
 
   public static void abortStream(DFSOutputStream out) throws IOException {

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/LogVerificationAppender.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/LogVerificationAppender.java?rev=1536889&r1=1536888&r2=1536889&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/LogVerificationAppender.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/LogVerificationAppender.java Tue Oct 29 21:04:31 2013
@@ -61,4 +61,15 @@ public class LogVerificationAppender ext
     }
     return count;
   }
+
+  public int countLinesWithMessage(final String text) {
+    int count = 0;
+    for (LoggingEvent e: getLog()) {
+      String msg = e.getRenderedMessage();
+      if (msg != null && msg.contains(text)) {
+        count++;
+      }
+    }
+    return count;
+  }
 }

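countLinesWithMessage complements the existing countExceptionsWithMessage. A typical usage sketch, assuming the log4j 1.x root-logger API used elsewhere in these tests and an illustrative warning message, looks like this:

import org.apache.hadoop.hdfs.LogVerificationAppender;
import org.apache.log4j.Logger;

public class AppenderUsageSketch {
  public static void main(String[] args) {
    final LogVerificationAppender appender = new LogVerificationAppender();
    final Logger logger = Logger.getRootLogger();
    logger.addAppender(appender);
    try {
      // Code under test would log here; we log directly for illustration.
      logger.warn("Not enough cache capacity on dn1");
      logger.warn("Not enough cache capacity on dn2");
      int hits = appender.countLinesWithMessage("Not enough cache capacity");
      System.out.println("matching log lines: " + hits);  // 2
    } finally {
      logger.removeAppender(appender);
    }
  }
}
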
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeConfig.java?rev=1536889&r1=1536888&r2=1536889&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeConfig.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeConfig.java Tue Oct 29 21:04:31 2013
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs;
 import static org.apache.hadoop.hdfs.server.common.Util.fileAsURI;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeTrue;
 
 import java.io.File;
 import java.io.IOException;
@@ -30,6 +31,8 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -106,4 +109,38 @@ public class TestDatanodeConfig {
       throw new IOException("Bad URI", e);
     }
   }
+
+  @Test(timeout=60000)
+  public void testMemlockLimit() throws Exception {
+    assumeTrue(NativeIO.isAvailable());
+    final long memlockLimit = NativeIO.getMemlockLimit();
+
+    // Can't increase the memlock limit past the maximum.
+    assumeTrue(memlockLimit != Long.MAX_VALUE);
+
+    Configuration conf = cluster.getConfiguration(0);
+    long prevLimit = conf.
+        getLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
+            DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT);
+    try {
+      // Try starting the DN with limit configured to the ulimit
+      conf.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
+          memlockLimit);
+      DataNode dn = null;
+      dn = DataNode.createDataNode(new String[]{},  conf);
+      dn.shutdown();
+      // Try starting the DN with a limit > ulimit
+      conf.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
+          memlockLimit+1);
+      try {
+        dn = DataNode.createDataNode(new String[]{}, conf);
+      } catch (RuntimeException e) {
+        GenericTestUtils.assertExceptionContains(
+            "more than the datanode's available RLIMIT_MEMLOCK", e);
+      }
+    } finally {
+      conf.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
+          prevLimit);
+    }
+  }
 }

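testMemlockLimit expects DataNode startup to fail when dfs.datanode.max.locked.memory exceeds the process RLIMIT_MEMLOCK ulimit. Below is a simplified sketch of that startup-time validation, not the actual DataNode code; only the "more than the datanode's available RLIMIT_MEMLOCK" fragment is taken from the assertion above, and the rest of the message is invented for illustration.

public class MemlockCheckSketch {
  // Throw if the configured lockable memory exceeds what the OS allows.
  static void checkLockedMemory(long configuredBytes, long rlimitMemlockBytes) {
    if (configuredBytes > rlimitMemlockBytes) {
      throw new RuntimeException("Cannot start datanode because the configured "
          + "max locked memory size (" + configuredBytes
          + ") is more than the datanode's available RLIMIT_MEMLOCK ulimit ("
          + rlimitMemlockBytes + ")");
    }
  }

  public static void main(String[] args) {
    checkLockedMemory(1024, 2048);       // fine
    try {
      checkLockedMemory(2049, 2048);     // exceeds the ulimit
    } catch (RuntimeException e) {
      System.out.println("rejected: " + e.getMessage());
    }
  }
}
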
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java?rev=1536889&r1=1536888&r2=1536889&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java Tue Oct 29 21:04:31 2013
@@ -107,7 +107,7 @@ public class TestBlockManager {
           2 * HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
           2 * HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L);
       dn.updateHeartbeat(
-          BlockManagerTestUtil.getStorageReportsForDatanode(dn), 0, 0);
+          BlockManagerTestUtil.getStorageReportsForDatanode(dn), 0L, 0L, 0, 0);
       bm.getDatanodeManager().checkIfClusterIsNowMultiRack(dn);
     }
   }
@@ -600,3 +600,4 @@ public class TestBlockManager {
     assertEquals(1, ds.getBlockReportCount());
   }
 }
+

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java?rev=1536889&r1=1536888&r2=1536889&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java Tue Oct 29 21:04:31 2013
@@ -106,7 +106,7 @@ public class TestOverReplicatedBlocks {
               datanode.getStorageInfos()[0].setUtilization(100L, 100L, 0, 100L);
               datanode.updateHeartbeat(
                   BlockManagerTestUtil.getStorageReportsForDatanode(datanode),
-                  0, 0);
+                  0L, 0L, 0, 0);
             }
           }
 
@@ -223,3 +223,4 @@ public class TestOverReplicatedBlocks {
     }
   }
 }
+

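The heartbeat-related tests in this commit thread two extra arguments, cache capacity and cache used, through every updateHeartbeat call, passing 0L when caching is not under test. A hedged sketch of an overload that would keep older call sites unchanged follows; HeartbeatOverloadSketch is a hypothetical helper, not the DatanodeDescriptor API.

public class HeartbeatOverloadSketch {
  // Full signature mirroring the updated test helper: storage stats plus cache stats.
  static void updateHeartbeat(long capacity, long dfsUsed, long remaining,
      long blockPoolUsed, long cacheCapacity, long cacheUsed,
      int xceiverCount, int volFailures) {
    System.out.printf("cap=%d used=%d cacheCap=%d cacheUsed=%d xceivers=%d%n",
        capacity, dfsUsed, cacheCapacity, cacheUsed, xceiverCount);
  }

  // Convenience overload: callers that don't care about caching pass nothing extra.
  static void updateHeartbeat(long capacity, long dfsUsed, long remaining,
      long blockPoolUsed, int xceiverCount, int volFailures) {
    updateHeartbeat(capacity, dfsUsed, remaining, blockPoolUsed,
        0L, 0L, xceiverCount, volFailures);
  }

  public static void main(String[] args) {
    updateHeartbeat(100L, 10L, 90L, 10L, 0, 0);            // old-style call
    updateHeartbeat(100L, 10L, 90L, 10L, 64L, 8L, 2, 0);   // with cache stats
  }
}
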
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java?rev=1536889&r1=1536888&r2=1536889&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java Tue Oct 29 21:04:31 2013
@@ -92,12 +92,12 @@ public class TestReplicationPolicy {
   
   private static void updateHeartbeatWithUsage(DatanodeDescriptor dn,
     long capacity, long dfsUsed, long remaining, long blockPoolUsed,
-    int xceiverCount, int volFailures) {
+    long dnCacheCapacity, long dnCacheUsed, int xceiverCount, int volFailures) {
     dn.getStorageInfos()[0].setUtilization(
         capacity, dfsUsed, remaining, blockPoolUsed);
     dn.updateHeartbeat(
         BlockManagerTestUtil.getStorageReportsForDatanode(dn),
-        xceiverCount, volFailures);
+        dnCacheCapacity, dnCacheUsed, xceiverCount, volFailures);
   }
 
   @BeforeClass
@@ -138,7 +138,7 @@ public class TestReplicationPolicy {
     for (int i=0; i < NUM_OF_DATANODES; i++) {
       updateHeartbeatWithUsage(dataNodes[i],
           2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
-          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
+          2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
     }    
   }
 
@@ -162,7 +162,8 @@ public class TestReplicationPolicy {
   public void testChooseTarget1() throws Exception {
     updateHeartbeatWithUsage(dataNodes[0],
         2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 
-        HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 4, 0); // overloaded
+        HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+        0L, 0L, 4, 0); // overloaded
 
     DatanodeStorageInfo[] targets;
     targets = chooseTarget(0);
@@ -192,7 +193,7 @@ public class TestReplicationPolicy {
     
     updateHeartbeatWithUsage(dataNodes[0],
         2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
-        HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0); 
+        HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
   }
 
   private static DatanodeStorageInfo[] chooseTarget(int numOfReplicas) {
@@ -315,7 +316,8 @@ public class TestReplicationPolicy {
     // make data node 0 to be not qualified to choose
     updateHeartbeatWithUsage(dataNodes[0],
         2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
-        (HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0, 0); // no space
+        (HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L,
+        0L, 0L, 0, 0); // no space
         
     DatanodeStorageInfo[] targets;
     targets = chooseTarget(0);
@@ -348,7 +350,7 @@ public class TestReplicationPolicy {
 
     updateHeartbeatWithUsage(dataNodes[0],
         2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
-        HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0); 
+        HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
   }
   
   /**
@@ -365,7 +367,7 @@ public class TestReplicationPolicy {
     for(int i=0; i<2; i++) {
       updateHeartbeatWithUsage(dataNodes[i],
           2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
-          (HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0, 0);
+          (HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
     }
       
     DatanodeStorageInfo[] targets;
@@ -393,7 +395,7 @@ public class TestReplicationPolicy {
     for(int i=0; i<2; i++) {
       updateHeartbeatWithUsage(dataNodes[i],
           2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
-          HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
+          HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
     }
   }
 
@@ -455,7 +457,7 @@ public class TestReplicationPolicy {
     for(int i=0; i<2; i++) {
       updateHeartbeatWithUsage(dataNodes[i],
           2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
-          (HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0, 0);
+          (HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
     }
     
     final LogVerificationAppender appender = new LogVerificationAppender();
@@ -480,7 +482,7 @@ public class TestReplicationPolicy {
     for(int i=0; i<2; i++) {
       updateHeartbeatWithUsage(dataNodes[i],
           2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
-          HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
+          HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
     }
   }