Posted to hdfs-commits@hadoop.apache.org by wa...@apache.org on 2013/10/04 19:46:19 UTC

svn commit: r1529238 [1/2] - in /hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/protocol/ src/main/java/org/apache/hadoop/hdfs/server/namenode/ src/main/java/org/apache/hadoop/hdfs/server/namen...

Author: wang
Date: Fri Oct  4 17:46:18 2013
New Revision: 1529238

URL: http://svn.apache.org/r1529238
Log:
HDFS-5119. Persist CacheManager state in the edit log. (Contributed by Andrew Wang)

Modified:
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolInfo.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/PathBasedCacheEntry.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/startupprogress/StepType.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageVisitor.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCacheReplicationManager.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/OfflineEditsViewerHelper.java
    hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored.xml

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt?rev=1529238&r1=1529237&r2=1529238&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-4949.txt Fri Oct  4 17:46:18 2013
@@ -48,6 +48,9 @@ HDFS-4949 (Unreleased)
     HDFS-5191. Revisit zero-copy API in FSDataInputStream to make it more
     intuitive.  (Contributed by Colin Patrick McCabe)
 
+    HDFS-5119. Persist CacheManager state in the edit log.
+    (Contributed by Andrew Wang)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolInfo.java?rev=1529238&r1=1529237&r2=1529238&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolInfo.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CachePoolInfo.java Fri Oct  4 17:46:18 2013
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.hdfs.protocol;
 
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.IOException;
 
 import javax.annotation.Nullable;
@@ -27,6 +29,7 @@ import org.apache.commons.lang.builder.H
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.io.Text;
 
 /**
  * Information about a cache pool.
@@ -145,4 +148,47 @@ public class CachePoolInfo {
       throw new IOException("invalid empty cache pool name");
     }
   }
+
+  public static CachePoolInfo readFrom(DataInput in) throws IOException {
+    String poolName = Text.readString(in);
+    CachePoolInfo info = new CachePoolInfo(poolName);
+    if (in.readBoolean()) {
+      info.setOwnerName(Text.readString(in));
+    }
+    if (in.readBoolean())  {
+      info.setGroupName(Text.readString(in));
+    }
+    if (in.readBoolean()) {
+      info.setMode(FsPermission.read(in));
+    }
+    if (in.readBoolean()) {
+      info.setWeight(in.readInt());
+    }
+    return info;
+  }
+
+  public void writeTo(DataOutput out) throws IOException {
+    Text.writeString(out, poolName);
+    boolean hasOwner, hasGroup, hasMode, hasWeight;
+    hasOwner = ownerName != null;
+    hasGroup = groupName != null;
+    hasMode = mode != null;
+    hasWeight = weight != null;
+    out.writeBoolean(hasOwner);
+    if (hasOwner) {
+      Text.writeString(out, ownerName);
+    }
+    out.writeBoolean(hasGroup);
+    if (hasGroup) {
+      Text.writeString(out, groupName);
+    }
+    out.writeBoolean(hasMode);
+    if (hasMode) {
+      mode.write(out);
+    }
+    out.writeBoolean(hasWeight);
+    if (hasWeight) {
+      out.writeInt(weight);
+    }
+  }
 }
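
The writeTo/readFrom pair added above uses a presence-flag encoding: each optional field is preceded by a boolean so a reader knows whether a value follows. A minimal standalone sketch of the same pattern (names here are illustrative, and it uses writeUTF where the patch uses Hadoop's Text):

    import java.io.*;

    public class PresenceFlagSketch {
      // Write an optional owner and weight, each guarded by a flag.
      static void write(DataOutput out, String owner, Integer weight)
          throws IOException {
        out.writeBoolean(owner != null);
        if (owner != null) {
          out.writeUTF(owner);
        }
        out.writeBoolean(weight != null);
        if (weight != null) {
          out.writeInt(weight);
        }
      }

      // Read the fields back in the same order, honoring the flags.
      static void read(DataInput in) throws IOException {
        String owner = in.readBoolean() ? in.readUTF() : null;
        Integer weight = in.readBoolean() ? Integer.valueOf(in.readInt()) : null;
        System.out.println("owner=" + owner + " weight=" + weight);
      }
    }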

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java?rev=1529238&r1=1529237&r2=1529238&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LayoutVersion.java Fri Oct  4 17:46:18 2013
@@ -106,7 +106,8 @@ public class LayoutVersion {
     SEQUENTIAL_BLOCK_ID(-46, "Allocate block IDs sequentially and store " +
         "block IDs in the edits log and image files"),
     EDITLOG_SUPPORT_RETRYCACHE(-47, "Record ClientId and CallId in editlog to " 
-        + "enable rebuilding retry cache in case of HA failover");
+        + "enable rebuilding retry cache in case of HA failover"),
+    CACHING(-48, "Support for cache pools and path-based caching");
     
     final int lv;
     final int ancestorLV;
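
The new CACHING feature moves the layout version to -48, which is how FSImageFormat (further down) decides whether an image contains the cache manager section at all. Layout versions are negative and decrease as features land; the real LayoutVersion.supports walks a feature map with ancestor versions, but for a linear history it reduces to a comparison, as in this standalone sketch:

    public class LayoutGateSketch {
      static final int CACHING_LV = -48; // LayoutVersion.Feature.CACHING

      // An image written at imageLayoutVersion carries the caching
      // section only if that version is at or past CACHING.
      static boolean supportsCaching(int imageLayoutVersion) {
        return imageLayoutVersion <= CACHING_LV;
      }

      public static void main(String[] args) {
        System.out.println(supportsCaching(-47)); // false: older image
        System.out.println(supportsCaching(-48)); // true
      }
    }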

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/PathBasedCacheEntry.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/PathBasedCacheEntry.java?rev=1529238&r1=1529237&r2=1529238&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/PathBasedCacheEntry.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/PathBasedCacheEntry.java Fri Oct  4 17:46:18 2013
@@ -65,6 +65,6 @@ public final class PathBasedCacheEntry {
   }
 
   public PathBasedCacheDescriptor getDescriptor() {
-    return new PathBasedCacheDescriptor(entryId, path, pool.getName());
+    return new PathBasedCacheDescriptor(entryId, path, pool.getPoolName());
   }
 };

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java?rev=1529238&r1=1529237&r2=1529238&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java Fri Oct  4 17:46:18 2013
@@ -17,11 +17,13 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DESCRIPTORS_NUM_RESPONSES;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DESCRIPTORS_NUM_RESPONSES_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES_DEFAULT;
 
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
@@ -36,17 +38,24 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
-import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
-import org.apache.hadoop.hdfs.protocol.PathBasedCacheDescriptor;
 import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.InvalidPoolNameError;
-import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.UnexpectedAddPathBasedCacheDirectiveException;
 import org.apache.hadoop.hdfs.protocol.AddPathBasedCacheDirectiveException.PoolWritePermissionDeniedError;
+import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
+import org.apache.hadoop.hdfs.protocol.PathBasedCacheDescriptor;
+import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
 import org.apache.hadoop.hdfs.protocol.PathBasedCacheEntry;
 import org.apache.hadoop.hdfs.protocol.RemovePathBasedCacheDescriptorException.InvalidIdException;
 import org.apache.hadoop.hdfs.protocol.RemovePathBasedCacheDescriptorException.NoSuchIdException;
-import org.apache.hadoop.hdfs.protocol.RemovePathBasedCacheDescriptorException.UnexpectedRemovePathBasedCacheDescriptorException;
 import org.apache.hadoop.hdfs.protocol.RemovePathBasedCacheDescriptorException.RemovePermissionDeniedException;
+import org.apache.hadoop.hdfs.protocol.RemovePathBasedCacheDescriptorException.UnexpectedRemovePathBasedCacheDescriptorException;
+import org.apache.hadoop.hdfs.server.namenode.startupprogress.Phase;
+import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress;
+import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress.Counter;
+import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step;
+import org.apache.hadoop.hdfs.server.namenode.startupprogress.StepType;
+import org.apache.hadoop.io.Text;
+
+import com.google.common.base.Preconditions;
 
 /**
  * The Cache Manager handles caching on DataNodes.
@@ -94,7 +103,6 @@ public final class CacheManager {
   final private FSDirectory dir;
 
   CacheManager(FSNamesystem namesystem, FSDirectory dir, Configuration conf) {
-    // TODO: support loading and storing of the CacheManager state
     clear();
     this.namesystem = namesystem;
     this.dir = dir;
@@ -113,13 +121,20 @@ public final class CacheManager {
     nextEntryId = 1;
   }
 
-  synchronized long getNextEntryId() throws IOException {
-    if (nextEntryId == Long.MAX_VALUE) {
-      throw new IOException("no more available IDs");
-    }
+  /**
+   * Returns the next entry ID to be used for a PathBasedCacheEntry.
+   */
+  synchronized long getNextEntryId() {
+    Preconditions.checkArgument(nextEntryId != Long.MAX_VALUE);
     return nextEntryId++;
   }
 
+  /**
+   * Returns the PathBasedCacheEntry corresponding to a PathBasedCacheDirective.
+   * 
+   * @param directive Lookup directive
+   * @return Corresponding PathBasedCacheEntry, or null if not present.
+   */
   private synchronized PathBasedCacheEntry
       findEntry(PathBasedCacheDirective directive) {
     List<PathBasedCacheEntry> existing =
@@ -128,13 +143,60 @@ public final class CacheManager {
       return null;
     }
     for (PathBasedCacheEntry entry : existing) {
-      if (entry.getPool().getName().equals(directive.getPool())) {
+      if (entry.getPool().getPoolName().equals(directive.getPool())) {
         return entry;
       }
     }
     return null;
   }
 
+  /**
+   * Add a new PathBasedCacheEntry, skipping any validation checks. Called
+   * directly when reloading CacheManager state from FSImage.
+   * 
+   * @throws IOException if unable to cache the entry
+   */
+  private void unprotectedAddEntry(PathBasedCacheEntry entry)
+      throws IOException {
+    assert namesystem.hasWriteLock();
+    // Add it to the various maps
+    entriesById.put(entry.getEntryId(), entry);
+    String path = entry.getPath();
+    List<PathBasedCacheEntry> entryList = entriesByPath.get(path);
+    if (entryList == null) {
+      entryList = new ArrayList<PathBasedCacheEntry>(1);
+      entriesByPath.put(path, entryList);
+    }
+    entryList.add(entry);
+    // Set the path as cached in the namesystem
+    try {
+      INode node = dir.getINode(entry.getPath());
+      if (node != null && node.isFile()) {
+        INodeFile file = node.asFile();
+        // TODO: adjustable cache replication factor
+        namesystem.setCacheReplicationInt(entry.getPath(),
+            file.getBlockReplication());
+      } else {
+        LOG.warn("Path " + entry.getPath() + " is not a file");
+      }
+    } catch (IOException ioe) {
+      LOG.info("unprotectedAddEntry " + entry +": failed to cache file: " +
+          ioe.getClass().getName() +": " + ioe.getMessage());
+      throw ioe;
+    }
+  }
+
+  /**
+   * Add a new PathBasedCacheDirective if valid, returning a corresponding
+   * PathBasedCacheDescriptor to the user.
+   * 
+   * @param directive Directive describing the cache entry being added
+   * @param pc Permission checker used to validate that the calling user has
+   *          access to the destination cache pool
+   * @return Corresponding PathBasedCacheDescriptor for the new cache entry
+   * @throws IOException if the directive is invalid or was otherwise
+   *           unsuccessful
+   */
   public synchronized PathBasedCacheDescriptor addDirective(
       PathBasedCacheDirective directive, FSPermissionChecker pc)
       throws IOException {
@@ -162,47 +224,44 @@ public final class CacheManager {
           "existing directive " + existing + " in this pool.");
       return existing.getDescriptor();
     }
-    // Add a new entry with the next available ID.
-    PathBasedCacheEntry entry;
-    try {
-      entry = new PathBasedCacheEntry(getNextEntryId(),
-          directive.getPath(), pool);
-    } catch (IOException ioe) {
-      throw new UnexpectedAddPathBasedCacheDirectiveException(directive);
-    }
+
+    // Success!
+    PathBasedCacheDescriptor d = unprotectedAddDirective(directive);
     LOG.info("addDirective " + directive + ": added cache directive "
         + directive);
+    return d;
+  }
 
-    // Success!
-    // First, add it to the various maps
-    entriesById.put(entry.getEntryId(), entry);
-    String path = directive.getPath();
-    List<PathBasedCacheEntry> entryList = entriesByPath.get(path);
-    if (entryList == null) {
-      entryList = new ArrayList<PathBasedCacheEntry>(1);
-      entriesByPath.put(path, entryList);
-    }
-    entryList.add(entry);
+  /**
+   * Assigns a new entry ID to a validated PathBasedCacheDirective and adds
+   * it to the CacheManager. Called directly when replaying the edit log.
+   * 
+   * @param directive Directive being added
+   * @return PathBasedCacheDescriptor for the directive
+   * @throws IOException
+   */
+  PathBasedCacheDescriptor unprotectedAddDirective(
+      PathBasedCacheDirective directive) throws IOException {
+    assert namesystem.hasWriteLock();
+    CachePool pool = cachePools.get(directive.getPool());
+    // Add a new entry with the next available ID.
+    PathBasedCacheEntry entry;
+    entry = new PathBasedCacheEntry(getNextEntryId(), directive.getPath(),
+        pool);
+
+    unprotectedAddEntry(entry);
 
-    // Next, set the path as cached in the namesystem
-    try {
-      INode node = dir.getINode(directive.getPath());
-      if (node != null && node.isFile()) {
-        INodeFile file = node.asFile();
-        // TODO: adjustable cache replication factor
-        namesystem.setCacheReplicationInt(directive.getPath(),
-            file.getBlockReplication());
-      } else {
-        LOG.warn("Path " + directive.getPath() + " is not a file");
-      }
-    } catch (IOException ioe) {
-      LOG.info("addDirective " + directive +": failed to cache file: " +
-          ioe.getClass().getName() +": " + ioe.getMessage());
-      throw ioe;
-    }
     return entry.getDescriptor();
   }
 
+  /**
+   * Remove the PathBasedCacheEntry corresponding to a descriptor ID from
+   * the CacheManager.
+   * 
+   * @param id of the PathBasedCacheDescriptor
+   * @param pc Permission checker used to validate the request
+   * @throws IOException
+   */
   public synchronized void removeDescriptor(long id, FSPermissionChecker pc)
       throws IOException {
     // Check for invalid IDs.
@@ -229,6 +288,20 @@ public final class CacheManager {
       throw new RemovePermissionDeniedException(id);
     }
     
+    unprotectedRemoveDescriptor(id);
+  }
+
+  /**
+   * Unchecked internal method used to remove a PathBasedCacheEntry from the
+   * CacheManager. Called directly when replaying the edit log.
+   * 
+   * @param id of the PathBasedCacheDescriptor corresponding to the entry that
+   *          is being removed
+   * @throws IOException
+   */
+  void unprotectedRemoveDescriptor(long id) throws IOException {
+    assert namesystem.hasWriteLock();
+    PathBasedCacheEntry existing = entriesById.get(id);
     // Remove the corresponding entry in entriesByPath.
     String path = existing.getDescriptor().getPath();
     List<PathBasedCacheEntry> entries = entriesByPath.get(path);
@@ -294,11 +367,11 @@ public final class CacheManager {
    * Create a cache pool.
    * 
    * Only the superuser should be able to call this function.
-   *
-   * @param info
-   *          The info for the cache pool to create.
+   * 
+   * @param info The info for the cache pool to create.
+   * @return the created CachePool
    */
-  public synchronized void addCachePool(CachePoolInfo info)
+  public synchronized CachePool addCachePool(CachePoolInfo info)
       throws IOException {
     CachePoolInfo.validate(info);
     String poolName = info.getPoolName();
@@ -309,8 +382,20 @@ public final class CacheManager {
     CachePool cachePool = new CachePool(poolName,
       info.getOwnerName(), info.getGroupName(), info.getMode(),
       info.getWeight());
-    cachePools.put(poolName, cachePool);
-    LOG.info("created new cache pool " + cachePool);
+    unprotectedAddCachePool(cachePool);
+    return cachePool;
+  }
+
+  /**
+   * Internal unchecked method used to add a CachePool. Called directly when
+   * reloading CacheManager state from the FSImage or edit log.
+   * 
+   * @param pool to be added
+   */
+  void unprotectedAddCachePool(CachePool pool) {
+    assert namesystem.hasWriteLock();
+    cachePools.put(pool.getPoolName(), pool);
+    LOG.info("created new cache pool " + pool);
   }
 
   /**
@@ -409,4 +494,116 @@ public final class CacheManager {
     }
     return new BatchedListEntries<CachePoolInfo>(results, false);
   }
+
+  /*
+   * FSImage related serialization and deserialization code
+   */
+
+  /**
+   * Saves the current state of the CacheManager to the DataOutput. Used
+   * to persist CacheManager state in the FSImage.
+   * @param out DataOutput to persist state
+   * @param sdPath path of the storage directory
+   * @throws IOException
+   */
+  public synchronized void saveState(DataOutput out, String sdPath)
+      throws IOException {
+    out.writeLong(nextEntryId);
+    savePools(out, sdPath);
+    saveEntries(out, sdPath);
+  }
+
+  /**
+   * Reloads CacheManager state from the passed DataInput. Used during namenode
+   * startup to restore CacheManager state from an FSImage.
+   * @param in DataInput from which to restore state
+   * @throws IOException
+   */
+  public synchronized void loadState(DataInput in) throws IOException {
+    nextEntryId = in.readLong();
+    // pools need to be loaded first since entries point to their parent pool
+    loadPools(in);
+    loadEntries(in);
+  }
+
+  /**
+   * Save cache pools to fsimage
+   */
+  private synchronized void savePools(DataOutput out,
+      String sdPath) throws IOException {
+    StartupProgress prog = NameNode.getStartupProgress();
+    Step step = new Step(StepType.CACHE_POOLS, sdPath);
+    prog.beginStep(Phase.SAVING_CHECKPOINT, step);
+    prog.setTotal(Phase.SAVING_CHECKPOINT, step, cachePools.size());
+    Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step);
+    out.writeInt(cachePools.size());
+    for (CachePool pool: cachePools.values()) {
+      pool.writeTo(out);
+      counter.increment();
+    }
+    prog.endStep(Phase.SAVING_CHECKPOINT, step);
+  }
+
+  /**
+   * Save cache entries to fsimage
+   */
+  private synchronized void saveEntries(DataOutput out, String sdPath)
+      throws IOException {
+    StartupProgress prog = NameNode.getStartupProgress();
+    Step step = new Step(StepType.CACHE_ENTRIES, sdPath);
+    prog.beginStep(Phase.SAVING_CHECKPOINT, step);
+    prog.setTotal(Phase.SAVING_CHECKPOINT, step, entriesById.size());
+    Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step);
+    out.writeInt(entriesById.size());
+    for (PathBasedCacheEntry entry: entriesById.values()) {
+      out.writeLong(entry.getEntryId());
+      Text.writeString(out, entry.getPath());
+      Text.writeString(out, entry.getPool().getPoolName());
+      counter.increment();
+    }
+    prog.endStep(Phase.SAVING_CHECKPOINT, step);
+  }
+
+  /**
+   * Load cache pools from fsimage
+   */
+  private synchronized void loadPools(DataInput in)
+      throws IOException {
+    StartupProgress prog = NameNode.getStartupProgress();
+    Step step = new Step(StepType.CACHE_POOLS);
+    prog.beginStep(Phase.LOADING_FSIMAGE, step);
+    int numberOfPools = in.readInt();
+    prog.setTotal(Phase.LOADING_FSIMAGE, step, numberOfPools);
+    Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, step);
+    for (int i = 0; i < numberOfPools; i++) {
+      CachePool pool = CachePool.readFrom(in);
+      unprotectedAddCachePool(pool);
+      counter.increment();
+    }
+    prog.endStep(Phase.LOADING_FSIMAGE, step);
+  }
+
+  /**
+   * Load cache entries from the fsimage
+   */
+  private synchronized void loadEntries(DataInput in) throws IOException {
+    StartupProgress prog = NameNode.getStartupProgress();
+    Step step = new Step(StepType.CACHE_ENTRIES);
+    prog.beginStep(Phase.LOADING_FSIMAGE, step);
+    int numberOfEntries = in.readInt();
+    prog.setTotal(Phase.LOADING_FSIMAGE, step, numberOfEntries);
+    Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, step);
+    for (int i = 0; i < numberOfEntries; i++) {
+      long entryId = in.readLong();
+      String path = Text.readString(in);
+      String poolName = Text.readString(in);
+      // Get pool reference by looking it up in the map
+      CachePool pool = cachePools.get(poolName);
+      PathBasedCacheEntry entry = new PathBasedCacheEntry(entryId, path, pool);
+      unprotectedAddEntry(entry);
+      counter.increment();
+    }
+    prog.endStep(Phase.LOADING_FSIMAGE, step);
+  }
+
 }
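
Taken together, saveState and loadState define a simple section layout: the next entry ID, then the pool count and pool records, then the entry count and entry records (ID, path, pool name), with pools loaded first so each entry can resolve its pool by name. A standalone round-trip sketch of that shape (pool records are reduced to a bare name here; the real format also writes a PermissionStatus and weight, via Text rather than writeUTF):

    import java.io.*;
    import java.util.HashSet;
    import java.util.Set;

    public class CacheSectionSketch {
      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        out.writeLong(2L);            // nextEntryId
        out.writeInt(1);              // pool count
        out.writeUTF("pool1");        // pool record (simplified)
        out.writeInt(1);              // entry count
        out.writeLong(1L);            // entry ID
        out.writeUTF("/cached/file"); // entry path
        out.writeUTF("pool1");        // entry's pool, by name

        DataInputStream in = new DataInputStream(
            new ByteArrayInputStream(buf.toByteArray()));
        long nextEntryId = in.readLong();
        Set<String> pools = new HashSet<String>();
        for (int i = in.readInt(); i > 0; i--) {
          pools.add(in.readUTF());    // pools load first...
        }
        for (int i = in.readInt(); i > 0; i--) {
          long id = in.readLong();
          String path = in.readUTF();
          String pool = in.readUTF(); // ...so this name resolves
          System.out.println(id + " " + path + " in " + pool
              + " (known=" + pools.contains(pool) + ")");
        }
      }
    }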

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java?rev=1529238&r1=1529237&r2=1529238&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java Fri Oct  4 17:46:18 2013
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import java.io.DataInput;
+import java.io.DataOutput;
 import java.io.IOException;
 
 import javax.annotation.Nonnull;
@@ -26,8 +28,15 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.fs.permission.PermissionStatus;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
+import org.apache.hadoop.hdfs.util.XMLUtils;
+import org.apache.hadoop.hdfs.util.XMLUtils.InvalidXmlException;
+import org.apache.hadoop.hdfs.util.XMLUtils.Stanza;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.xml.sax.ContentHandler;
+import org.xml.sax.SAXException;
 
 /**
  * A CachePool describes a set of cache resources being managed by the NameNode.
@@ -63,7 +72,7 @@ public final class CachePool {
   private FsPermission mode;
   
   private int weight;
-  
+
   public CachePool(String poolName, String ownerName, String groupName,
       FsPermission mode, Integer weight) throws IOException {
     this.poolName = poolName;
@@ -86,10 +95,10 @@ public final class CachePool {
     }
     this.mode = mode != null ? 
         new FsPermission(mode): FsPermission.getCachePoolDefault();
-    this.weight = weight != null ? weight : 100;
+    this.weight = weight != null ? weight : DEFAULT_WEIGHT;
   }
 
-  public String getName() {
+  public String getPoolName() {
     return poolName;
   }
 
@@ -162,4 +171,42 @@ public final class CachePool {
         append(", weight:").append(weight).
         append(" }").toString();
   }
+
+  public void writeTo(DataOutput out) throws IOException {
+    Text.writeString(out, poolName);
+    PermissionStatus perm = PermissionStatus.createImmutable(
+        ownerName, groupName, mode);
+    perm.write(out);
+    out.writeInt(weight);
+  }
+
+  public static CachePool readFrom(DataInput in) throws IOException {
+    String poolName = Text.readString(in);
+    PermissionStatus perm = PermissionStatus.read(in);
+    int weight = in.readInt();
+    return new CachePool(poolName, perm.getUserName(), perm.getGroupName(),
+        perm.getPermission(), weight);
+  }
+
+  public void writeXmlTo(ContentHandler contentHandler) throws SAXException {
+    XMLUtils.addSaxString(contentHandler, "POOLNAME", poolName);
+    PermissionStatus perm = new PermissionStatus(ownerName,
+        groupName, mode);
+    FSEditLogOp.permissionStatusToXml(contentHandler, perm);
+    XMLUtils.addSaxString(contentHandler, "WEIGHT", Integer.toString(weight));
+  }
+
+  public static CachePool readXmlFrom(Stanza st) throws InvalidXmlException {
+    String poolName = st.getValue("POOLNAME");
+    PermissionStatus perm = FSEditLogOp.permissionStatusFromXml(st);
+    int weight = Integer.parseInt(st.getValue("WEIGHT"));
+    try {
+      return new CachePool(poolName, perm.getUserName(), perm.getGroupName(),
+          perm.getPermission(), weight);
+    } catch (IOException e) {
+      String error = "Invalid cache pool XML, missing fields.";
+      LOG.warn(error);
+      throw new InvalidXmlException(error);
+    }
+  }
 }

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1529238&r1=1529237&r2=1529238&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Fri Oct  4 17:46:18 2013
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
+import static org.apache.hadoop.util.ExitUtil.terminate;
 import static org.apache.hadoop.util.Time.now;
 
 import java.io.IOException;
@@ -35,15 +36,18 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
+import org.apache.hadoop.hdfs.protocol.PathBasedCacheEntry;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
-
-import static org.apache.hadoop.util.ExitUtil.terminate;
-
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.Storage.FormatConfirmable;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCachePoolOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddPathBasedCacheDirectiveOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AllocateBlockIdOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AllowSnapshotOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.CancelDelegationTokenOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.CloseOp;
@@ -55,12 +59,17 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.GetDelegationTokenOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.LogSegmentOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.MkdirOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ModifyCachePoolOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.OpInstanceCache;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ReassignLeaseOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemoveCachePoolOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemovePathBasedCacheDescriptorOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOldOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameSnapshotOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenewDelegationTokenOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampV1Op;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampV2Op;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetOwnerOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetPermissionsOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetQuotaOp;
@@ -69,9 +78,6 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TimesOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateBlocksOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.UpdateMasterKeyOp;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AllocateBlockIdOp;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampV1Op;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.SetGenstampV2Op;
 import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
 import org.apache.hadoop.hdfs.server.namenode.metrics.NameNodeMetrics;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
@@ -948,6 +954,44 @@ public class FSEditLog implements LogsPu
     logEdit(op);
   }
   
+  void logAddPathBasedCacheDirective(PathBasedCacheDirective directive,
+      boolean toLogRpcIds) {
+    AddPathBasedCacheDirectiveOp op = AddPathBasedCacheDirectiveOp.getInstance(
+        cache.get())
+        .setPath(directive.getPath())
+        .setPool(directive.getPool());
+    logRpcIds(op, toLogRpcIds);
+    logEdit(op);
+  }
+
+  void logRemovePathBasedCacheDescriptor(Long id, boolean toLogRpcIds) {
+    RemovePathBasedCacheDescriptorOp op =
+        RemovePathBasedCacheDescriptorOp.getInstance(cache.get()).setId(id);
+    logRpcIds(op, toLogRpcIds);
+    logEdit(op);
+  }
+
+  void logAddCachePool(CachePool pool, boolean toLogRpcIds) {
+    AddCachePoolOp op =
+        AddCachePoolOp.getInstance(cache.get()).setPool(pool);
+    logRpcIds(op, toLogRpcIds);
+    logEdit(op);
+  }
+
+  void logModifyCachePool(CachePoolInfo info, boolean toLogRpcIds) {
+    ModifyCachePoolOp op =
+        ModifyCachePoolOp.getInstance(cache.get()).setInfo(info);
+    logRpcIds(op, toLogRpcIds);
+    logEdit(op);
+  }
+
+  void logRemoveCachePool(String poolName, boolean toLogRpcIds) {
+    RemoveCachePoolOp op =
+        RemoveCachePoolOp.getInstance(cache.get()).setPoolName(poolName);
+    logRpcIds(op, toLogRpcIds);
+    logEdit(op);
+  }
+
   /**
    * Get all the journals this edit log is currently operating on.
    */

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1529238&r1=1529237&r2=1529238&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Fri Oct  4 17:46:18 2013
@@ -36,10 +36,14 @@ import org.apache.hadoop.hdfs.protocol.H
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.PathBasedCacheDescriptor;
+import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCachePoolOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCloseOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddPathBasedCacheDirectiveOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AllocateBlockIdOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AllowSnapshotOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.BlockListUpdatingOp;
@@ -52,7 +56,10 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.DisallowSnapshotOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.GetDelegationTokenOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.MkdirOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ModifyCachePoolOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ReassignLeaseOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemoveCachePoolOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemovePathBasedCacheDescriptorOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOldOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOp;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameSnapshotOp;
@@ -76,6 +83,7 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.startupprogress.Step;
 import org.apache.hadoop.hdfs.util.ChunkedArrayList;
 import org.apache.hadoop.hdfs.util.Holder;
 
 import com.google.common.base.Joiner;
 
@@ -631,6 +639,56 @@ public class FSEditLogLoader {
       fsNamesys.setLastAllocatedBlockId(allocateBlockIdOp.blockId);
       break;
     }
+    case OP_ADD_PATH_BASED_CACHE_DIRECTIVE: {
+      AddPathBasedCacheDirectiveOp addOp = (AddPathBasedCacheDirectiveOp) op;
+      PathBasedCacheDirective d = new PathBasedCacheDirective(addOp.path,
+          addOp.pool);
+      PathBasedCacheDescriptor descriptor =
+          fsNamesys.getCacheManager().unprotectedAddDirective(d);
+
+      if (toAddRetryCache) {
+        fsNamesys.addCacheEntryWithPayload(op.rpcClientId, op.rpcCallId,
+            descriptor);
+      }
+      break;
+    }
+    case OP_REMOVE_PATH_BASED_CACHE_DESCRIPTOR: {
+      RemovePathBasedCacheDescriptorOp removeOp =
+          (RemovePathBasedCacheDescriptorOp) op;
+      fsNamesys.getCacheManager().unprotectedRemoveDescriptor(removeOp.id);
+
+      if (toAddRetryCache) {
+        fsNamesys.addCacheEntry(op.rpcClientId, op.rpcCallId);
+      }
+      break;
+    }
+    case OP_ADD_CACHE_POOL: {
+      AddCachePoolOp addOp = (AddCachePoolOp) op;
+      fsNamesys.getCacheManager().unprotectedAddCachePool(addOp.pool);
+
+      if (toAddRetryCache) {
+        fsNamesys.addCacheEntry(op.rpcClientId, op.rpcCallId);
+      }
+      break;
+    }
+    case OP_MODIFY_CACHE_POOL: {
+      ModifyCachePoolOp modifyOp = (ModifyCachePoolOp) op;
+      fsNamesys.getCacheManager().modifyCachePool(modifyOp.info);
+
+      if (toAddRetryCache) {
+        fsNamesys.addCacheEntry(op.rpcClientId, op.rpcCallId);
+      }
+      break;
+    }
+    case OP_REMOVE_CACHE_POOL: {
+      RemoveCachePoolOp removeOp = (RemoveCachePoolOp) op;
+      fsNamesys.getCacheManager().removeCachePool(removeOp.poolName);
+
+      if (toAddRetryCache) {
+        fsNamesys.addCacheEntry(op.rpcClientId, op.rpcCallId);
+      }
+      break;
+    }
     default:
       throw new IOException("Invalid operation read " + op.opCode);
     }
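
One pattern worth noting in the replay cases above: they call the unprotected* variants, which mutate CacheManager state without permission checks or re-logging, since both already happened when the op was first applied. A compilable sketch of that split (all names illustrative, not from the patch):

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    public class CheckedUnprotectedSketch {
      private final Map<Long, String> entries = new HashMap<Long, String>();
      private boolean callerIsSuperuser;

      // Live RPC path: validate first, then mutate; the real code
      // also appends an edit-log op at this point.
      public synchronized void removeEntry(long id) throws IOException {
        if (!callerIsSuperuser) {   // stand-in for FSPermissionChecker
          throw new IOException("permission denied for id " + id);
        }
        unprotectedRemoveEntry(id);
      }

      // Edit-log replay and fsimage loading enter here directly:
      // no permission check, no new edit logged.
      void unprotectedRemoveEntry(long id) {
        entries.remove(id);
      }
    }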

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java?rev=1529238&r1=1529237&r2=1529238&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java Fri Oct  4 17:46:18 2013
@@ -18,6 +18,8 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD_CACHE_POOL;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD_PATH_BASED_CACHE_DIRECTIVE;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ALLOCATE_BLOCK_ID;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ALLOW_SNAPSHOT;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_CANCEL_DELEGATION_TOKEN;
@@ -32,7 +34,10 @@ import static org.apache.hadoop.hdfs.ser
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_GET_DELEGATION_TOKEN;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_INVALID;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_MKDIR;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_MODIFY_CACHE_POOL;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REASSIGN_LEASE;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REMOVE_CACHE_POOL;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REMOVE_PATH_BASED_CACHE_DESCRIPTOR;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME_OLD;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME_SNAPSHOT;
@@ -56,6 +61,7 @@ import java.io.DataOutput;
 import java.io.DataOutputStream;
 import java.io.EOFException;
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.EnumMap;
 import java.util.List;
@@ -73,6 +79,7 @@ import org.apache.hadoop.fs.permission.P
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DeprecatedUTF8;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
@@ -97,7 +104,9 @@ import org.xml.sax.ContentHandler;
 import org.xml.sax.SAXException;
 import org.xml.sax.helpers.AttributesImpl;
 
+import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
 
 /**
  * Helper classes for reading the ops from an InputStream.
@@ -153,6 +162,13 @@ public abstract class FSEditLogOp {
       inst.put(OP_RENAME_SNAPSHOT, new RenameSnapshotOp());
       inst.put(OP_SET_GENSTAMP_V2, new SetGenstampV2Op());
       inst.put(OP_ALLOCATE_BLOCK_ID, new AllocateBlockIdOp());
+      inst.put(OP_ADD_PATH_BASED_CACHE_DIRECTIVE,
+          new AddPathBasedCacheDirectiveOp());
+      inst.put(OP_REMOVE_PATH_BASED_CACHE_DESCRIPTOR,
+          new RemovePathBasedCacheDescriptorOp());
+      inst.put(OP_ADD_CACHE_POOL, new AddCachePoolOp());
+      inst.put(OP_MODIFY_CACHE_POOL, new ModifyCachePoolOp());
+      inst.put(OP_REMOVE_CACHE_POOL, new RemoveCachePoolOp());
     }
     
     public FSEditLogOp get(FSEditLogOpCodes opcode) {
@@ -528,8 +544,7 @@ public abstract class FSEditLogOp {
       } else {
         this.blocks = new Block[0];
       }
-      this.permissions =
-          permissionStatusFromXml(st.getChildren("PERMISSION_STATUS").get(0));
+      this.permissions = permissionStatusFromXml(st);
       readRpcIdsFromXml(st);
     }
   }
@@ -1208,8 +1223,7 @@ public abstract class FSEditLogOp {
       this.inodeId = Long.valueOf(st.getValue("INODEID"));
       this.path = st.getValue("PATH");
       this.timestamp = Long.valueOf(st.getValue("TIMESTAMP"));
-      this.permissions =
-          permissionStatusFromXml(st.getChildren("PERMISSION_STATUS").get(0));
+      this.permissions = permissionStatusFromXml(st);
     }
   }
 
@@ -1940,8 +1954,7 @@ public abstract class FSEditLogOp {
       this.value = st.getValue("VALUE");
       this.mtime = Long.valueOf(st.getValue("MTIME"));
       this.atime = Long.valueOf(st.getValue("ATIME"));
-      this.permissionStatus =
-          permissionStatusFromXml(st.getChildren("PERMISSION_STATUS").get(0));
+      this.permissionStatus = permissionStatusFromXml(st);
       
       readRpcIdsFromXml(st);
     }
@@ -2848,6 +2861,266 @@ public abstract class FSEditLogOp {
     }
   }
 
+  static class AddPathBasedCacheDirectiveOp extends FSEditLogOp {
+
+    String path;
+    String pool;
+
+    public AddPathBasedCacheDirectiveOp() {
+      super(OP_ADD_PATH_BASED_CACHE_DIRECTIVE);
+    }
+
+    static AddPathBasedCacheDirectiveOp getInstance(OpInstanceCache cache) {
+      return (AddPathBasedCacheDirectiveOp) cache
+          .get(OP_ADD_PATH_BASED_CACHE_DIRECTIVE);
+    }
+
+    public AddPathBasedCacheDirectiveOp setPath(String path) {
+      this.path = path;
+      return this;
+    }
+
+    public AddPathBasedCacheDirectiveOp setPool(String pool) {
+      this.pool = pool;
+      return this;
+    }
+
+    @Override
+    void readFields(DataInputStream in, int logVersion) throws IOException {
+      this.path = FSImageSerialization.readString(in);
+      this.pool = FSImageSerialization.readString(in);
+    }
+
+    @Override
+    public void writeFields(DataOutputStream out) throws IOException {
+      FSImageSerialization.writeString(path, out);
+      FSImageSerialization.writeString(pool, out);
+    }
+
+    @Override
+    protected void toXml(ContentHandler contentHandler) throws SAXException {
+      XMLUtils.addSaxString(contentHandler, "PATH", path);
+      XMLUtils.addSaxString(contentHandler, "POOL", pool);
+    }
+
+    @Override
+    void fromXml(Stanza st) throws InvalidXmlException {
+      path = st.getValue("PATH");
+      pool = st.getValue("POOL");
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder builder = new StringBuilder();
+      builder.append("AddPathBasedCacheDirective [");
+      builder.append("path=" + path + ",");
+      builder.append("pool=" + pool + "]");
+      return builder.toString();
+    }
+  }
+
+  static class RemovePathBasedCacheDescriptorOp extends FSEditLogOp {
+    long id;
+
+    public RemovePathBasedCacheDescriptorOp() {
+      super(OP_REMOVE_PATH_BASED_CACHE_DESCRIPTOR);
+    }
+
+    static RemovePathBasedCacheDescriptorOp getInstance(OpInstanceCache cache) {
+      return (RemovePathBasedCacheDescriptorOp) cache
+          .get(OP_REMOVE_PATH_BASED_CACHE_DESCRIPTOR);
+    }
+
+    public RemovePathBasedCacheDescriptorOp setId(long id) {
+      this.id = id;
+      return this;
+    }
+
+    @Override
+    void readFields(DataInputStream in, int logVersion) throws IOException {
+      this.id = FSImageSerialization.readLong(in);
+    }
+
+    @Override
+    public void writeFields(DataOutputStream out) throws IOException {
+      FSImageSerialization.writeLong(id, out);
+    }
+
+    @Override
+    protected void toXml(ContentHandler contentHandler) throws SAXException {
+      XMLUtils.addSaxString(contentHandler, "ID", Long.toString(id));
+    }
+
+    @Override
+    void fromXml(Stanza st) throws InvalidXmlException {
+      this.id = Long.parseLong(st.getValue("ID"));
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder builder = new StringBuilder();
+      builder.append("RemovePathBasedCacheDescriptor [");
+      builder.append("id=" + Long.toString(id) + "]");
+      return builder.toString();
+    }
+  }
+
+  static class AddCachePoolOp extends FSEditLogOp {
+    CachePool pool;
+
+    public AddCachePoolOp() {
+      super(OP_ADD_CACHE_POOL);
+    }
+
+    static AddCachePoolOp getInstance(OpInstanceCache cache) {
+      return (AddCachePoolOp) cache.get(OP_ADD_CACHE_POOL);
+    }
+
+    public AddCachePoolOp setPool(CachePool pool) {
+      this.pool = pool;
+      return this;
+    }
+
+    @Override
+    void readFields(DataInputStream in, int logVersion) throws IOException {
+      pool = CachePool.readFrom(in);
+    }
+
+    @Override
+    public void writeFields(DataOutputStream out) throws IOException {
+      pool.writeTo(out);
+    }
+
+    @Override
+    protected void toXml(ContentHandler contentHandler) throws SAXException {
+      pool.writeXmlTo(contentHandler);
+    }
+
+    @Override
+    void fromXml(Stanza st) throws InvalidXmlException {
+      this.pool = CachePool.readXmlFrom(st);
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder builder = new StringBuilder();
+      builder.append("AddCachePoolOp [");
+      builder.append("poolName=" + pool.getPoolName() + ",");
+      builder.append("ownerName=" + pool.getOwnerName() + ",");
+      builder.append("groupName=" + pool.getGroupName() + ",");
+      builder.append("mode=" + Short.toString(pool.getMode().toShort()) + ",");
+      builder.append("weight=" + Integer.toString(pool.getWeight()) + "]");
+      return builder.toString();
+    }
+  }
+
+  static class ModifyCachePoolOp extends FSEditLogOp {
+    CachePoolInfo info;
+
+    public ModifyCachePoolOp() {
+      super(OP_MODIFY_CACHE_POOL);
+    }
+
+    static ModifyCachePoolOp getInstance(OpInstanceCache cache) {
+      return (ModifyCachePoolOp) cache.get(OP_MODIFY_CACHE_POOL);
+    }
+
+    public ModifyCachePoolOp setInfo(CachePoolInfo info) {
+      this.info = info;
+      return this;
+    }
+
+    @Override
+    void readFields(DataInputStream in, int logVersion) throws IOException {
+      info = CachePoolInfo.readFrom(in);
+    }
+
+    @Override
+    public void writeFields(DataOutputStream out) throws IOException {
+      info.writeTo(out);
+    }
+
+    @Override
+    protected void toXml(ContentHandler contentHandler) throws SAXException {
+      cachePoolInfoToXml(contentHandler, info);
+    }
+
+    @Override
+    void fromXml(Stanza st) throws InvalidXmlException {
+      this.info = cachePoolInfoFromXml(st);
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder builder = new StringBuilder();
+      builder.append("ModifyCachePoolOp [");
+      ArrayList<String> fields = new ArrayList<String>(5);
+      if (info.getPoolName() != null) {
+        fields.add("poolName=" + info.getPoolName());
+      }
+      if (info.getOwnerName() != null) {
+        fields.add("ownerName=" + info.getOwnerName());
+      }
+      if (info.getGroupName() != null) {
+        fields.add("groupName=" + info.getGroupName());
+      }
+      if (info.getMode() != null) {
+        fields.add("mode=" + info.getMode().toString());
+      }
+      if (info.getWeight() != null) {
+        fields.add("weight=" + info.getWeight());
+      }
+      builder.append(Joiner.on(",").join(fields));
+      builder.append("]");
+      return builder.toString();
+    }
+  }
+
+  static class RemoveCachePoolOp extends FSEditLogOp {
+    String poolName;
+
+    public RemoveCachePoolOp() {
+      super(OP_REMOVE_CACHE_POOL);
+    }
+
+    static RemoveCachePoolOp getInstance(OpInstanceCache cache) {
+      return (RemoveCachePoolOp) cache.get(OP_REMOVE_CACHE_POOL);
+    }
+
+    public RemoveCachePoolOp setPoolName(String poolName) {
+      this.poolName = poolName;
+      return this;
+    }
+
+    @Override
+    void readFields(DataInputStream in, int logVersion) throws IOException {
+      poolName = FSImageSerialization.readString(in);
+    }
+
+    @Override
+    public void writeFields(DataOutputStream out) throws IOException {
+      FSImageSerialization.writeString(poolName, out);
+    }
+
+    @Override
+    protected void toXml(ContentHandler contentHandler) throws SAXException {
+      XMLUtils.addSaxString(contentHandler, "POOLNAME", poolName);
+    }
+
+    @Override
+    void fromXml(Stanza st) throws InvalidXmlException {
+      this.poolName = st.getValue("POOLNAME");
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder builder = new StringBuilder();
+      builder.append("RemoveCachePoolOp [");
+      builder.append("poolName=" + poolName + "]");
+      return builder.toString();
+    }
+  }
+
   static private short readShort(DataInputStream in) throws IOException {
     return Short.parseShort(FSImageSerialization.readString(in));
   }
@@ -3235,16 +3508,65 @@ public abstract class FSEditLogOp {
     contentHandler.startElement("", "", "PERMISSION_STATUS", new AttributesImpl());
     XMLUtils.addSaxString(contentHandler, "USERNAME", perm.getUserName());
     XMLUtils.addSaxString(contentHandler, "GROUPNAME", perm.getGroupName());
-    XMLUtils.addSaxString(contentHandler, "MODE",
-        Short.valueOf(perm.getPermission().toShort()).toString());
+    fsPermissionToXml(contentHandler, perm.getPermission());
     contentHandler.endElement("", "", "PERMISSION_STATUS");
   }
 
   public static PermissionStatus permissionStatusFromXml(Stanza st)
       throws InvalidXmlException {
-    String username = st.getValue("USERNAME");
-    String groupname = st.getValue("GROUPNAME");
+    Stanza status = st.getChildren("PERMISSION_STATUS").get(0);
+    String username = status.getValue("USERNAME");
+    String groupname = status.getValue("GROUPNAME");
+    FsPermission mode = fsPermissionFromXml(status);
+    return new PermissionStatus(username, groupname, mode);
+  }
+
+  public static void fsPermissionToXml(ContentHandler contentHandler,
+      FsPermission mode) throws SAXException {
+    XMLUtils.addSaxString(contentHandler, "MODE", Short.valueOf(mode.toShort())
+        .toString());
+  }
+
+  public static FsPermission fsPermissionFromXml(Stanza st)
+      throws InvalidXmlException {
     short mode = Short.valueOf(st.getValue("MODE"));
-    return new PermissionStatus(username, groupname, new FsPermission(mode));
+    return new FsPermission(mode);
+  }
+
+  public static void cachePoolInfoToXml(ContentHandler contentHandler,
+      CachePoolInfo info) throws SAXException {
+    XMLUtils.addSaxString(contentHandler, "POOLNAME", info.getPoolName());
+    if (info.getOwnerName() != null) {
+      XMLUtils.addSaxString(contentHandler, "OWNERNAME", info.getOwnerName());
+    }
+    if (info.getGroupName() != null) {
+      XMLUtils.addSaxString(contentHandler, "GROUPNAME", info.getGroupName());
+    }
+    if (info.getMode() != null) {
+      fsPermissionToXml(contentHandler, info.getMode());
+    }
+    if (info.getWeight() != null) {
+      XMLUtils.addSaxString(contentHandler, "WEIGHT",
+          Integer.toString(info.getWeight()));
+    }
+  }
+
+  public static CachePoolInfo cachePoolInfoFromXml(Stanza st)
+      throws InvalidXmlException {
+    String poolName = st.getValue("POOLNAME");
+    CachePoolInfo info = new CachePoolInfo(poolName);
+    if (st.hasChildren("OWNERNAME")) {
+      info.setOwnerName(st.getValue("OWNERNAME"));
+    }
+    if (st.hasChildren("GROUPNAME")) {
+      info.setGroupName(st.getValue("GROUPNAME"));
+    }
+    if (st.hasChildren("MODE")) {
+      info.setMode(FSEditLogOp.fsPermissionFromXml(st));
+    }
+    if (st.hasChildren("WEIGHT")) {
+      info.setWeight(Integer.parseInt(st.getValue("WEIGHT")));
+    }
+    return info;
   }
 }
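
One detail of the XML helpers above: fsPermissionToXml writes the permission as its decimal short value, so a pool mode of octal 0755 appears as MODE 493 in offline edits viewer output. A two-line check:

    public class ModeEncodingSketch {
      public static void main(String[] args) {
        short mode = 0755;                        // octal literal
        System.out.println(Short.toString(mode)); // prints 493
      }
    }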

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java?rev=1529238&r1=1529237&r2=1529238&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java Fri Oct  4 17:46:18 2013
@@ -63,7 +63,13 @@ public enum FSEditLogOpCodes {
   OP_ALLOW_SNAPSHOT             ((byte) 29),
   OP_DISALLOW_SNAPSHOT          ((byte) 30),
   OP_SET_GENSTAMP_V2            ((byte) 31),
-  OP_ALLOCATE_BLOCK_ID          ((byte) 32);
+  OP_ALLOCATE_BLOCK_ID          ((byte) 32),
+  OP_ADD_PATH_BASED_CACHE_DIRECTIVE        ((byte) 33),
+  OP_REMOVE_PATH_BASED_CACHE_DESCRIPTOR    ((byte) 34),
+  OP_ADD_CACHE_POOL             ((byte) 35),
+  OP_MODIFY_CACHE_POOL          ((byte) 36),
+  OP_REMOVE_CACHE_POOL          ((byte) 37);
+
   private byte opCode;
 
   /**
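
The five opcodes added above extend the existing byte assignments rather than renumbering: edit logs persist the raw opcode byte on disk, so decoding is a fixed byte-to-enum mapping that must stay stable across releases. A self-contained sketch of that lookup, using a trimmed stand-in enum rather than the real FSEditLogOpCodes:

    // Why the opcode bytes must stay stable: replaying an edit log
    // starts from the raw byte and maps it back to an operation.
    public class OpCodeLookup {
      enum Op {
        ADD_CACHE_POOL((byte) 35),
        MODIFY_CACHE_POOL((byte) 36),
        REMOVE_CACHE_POOL((byte) 37);

        final byte code;
        Op(byte code) { this.code = code; }

        static Op fromByte(byte b) {
          for (Op op : values()) {
            if (op.code == b) {
              return op;
            }
          }
          throw new IllegalArgumentException("Unknown opcode " + b);
        }
      }

      public static void main(String[] args) {
        System.out.println(Op.fromByte((byte) 36)); // MODIFY_CACHE_POOL
      }
    }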

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java?rev=1529238&r1=1529237&r2=1529238&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java Fri Oct  4 17:46:18 2013
@@ -351,6 +351,8 @@ public class FSImageFormat {
 
         loadSecretManagerState(in);
 
+        loadCacheManagerState(in);
+
         // make sure to read to the end of file
         boolean eof = (in.read() == -1);
         assert eof : "Should have reached the end of image file " + curFile;
@@ -843,6 +845,14 @@ public class FSImageFormat {
       namesystem.loadSecretManagerState(in);
     }
 
+    private void loadCacheManagerState(DataInput in) throws IOException {
+      int imgVersion = getLayoutVersion();
+      if (!LayoutVersion.supports(Feature.CACHING, imgVersion)) {
+        return;
+      }
+      namesystem.getCacheManager().loadState(in);
+    }
+
     private int getLayoutVersion() {
       return namesystem.getFSImage().getStorage().getLayoutVersion();
     }
@@ -985,6 +995,8 @@ public class FSImageFormat {
         context.checkCancelled();
         sourceNamesystem.saveSecretManagerState(out, sdPath);
         context.checkCancelled();
+        sourceNamesystem.getCacheManager().saveState(out, sdPath);
+        context.checkCancelled();
         out.flush();
         context.checkCancelled();
         fout.getChannel().force(true);
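
saveState/loadState bracket the cache section between the secret manager state and end-of-file, so writer and reader must agree on the byte layout exactly. Here is a hedged sketch of the layout this implies, inferred from the matching reads in the ImageLoaderCurrent change below; the pool and entry values are illustrative, not part of the commit.

    import java.io.DataOutputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.io.Text;

    // Sketch of the cache section layout: nextEntryId, then the pools,
    // then the entries, mirroring processCacheManagerState().
    public class CacheStateLayoutSketch {
      public static void main(String[] args) throws IOException {
        DataOutputStream out =
            new DataOutputStream(new FileOutputStream("cache-section.bin"));
        try {
          out.writeLong(11);                  // next entry id
          out.writeInt(1);                    // number of pools
          Text.writeString(out, "poolparty"); // pool name
          Text.writeString(out, "andrew");    // owner name   } pool
          Text.writeString(out, "andrew");    // group name   } permission
          out.writeShort(0755);               // mode bits    } status
          out.writeInt(100);                  // pool weight
          out.writeInt(1);                    // number of entries
          Text.writeString(out, "/party-0");  // entry path
          Text.writeString(out, "poolparty"); // entry pool
        } finally {
          out.close();
        }
      }
    }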

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1529238&r1=1529237&r2=1529238&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Oct  4 17:46:18 2013
@@ -227,7 +227,6 @@ import org.apache.hadoop.security.UserGr
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenIdentifier;
 import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.util.Daemon;
 import org.apache.hadoop.util.DataChecksum;
@@ -1956,7 +1955,7 @@ public class FSNamesystem implements Nam
 
     getEditLog().logSync();
     if (isFile) {
-      logAuditEvent(true, "setReplication", src);
+      logAuditEvent(true, "setCacheReplication", src);
     }
     return isFile;
   }
@@ -6884,10 +6883,10 @@ public class FSNamesystem implements Nam
 
   PathBasedCacheDescriptor addPathBasedCacheDirective(
       PathBasedCacheDirective directive) throws IOException {
-    CacheEntryWithPayload retryCacheEntry =
+    CacheEntryWithPayload cacheEntry =
         RetryCache.waitForCompletion(retryCache, null);
-    if (retryCacheEntry != null && retryCacheEntry.isSuccess()) {
-      return (PathBasedCacheDescriptor) retryCacheEntry.getPayload();
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
+      return (PathBasedCacheDescriptor) cacheEntry.getPayload();
     }
     final FSPermissionChecker pc = isPermissionEnabled ?
         getPermissionChecker() : null;
@@ -6902,7 +6901,8 @@ public class FSNamesystem implements Nam
             "Cannot add PathBasedCache directive", safeMode);
       }
       result = cacheManager.addDirective(directive, pc);
-      //getEditLog().logAddPathBasedCacheDirective(result); FIXME: HDFS-5119
+      getEditLog().logAddPathBasedCacheDirective(directive,
+          cacheEntry != null);
       success = true;
     } finally {
       writeUnlock();
@@ -6912,14 +6912,14 @@ public class FSNamesystem implements Nam
       if (isAuditEnabled() && isExternalInvocation()) {
         logAuditEvent(success, "addPathBasedCacheDirective", null, null, null);
       }
-      RetryCache.setState(retryCacheEntry, success, result);
+      RetryCache.setState(cacheEntry, success, result);
     }
     return result;
   }
 
   void removePathBasedCacheDescriptor(Long id) throws IOException {
-    CacheEntry retryCacheEntry = RetryCache.waitForCompletion(retryCache);
-    if (retryCacheEntry != null && retryCacheEntry.isSuccess()) {
+    CacheEntry cacheEntry = RetryCache.waitForCompletion(retryCache);
+    if (cacheEntry != null && cacheEntry.isSuccess()) {
       return;
     }
     final FSPermissionChecker pc = isPermissionEnabled ?
@@ -6934,7 +6934,7 @@ public class FSNamesystem implements Nam
             "Cannot remove PathBasedCache directives", safeMode);
       }
       cacheManager.removeDescriptor(id, pc);
-      //getEditLog().logRemovePathBasedCacheEntries(results); FIXME: HDFS-5119
+      getEditLog().logRemovePathBasedCacheDescriptor(id, cacheEntry != null);
       success = true;
     } finally {
       writeUnlock();
@@ -6942,7 +6942,7 @@ public class FSNamesystem implements Nam
         logAuditEvent(success, "removePathBasedCacheDescriptors", null, null,
             null);
       }
-      RetryCache.setState(retryCacheEntry, success);
+      RetryCache.setState(cacheEntry, success);
     }
     getEditLog().logSync();
   }
@@ -6989,8 +6989,8 @@ public class FSNamesystem implements Nam
       if (pc != null) {
         pc.checkSuperuserPrivilege();
       }
-      cacheManager.addCachePool(req);
-      //getEditLog().logAddCachePool(req); // FIXME: HDFS-5119
+      CachePool pool = cacheManager.addCachePool(req);
+      getEditLog().logAddCachePool(pool, cacheEntry != null);
       success = true;
     } finally {
       writeUnlock();
@@ -7023,7 +7023,7 @@ public class FSNamesystem implements Nam
         pc.checkSuperuserPrivilege();
       }
       cacheManager.modifyCachePool(req);
-      //getEditLog().logModifyCachePool(req); // FIXME: HDFS-5119
+      getEditLog().logModifyCachePool(req, cacheEntry != null);
       success = true;
     } finally {
       writeUnlock();
@@ -7056,7 +7056,7 @@ public class FSNamesystem implements Nam
         pc.checkSuperuserPrivilege();
       }
       cacheManager.removeCachePool(cachePoolName);
-      //getEditLog().logRemoveCachePool(req); // FIXME: HDFS-5119
+      getEditLog().logRemoveCachePool(cachePoolName, cacheEntry != null);
       success = true;
     } finally {
       writeUnlock();
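
All five cache operations above follow the same retry-cache shape: a retried RPC that already succeeded returns its cached result immediately; otherwise the operation runs once, the edit-log call receives `cacheEntry != null` so RPC ids are persisted for replay, and the outcome is recorded in a finally block. A reduced stand-alone skeleton of that pattern; the CacheEntry here is a local stub, not Hadoop's RetryCache entry.

    public class RetryCachePattern {
      static final class CacheEntry {
        private final boolean success;
        CacheEntry(boolean success) { this.success = success; }
        boolean isSuccess() { return success; }
      }

      // Mirrors the shape of removeCachePool() above.
      static void removeCachePool(CacheEntry cacheEntry, String pool) {
        if (cacheEntry != null && cacheEntry.isSuccess()) {
          System.out.println("retry of a completed RPC; short-circuit");
          return;
        }
        boolean success = false;
        try {
          System.out.println("removing pool " + pool);
          // The real code calls logRemoveCachePool(pool, cacheEntry != null);
          // the boolean tells the edit log to persist the RPC ids so the
          // retry cache can be rebuilt when the log is replayed.
          success = true;
        } finally {
          // Stands in for RetryCache.setState(cacheEntry, success).
          System.out.println("recorded outcome for retries: " + success);
        }
      }

      public static void main(String[] args) {
        removeCachePool(null, "poolparty");                 // first attempt
        removeCachePool(new CacheEntry(true), "poolparty"); // client retry
      }
    }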

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/startupprogress/StepType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/startupprogress/StepType.java?rev=1529238&r1=1529237&r2=1529238&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/startupprogress/StepType.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/startupprogress/StepType.java Fri Oct  4 17:46:18 2013
@@ -42,7 +42,17 @@ public enum StepType {
   /**
    * The namenode is performing an operation related to inodes.
    */
-  INODES("Inodes", "inodes");
+  INODES("Inodes", "inodes"),
+
+  /**
+   * The namenode is performing an operation related to cache pools.
+   */
+  CACHE_POOLS("CachePools", "cache pools"),
+
+  /**
+   * The namenode is performing an operation related to cache entries.
+   */
+  CACHE_ENTRIES("CacheEntries", "cache entries");
 
   private final String name, description;
 

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java?rev=1529238&r1=1529237&r2=1529238&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java Fri Oct  4 17:46:18 2013
@@ -126,7 +126,7 @@ class ImageLoaderCurrent implements Imag
                                       new SimpleDateFormat("yyyy-MM-dd HH:mm");
   private static int[] versions = { -16, -17, -18, -19, -20, -21, -22, -23,
       -24, -25, -26, -27, -28, -30, -31, -32, -33, -34, -35, -36, -37, -38, -39,
-      -40, -41, -42, -43, -44, -45, -46, -47 };
+      -40, -41, -42, -43, -44, -45, -46, -47, -48 };
   private int imageVersion = 0;
   
   private final Map<Long, String> subtreeMap = new HashMap<Long, String>();
@@ -216,6 +216,9 @@ class ImageLoaderCurrent implements Imag
         processDelegationTokens(in, v);
       }
       
+      if (LayoutVersion.supports(Feature.CACHING, imageVersion)) {
+        processCacheManagerState(in, v);
+      }
       v.leaveEnclosingElement(); // FSImage
       done = true;
     } finally {
@@ -228,6 +231,24 @@ class ImageLoaderCurrent implements Imag
   }
 
   /**
+   * Process CacheManager state from the fsimage.
+   */
+  private void processCacheManagerState(DataInputStream in, ImageVisitor v)
+      throws IOException {
+    v.visit(ImageElement.CACHE_NEXT_ENTRY_ID, in.readLong());
+    final int numPools = in.readInt();
+    for (int i=0; i<numPools; i++) {
+      v.visit(ImageElement.CACHE_POOL_NAME, Text.readString(in));
+      processCachePoolPermission(in, v);
+      v.visit(ImageElement.CACHE_POOL_WEIGHT, in.readInt());
+    }
+    final int numEntries = in.readInt();
+    for (int i=0; i<numEntries; i++) {
+      v.visit(ImageElement.CACHE_ENTRY_PATH, Text.readString(in));
+      v.visit(ImageElement.CACHE_ENTRY_POOL_NAME, Text.readString(in));
+    }
+  }
+  /**
    * Process the Delegation Token related section in fsimage.
    * 
    * @param in DataInputStream to process
@@ -385,6 +406,22 @@ class ImageLoaderCurrent implements Imag
   }
 
   /**
+   * Extract CachePool permissions stored in the fsimage file.
+   *
+   * @param in Datastream to process
+   * @param v Visitor to walk over inodes
+   */
+  private void processCachePoolPermission(DataInputStream in, ImageVisitor v)
+      throws IOException {
+    v.visitEnclosingElement(ImageElement.PERMISSIONS);
+    v.visit(ImageElement.CACHE_POOL_OWNER_NAME, Text.readString(in));
+    v.visit(ImageElement.CACHE_POOL_GROUP_NAME, Text.readString(in));
+    FsPermission fsp = new FsPermission(in.readShort());
+    v.visit(ImageElement.CACHE_POOL_PERMISSION_STRING, fsp.toString());
+    v.leaveEnclosingElement(); // Permissions
+  }
+
+  /**
    * Process the INode records stored in the fsimage.
    *
    * @param in Datastream to process
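
For completeness, a reader counterpart to the layout sketch earlier in this commit, following the same sequence of reads as processCacheManagerState and processCachePoolPermission above; the file name is illustrative.

    import java.io.DataInputStream;
    import java.io.FileInputStream;
    import java.io.IOException;

    import org.apache.hadoop.fs.permission.FsPermission;
    import org.apache.hadoop.io.Text;

    // Reads back the cache section written by the earlier sketch,
    // field for field: nextEntryId, pools, then entries.
    public class CacheStateReadSketch {
      public static void main(String[] args) throws IOException {
        DataInputStream in =
            new DataInputStream(new FileInputStream("cache-section.bin"));
        try {
          System.out.println("nextEntryId = " + in.readLong());
          int numPools = in.readInt();
          for (int i = 0; i < numPools; i++) {
            System.out.println("pool   = " + Text.readString(in));
            System.out.println("owner  = " + Text.readString(in));
            System.out.println("group  = " + Text.readString(in));
            System.out.println("mode   = " + new FsPermission(in.readShort()));
            System.out.println("weight = " + in.readInt());
          }
          int numEntries = in.readInt();
          for (int i = 0; i < numEntries; i++) {
            System.out.println("path   = " + Text.readString(in));
            System.out.println("pool   = " + Text.readString(in));
          }
        } finally {
          in.close();
        }
      }
    }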

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageVisitor.java?rev=1529238&r1=1529237&r2=1529238&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageVisitor.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageVisitor.java Fri Oct  4 17:46:18 2013
@@ -117,7 +117,18 @@ abstract class ImageVisitor {
     SNAPSHOT_DST_SNAPSHOT_ID,
     SNAPSHOT_LAST_SNAPSHOT_ID,
     SNAPSHOT_REF_INODE_ID,
-    SNAPSHOT_REF_INODE
+    SNAPSHOT_REF_INODE,
+
+    CACHE_NEXT_ENTRY_ID,
+    CACHE_NUM_POOLS,
+    CACHE_POOL_NAME,
+    CACHE_POOL_OWNER_NAME,
+    CACHE_POOL_GROUP_NAME,
+    CACHE_POOL_PERMISSION_STRING,
+    CACHE_POOL_WEIGHT,
+    CACHE_NUM_ENTRIES,
+    CACHE_ENTRY_PATH,
+    CACHE_ENTRY_POOL_NAME
   }
   
   /**

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCacheReplicationManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCacheReplicationManager.java?rev=1529238&r1=1529237&r2=1529238&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCacheReplicationManager.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestCacheReplicationManager.java Fri Oct  4 17:46:18 2013
@@ -23,16 +23,19 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assume.assumeTrue;
 
 import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileSystemTestHelper;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
@@ -55,7 +58,7 @@ public class TestCacheReplicationManager
 
   private static Configuration conf;
   private static MiniDFSCluster cluster = null;
-  private static FileSystem fs;
+  private static DistributedFileSystem dfs;
   private static NameNode nn;
   private static NamenodeProtocols nnRpc;
   private static CacheReplicationManager cacheReplManager;
@@ -79,17 +82,17 @@ public class TestCacheReplicationManager
         .numDataNodes(NUM_DATANODES).build();
     cluster.waitActive();
 
-    fs = cluster.getFileSystem();
+    dfs = cluster.getFileSystem();
     nn = cluster.getNameNode();
     nnRpc = nn.getRpcServer();
     cacheReplManager = nn.getNamesystem().getCacheReplicationManager();
-    rootDir = helper.getDefaultWorkingDirectory(fs);
+    rootDir = helper.getDefaultWorkingDirectory(dfs);
   }
 
   @After
   public void tearDown() throws Exception {
-    if (fs != null) {
-      fs.close();
+    if (dfs != null) {
+      dfs.close();
     }
     if (cluster != null) {
       cluster.shutdown();
@@ -139,7 +142,7 @@ public class TestCacheReplicationManager
     final List<String> paths = new ArrayList<String>(numFiles);
     for (int i=0; i<numFiles; i++) {
       Path p = new Path(rootDir, "testCachePaths-" + i);
-      FileSystemTestHelper.createFile(fs, p, numBlocksPerFile, (int)BLOCK_SIZE);
+      FileSystemTestHelper.createFile(dfs, p, numBlocksPerFile, (int)BLOCK_SIZE);
       paths.add(p.toUri().getPath());
     }
     // Check the initial statistics at the namenode
@@ -152,9 +155,9 @@ public class TestCacheReplicationManager
       PathBasedCacheDescriptor descriptor =
           nnRpc.addPathBasedCacheDirective(directive);
       assertEquals("Descriptor does not match requested path", paths.get(i),
-          directive.getPath());
+          descriptor.getPath());
       assertEquals("Descriptor does not match requested pool", pool,
-          directive.getPool());
+          descriptor.getPool());
       expected += numBlocksPerFile;
       waitForExpectedNumCachedBlocks(expected);
     }
@@ -168,4 +171,67 @@ public class TestCacheReplicationManager
       waitForExpectedNumCachedBlocks(expected);
     }
   }
+
+  @Test(timeout=60000)
+  public void testCacheManagerRestart() throws Exception {
+    // Create and validate a pool
+    final String pool = "poolparty";
+    String groupName = "partygroup";
+    FsPermission mode = new FsPermission((short)0777);
+    int weight = 747;
+    dfs.addCachePool(new CachePoolInfo(pool)
+        .setGroupName(groupName)
+        .setMode(mode)
+        .setWeight(weight));
+    RemoteIterator<CachePoolInfo> pit = dfs.listCachePools();
+    assertTrue("No cache pools found", pit.hasNext());
+    CachePoolInfo info = pit.next();
+    assertEquals(pool, info.getPoolName());
+    assertEquals(groupName, info.getGroupName());
+    assertEquals(mode, info.getMode());
+    assertEquals(weight, (int)info.getWeight());
+    assertFalse("Unexpected # of cache pools found", pit.hasNext());
+
+    // Create some cache entries
+    int numEntries = 10;
+    String entryPrefix = "/party-";
+    for (int i=0; i<numEntries; i++) {
+      dfs.addPathBasedCacheDirective(new PathBasedCacheDirective(entryPrefix + i,
+          pool));
+    }
+    RemoteIterator<PathBasedCacheDescriptor> dit
+        = dfs.listPathBasedCacheDescriptors(null, null);
+    for (int i=0; i<numEntries; i++) {
+      assertTrue("Unexpected # of cache entries: " + i, dit.hasNext());
+      PathBasedCacheDescriptor cd = dit.next();
+      assertEquals(i+1, cd.getEntryId());
+      assertEquals(entryPrefix + i, cd.getPath());
+      assertEquals(pool, cd.getPool());
+    }
+    assertFalse("Unexpected # of cache descriptors found", dit.hasNext());
+
+    // Restart namenode
+    cluster.restartNameNode();
+
+    // Check that state came back up
+    pit = dfs.listCachePools();
+    assertTrue("No cache pools found", pit.hasNext());
+    info = pit.next();
+    assertEquals(pool, info.getPoolName());
+    assertEquals(groupName, info.getGroupName());
+    assertEquals(mode, info.getMode());
+    assertEquals(weight, (int)info.getWeight());
+    assertFalse("Unexpected # of cache pools found", pit.hasNext());
+
+    dit = dfs.listPathBasedCacheDescriptors(null, null);
+    for (int i=0; i<numEntries; i++) {
+      assertTrue("Unexpected # of cache entries: " + i, dit.hasNext());
+      PathBasedCacheDescriptor cd = dit.next();
+      assertEquals(i+1, cd.getEntryId());
+      assertEquals(entryPrefix + i, cd.getPath());
+      assertEquals(pool, cd.getPool());
+    }
+    assertFalse("Unexpected # of cache descriptors found", dit.hasNext());
+  }
+
 }
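
The essence of testCacheManagerRestart is that cache pools and entries now survive a namenode restart because they are replayed from the edit log or loaded from the fsimage. A condensed sketch of that round trip against a MiniDFSCluster, assuming caching is enabled in the configuration as in the test's setup:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.RemoteIterator;
    import org.apache.hadoop.fs.permission.FsPermission;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.DistributedFileSystem;
    import org.apache.hadoop.hdfs.HdfsConfiguration;
    import org.apache.hadoop.hdfs.MiniDFSCluster;
    import org.apache.hadoop.hdfs.protocol.CachePoolInfo;

    // Create a pool, restart the namenode, and list the pools again:
    // with this commit the pool comes back from the persisted state.
    public class CachePoolRestartDemo {
      public static void main(String[] args) throws Exception {
        Configuration conf = new HdfsConfiguration();
        conf.setBoolean(DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY, true);
        MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
        try {
          cluster.waitActive();
          DistributedFileSystem dfs = cluster.getFileSystem();
          dfs.addCachePool(new CachePoolInfo("poolparty")
              .setMode(new FsPermission((short) 0777)));
          cluster.restartNameNode(); // replays edits / loads fsimage
          RemoteIterator<CachePoolInfo> pit = dfs.listCachePools();
          while (pit.hasNext()) {
            System.out.println("pool after restart: "
                + pit.next().getPoolName());
          }
        } finally {
          cluster.shutdown();
        }
      }
    }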

Modified: hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/OfflineEditsViewerHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/OfflineEditsViewerHelper.java?rev=1529238&r1=1529237&r2=1529238&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/OfflineEditsViewerHelper.java (original)
+++ hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/OfflineEditsViewerHelper.java Fri Oct  4 17:46:18 2013
@@ -39,8 +39,11 @@ import org.apache.hadoop.hdfs.DFSConfigK
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.PathBasedCacheDescriptor;
+import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
@@ -229,6 +232,22 @@ public class OfflineEditsViewerHelper {
     // OP_UPDATE_MASTER_KEY 21
     //   done by getDelegationTokenSecretManager().startThreads();
 
+    // OP_ADD_CACHE_POOL 35
+    final String pool = "poolparty";
+    dfs.addCachePool(new CachePoolInfo(pool));
+    // OP_MODIFY_CACHE_POOL 36
+    dfs.modifyCachePool(new CachePoolInfo(pool)
+        .setOwnerName("carlton")
+        .setGroupName("party")
+        .setMode(new FsPermission((short)0700))
+        .setWeight(1989));
+    // OP_ADD_PATH_BASED_CACHE_DIRECTIVE 33
+    PathBasedCacheDescriptor descriptor =
+        dfs.addPathBasedCacheDirective(new PathBasedCacheDirective("/bar", pool));
+    // OP_REMOVE_PATH_BASED_CACHE_DESCRIPTOR 34
+    dfs.removePathBasedCacheDescriptor(descriptor);
+    // OP_REMOVE_CACHE_POOL 37
+    dfs.removeCachePool(pool);
     // sync to disk, otherwise we parse partial edits
     cluster.getNameNode().getFSImage().getEditLog().logSync();