You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by ar...@apache.org on 2013/12/02 18:41:48 UTC

svn commit: r1547122 [2/5] - in /hadoop/common/branches/HDFS-2832/hadoop-hdfs-project: hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/ hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/ hadoop-hdfs/ hadoop-hdfs/dev-support/ h...

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java Mon Dec  2 17:41:44 2013
@@ -46,6 +46,7 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
+import org.apache.hadoop.hdfs.web.URLConnectionFactory;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.util.StringUtils;
 
@@ -87,6 +88,7 @@ public class QuorumJournalManager implem
   private final AsyncLoggerSet loggers;
 
   private int outputBufferCapacity = 512 * 1024;
+  private final URLConnectionFactory connectionFactory;
   
   public QuorumJournalManager(Configuration conf,
       URI uri, NamespaceInfo nsInfo) throws IOException {
@@ -102,6 +104,8 @@ public class QuorumJournalManager implem
     this.uri = uri;
     this.nsInfo = nsInfo;
     this.loggers = new AsyncLoggerSet(createLoggers(loggerFactory));
+    this.connectionFactory = URLConnectionFactory
+        .newDefaultURLConnectionFactory(conf);
 
     // Configure timeouts.
     this.startSegmentTimeoutMs = conf.getInt(
@@ -475,8 +479,8 @@ public class QuorumJournalManager implem
         URL url = logger.buildURLToFetchLogs(remoteLog.getStartTxId());
 
         EditLogInputStream elis = EditLogFileInputStream.fromUrl(
-            url, remoteLog.getStartTxId(), remoteLog.getEndTxId(),
-            remoteLog.isInProgress());
+            connectionFactory, url, remoteLog.getStartTxId(),
+            remoteLog.getEndTxId(), remoteLog.isInProgress());
         allStreams.add(elis);
       }
     }

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumOutputStream.java Mon Dec  2 17:41:44 2013
@@ -114,10 +114,10 @@ class QuorumOutputStream extends EditLog
   }
 
   @Override
-  public String generateHtmlReport() {
+  public String generateReport() {
     StringBuilder sb = new StringBuilder();
-    sb.append("Writing segment beginning at txid " + segmentTxId + "<br/>\n");
-    loggers.appendHtmlReport(sb);
+    sb.append("Writing segment beginning at txid " + segmentTxId + ". \n");
+    loggers.appendReport(sb);
     return sb.toString();
   }
   

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeHttpServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeHttpServer.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeHttpServer.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeHttpServer.java Mon Dec  2 17:41:44 2013
@@ -23,6 +23,8 @@ import static org.apache.hadoop.hdfs.DFS
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
 
 import javax.servlet.ServletContext;
 
@@ -69,8 +71,15 @@ public class JournalNodeHttpServer {
         bindAddr.getHostName()));
 
     int tmpInfoPort = bindAddr.getPort();
+    URI httpEndpoint;
+    try {
+      httpEndpoint = new URI("http://" + NetUtils.getHostPortString(bindAddr));
+    } catch (URISyntaxException e) {
+      throw new IOException(e);
+    }
+
     httpServer = new HttpServer.Builder().setName("journal")
-        .setBindAddress(bindAddr.getHostName()).setPort(tmpInfoPort)
+        .addEndpoint(httpEndpoint)
         .setFindPort(tmpInfoPort == 0).setConf(conf).setACL(
             new AccessControlList(conf.get(DFS_ADMIN, " ")))
         .setSecurityEnabled(UserGroupInformation.isSecurityEnabled())
@@ -85,7 +94,7 @@ public class JournalNodeHttpServer {
     httpServer.start();
 
     // The web-server port can be ephemeral... ensure we have the correct info
-    infoPort = httpServer.getPort();
+    infoPort = httpServer.getConnectorAddress(0).getPort();
 
     LOG.info("Journal Web-server up at: " + bindAddr + ":" + infoPort);
   }
@@ -104,7 +113,7 @@ public class JournalNodeHttpServer {
    * Return the actual address bound to by the running server.
    */
   public InetSocketAddress getAddress() {
-    InetSocketAddress addr = httpServer.getListenerAddress();
+    InetSocketAddress addr = httpServer.getConnectorAddress(0);
     assert addr.getPort() != 0;
     return addr;
   }

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java Mon Dec  2 17:41:44 2013
@@ -22,6 +22,7 @@ import static org.apache.hadoop.util.Exi
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Date;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -208,17 +209,27 @@ public class CacheReplicationMonitor ext
   /**
    * Scan all CacheDirectives.  Use the information to figure out
    * what cache replication factor each block should have.
-   *
-   * @param mark       Whether the current scan is setting or clearing the mark
    */
   private void rescanCacheDirectives() {
     FSDirectory fsDir = namesystem.getFSDirectory();
-    for (CacheDirective pce : cacheManager.getEntriesById().values()) {
+    final long now = new Date().getTime();
+    for (CacheDirective directive : cacheManager.getEntriesById().values()) {
+      // Reset the directive
+      directive.clearBytesNeeded();
+      directive.clearBytesCached();
+      directive.clearFilesAffected();
+      // Skip processing this entry if it has expired
+      LOG.info("Directive expiry is at " + directive.getExpiryTime());
+      if (directive.getExpiryTime() > 0 && directive.getExpiryTime() <= now) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Skipping directive id " + directive.getId()
+              + " because it has expired (" + directive.getExpiryTime() + ">="
+              + now);
+        }
+        continue;
+      }
       scannedDirectives++;
-      pce.clearBytesNeeded();
-      pce.clearBytesCached();
-      pce.clearFilesAffected();
-      String path = pce.getPath();
+      String path = directive.getPath();
       INode node;
       try {
         node = fsDir.getINode(path);
@@ -235,11 +246,11 @@ public class CacheReplicationMonitor ext
         ReadOnlyList<INode> children = dir.getChildrenList(null);
         for (INode child : children) {
           if (child.isFile()) {
-            rescanFile(pce, child.asFile());
+            rescanFile(directive, child.asFile());
           }
         }
       } else if (node.isFile()) {
-        rescanFile(pce, node.asFile());
+        rescanFile(directive, node.asFile());
       } else {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Ignoring non-directory, non-file inode " + node +
@@ -301,7 +312,7 @@ public class CacheReplicationMonitor ext
     pce.addBytesNeeded(neededTotal);
     pce.addBytesCached(cachedTotal);
     if (LOG.isTraceEnabled()) {
-      LOG.debug("Directive " + pce.getEntryId() + " is caching " +
+      LOG.debug("Directive " + pce.getId() + " is caching " +
           file.getFullPathName() + ": " + cachedTotal + "/" + neededTotal);
     }
   }

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeStatistics.java Mon Dec  2 17:41:44 2013
@@ -42,6 +42,12 @@ public interface DatanodeStatistics {
 
   /** @return the percentage of the block pool used space over the total capacity. */
   public float getPercentBlockPoolUsed();
+  
+  /** @return the total cache capacity of all DataNodes */
+  public long getCacheCapacity();
+
+  /** @return the total cache used by all DataNodes */
+  public long getCacheUsed();
 
   /** @return the xceiver count */
   public int getXceiverCount();

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/HeartbeatManager.java Mon Dec  2 17:41:44 2013
@@ -149,6 +149,17 @@ class HeartbeatManager implements Datano
   public synchronized int getXceiverCount() {
     return stats.xceiverCount;
   }
+  
+  @Override
+  public synchronized long getCacheCapacity() {
+    return stats.cacheCapacity;
+  }
+
+  @Override
+  public synchronized long getCacheUsed() {
+    return stats.cacheUsed;
+  }
+  
 
   @Override
   public synchronized long[] getStats() {
@@ -309,6 +320,8 @@ class HeartbeatManager implements Datano
     private long capacityRemaining = 0L;
     private long blockPoolUsed = 0L;
     private int xceiverCount = 0;
+    private long cacheCapacity = 0L;
+    private long cacheUsed = 0L;
 
     private int expiredHeartbeats = 0;
 
@@ -322,6 +335,8 @@ class HeartbeatManager implements Datano
       } else {
         capacityTotal += node.getDfsUsed();
       }
+      cacheCapacity += node.getCacheCapacity();
+      cacheUsed += node.getCacheUsed();
     }
 
     private void subtract(final DatanodeDescriptor node) {
@@ -334,6 +349,8 @@ class HeartbeatManager implements Datano
       } else {
         capacityTotal -= node.getDfsUsed();
       }
+      cacheCapacity -= node.getCacheCapacity();
+      cacheUsed -= node.getCacheUsed();
     }
     
     /** Increment expired heartbeat counter. */

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Mon Dec  2 17:41:44 2013
@@ -52,6 +52,7 @@ import java.util.concurrent.atomic.Atomi
 
 import javax.management.ObjectName;
 
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -234,6 +235,7 @@ public class DataNode extends Configured
   private volatile boolean heartbeatsDisabledForTests = false;
   private DataStorage storage = null;
   private HttpServer infoServer = null;
+  private int infoPort;
   private int infoSecurePort;
   DataNodeMetrics metrics;
   private InetSocketAddress streamingAddr;
@@ -354,27 +356,33 @@ public class DataNode extends Configured
     String infoHost = infoSocAddr.getHostName();
     int tmpInfoPort = infoSocAddr.getPort();
     HttpServer.Builder builder = new HttpServer.Builder().setName("datanode")
-        .setBindAddress(infoHost).setPort(tmpInfoPort)
+        .addEndpoint(URI.create("http://" + NetUtils.getHostPortString(infoSocAddr)))
         .setFindPort(tmpInfoPort == 0).setConf(conf)
         .setACL(new AccessControlList(conf.get(DFS_ADMIN, " ")));
-    this.infoServer = (secureResources == null) ? builder.build() :
-        builder.setConnector(secureResources.getListener()).build();
 
     LOG.info("Opened info server at " + infoHost + ":" + tmpInfoPort);
     if (conf.getBoolean(DFS_HTTPS_ENABLE_KEY, false)) {
-      boolean needClientAuth = conf.getBoolean(DFS_CLIENT_HTTPS_NEED_AUTH_KEY,
-                                               DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT);
       InetSocketAddress secInfoSocAddr = NetUtils.createSocketAddr(conf.get(
           DFS_DATANODE_HTTPS_ADDRESS_KEY, infoHost + ":" + 0));
-      Configuration sslConf = new HdfsConfiguration(false);
-      sslConf.addResource(conf.get(DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY,
-          "ssl-server.xml"));
-      this.infoServer.addSslListener(secInfoSocAddr, sslConf, needClientAuth);
+      builder.addEndpoint(URI.create("https://"
+          + NetUtils.getHostPortString(secInfoSocAddr)));
+      Configuration sslConf = new Configuration(false);
+      sslConf.setBoolean(DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_KEY, conf
+          .getBoolean(DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_KEY,
+              DFSConfigKeys.DFS_CLIENT_HTTPS_NEED_AUTH_DEFAULT));
+      sslConf.addResource(conf.get(
+          DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY,
+          DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_DEFAULT));
+      DFSUtil.loadSslConfToHttpServerBuilder(builder, sslConf);
+
       if(LOG.isDebugEnabled()) {
         LOG.debug("Datanode listening for SSL on " + secInfoSocAddr);
       }
       infoSecurePort = secInfoSocAddr.getPort();
     }
+
+    this.infoServer = (secureResources == null) ? builder.build() :
+      builder.setConnector(secureResources.getListener()).build();
     this.infoServer.addInternalServlet(null, "/streamFile/*", StreamFile.class);
     this.infoServer.addInternalServlet(null, "/getFileChecksum/*",
         FileChecksumServlets.GetServlet.class);
@@ -390,6 +398,7 @@ public class DataNode extends Configured
           WebHdfsFileSystem.PATH_PREFIX + "/*");
     }
     this.infoServer.start();
+    this.infoPort = infoServer.getConnectorAddress(0).getPort();
   }
   
   private void startPlugins(Configuration conf) {
@@ -712,7 +721,7 @@ public class DataNode extends Configured
     this.dnConf = new DNConf(conf);
 
     if (dnConf.maxLockedMemory > 0) {
-      if (!NativeIO.isAvailable()) {
+      if (!NativeIO.POSIX.getCacheManipulator().verifyCanMlock()) {
         throw new RuntimeException(String.format(
             "Cannot start datanode because the configured max locked memory" +
             " size (%s) is greater than zero and native code is not available.",
@@ -2320,7 +2329,7 @@ public class DataNode extends Configured
    * @return the datanode's http port
    */
   public int getInfoPort() {
-    return infoServer.getPort();
+    return infoPort;
   }
 
   /**

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataStorage.java Mon Dec  2 17:41:44 2013
@@ -310,7 +310,16 @@ public class DataStorage extends Storage
   @Override
   protected void setFieldsFromProperties(Properties props, StorageDirectory sd)
       throws IOException {
-    setLayoutVersion(props, sd);
+    setFieldsFromProperties(props, sd, false, 0);
+  }
+
+  private void setFieldsFromProperties(Properties props, StorageDirectory sd,
+      boolean overrideLayoutVersion, int toLayoutVersion) throws IOException {
+    if (overrideLayoutVersion) {
+      this.layoutVersion = toLayoutVersion;
+    } else {
+      setLayoutVersion(props, sd);
+    }
     setcTime(props, sd);
     setStorageType(props, sd);
     setClusterId(props, layoutVersion, sd);
@@ -374,13 +383,20 @@ public class DataStorage extends Storage
     return true;
   }
   
+  /** Read VERSION file for rollback */
+  void readProperties(StorageDirectory sd, int rollbackLayoutVersion)
+      throws IOException {
+    Properties props = readPropertiesFile(sd.getVersionFile());
+    setFieldsFromProperties(props, sd, true, rollbackLayoutVersion);
+  }
+
   /**
    * Analize which and whether a transition of the fs state is required
    * and perform it if necessary.
    * 
-   * Rollback if previousLV >= LAYOUT_VERSION && prevCTime <= namenode.cTime
-   * Upgrade if this.LV > LAYOUT_VERSION || this.cTime < namenode.cTime
-   * Regular startup if this.LV = LAYOUT_VERSION && this.cTime = namenode.cTime
+   * Rollback if the rollback startup option was specified.
+   * Upgrade if this.LV > LAYOUT_VERSION
+   * Regular startup if this.LV = LAYOUT_VERSION
    * 
    * @param datanode Datanode to which this storage belongs to
    * @param sd  storage directory
@@ -420,25 +436,28 @@ public class DataStorage extends Storage
           + nsInfo.getClusterID() + "; datanode clusterID = " + getClusterID());
     }
     
-    // regular start up
-    if (this.layoutVersion == HdfsConstants.LAYOUT_VERSION 
-        && this.cTime == nsInfo.getCTime())
+    // After addition of the federation feature, ctime check is only 
+    // meaningful at BlockPoolSliceStorage level. 
+
+    // regular start up. 
+    if (this.layoutVersion == HdfsConstants.LAYOUT_VERSION)
       return; // regular startup
     
     // do upgrade
-    if (this.layoutVersion > HdfsConstants.LAYOUT_VERSION
-        || this.cTime < nsInfo.getCTime()) {
+    if (this.layoutVersion > HdfsConstants.LAYOUT_VERSION) {
       doUpgrade(sd, nsInfo);  // upgrade
       return;
     }
     
-    // layoutVersion == LAYOUT_VERSION && this.cTime > nsInfo.cTime
-    // must shutdown
-    throw new IOException("Datanode state: LV = " + this.getLayoutVersion() 
-                          + " CTime = " + this.getCTime() 
-                          + " is newer than the namespace state: LV = "
-                          + nsInfo.getLayoutVersion() 
-                          + " CTime = " + nsInfo.getCTime());
+    // layoutVersion < LAYOUT_VERSION. I.e. stored layout version is newer
+    // than the version supported by datanode. This should have been caught
+    // in readProperties(), even if rollback was not carried out or somehow
+    // failed.
+    throw new IOException("BUG: The stored LV = " + this.getLayoutVersion()
+                          + " is newer than the supported LV = "
+                          + HdfsConstants.LAYOUT_VERSION
+                          + " or name node LV = "
+                          + nsInfo.getLayoutVersion());
   }
 
   /**
@@ -464,8 +483,13 @@ public class DataStorage extends Storage
    * @throws IOException on error
    */
   void doUpgrade(StorageDirectory sd, NamespaceInfo nsInfo) throws IOException {
+    // If the existing on-disk layout version supportes federation, simply
+    // update its layout version.
     if (LayoutVersion.supports(Feature.FEDERATION, layoutVersion)) {
-      clusterID = nsInfo.getClusterID();
+      // The VERSION file is already read in. Override the layoutVersion 
+      // field and overwrite the file.
+      LOG.info("Updating layout version from " + layoutVersion + " to "
+          + nsInfo.getLayoutVersion() + " for storage " + sd.getRoot());
       layoutVersion = nsInfo.getLayoutVersion();
       writeProperties(sd);
       return;
@@ -550,15 +574,32 @@ public class DataStorage extends Storage
    * <li> Remove removed.tmp </li>
    * </ol>
    * 
-   * Do nothing, if previous directory does not exist.
+   * If previous directory does not exist and the current version supports
+   * federation, perform a simple rollback of layout version. This does not
+   * involve saving/restoration of actual data.
    */
   void doRollback( StorageDirectory sd,
                    NamespaceInfo nsInfo
                    ) throws IOException {
     File prevDir = sd.getPreviousDir();
-    // regular startup if previous dir does not exist
-    if (!prevDir.exists())
+    // This is a regular startup or a post-federation rollback
+    if (!prevDir.exists()) {
+      // The current datanode version supports federation and the layout
+      // version from namenode matches what the datanode supports. An invalid
+      // rollback may happen if namenode didn't rollback and datanode is
+      // running a wrong version.  But this will be detected in block pool
+      // level and the invalid VERSION content will be overwritten when
+      // the error is corrected and rollback is retried.
+      if (LayoutVersion.supports(Feature.FEDERATION,
+          HdfsConstants.LAYOUT_VERSION) && 
+          HdfsConstants.LAYOUT_VERSION == nsInfo.getLayoutVersion()) {
+        readProperties(sd, nsInfo.getLayoutVersion());
+        writeProperties(sd);
+        LOG.info("Layout version rolled back to " +
+            nsInfo.getLayoutVersion() + " for storage " + sd.getRoot());
+      }
       return;
+    }
     DataStorage prevInfo = new DataStorage();
     prevInfo.readPreviousVersionProperties(sd);
 

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetCache.java Mon Dec  2 17:41:44 2013
@@ -145,6 +145,8 @@ public class FsDatasetCache {
    */
   private final HashMap<Key, Value> mappableBlockMap = new HashMap<Key, Value>();
 
+  private final AtomicLong numBlocksCached = new AtomicLong(0);
+
   private final FsDatasetImpl dataset;
 
   private final ThreadPoolExecutor uncachingExecutor;
@@ -417,6 +419,7 @@ public class FsDatasetCache {
           LOG.debug("Successfully cached block " + key.id + " in " + key.bpid +
               ".  We are now caching " + newUsedBytes + " bytes in total.");
         }
+        numBlocksCached.addAndGet(1);
         success = true;
       } finally {
         if (!success) {
@@ -465,6 +468,7 @@ public class FsDatasetCache {
       }
       long newUsedBytes =
           usedBytesCount.release(value.mappableBlock.getLength());
+      numBlocksCached.addAndGet(-1);
       if (LOG.isDebugEnabled()) {
         LOG.debug("Uncaching of block " + key.id + " in " + key.bpid +
             " completed.  usedBytes = " + newUsedBytes);
@@ -477,14 +481,14 @@ public class FsDatasetCache {
   /**
    * Get the approximate amount of cache space used.
    */
-  public long getDnCacheUsed() {
+  public long getCacheUsed() {
     return usedBytesCount.get();
   }
 
   /**
    * Get the maximum amount of bytes we can cache.  This is a constant.
    */
-  public long getDnCacheCapacity() {
+  public long getCacheCapacity() {
     return maxBytes;
   }
 
@@ -496,4 +500,7 @@ public class FsDatasetCache {
     return numBlocksFailedToUncache.get();
   }
 
+  public long getNumBlocksCached() {
+    return numBlocksCached.get();
+  }
 }

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java Mon Dec  2 17:41:44 2013
@@ -341,12 +341,12 @@ class FsDatasetImpl implements FsDataset
 
   @Override // FSDatasetMBean
   public long getCacheUsed() {
-    return cacheManager.getDnCacheUsed();
+    return cacheManager.getCacheUsed();
   }
 
   @Override // FSDatasetMBean
   public long getCacheCapacity() {
-    return cacheManager.getDnCacheCapacity();
+    return cacheManager.getCacheCapacity();
   }
 
   @Override // FSDatasetMBean
@@ -359,6 +359,11 @@ class FsDatasetImpl implements FsDataset
     return cacheManager.getNumBlocksFailedToUncache();
   }
 
+  @Override // FSDatasetMBean
+  public long getNumBlocksCached() {
+    return cacheManager.getNumBlocksCached();
+  }
+
   /**
    * Find the block's on-disk length
    */

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/FSDatasetMBean.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/FSDatasetMBean.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/FSDatasetMBean.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/FSDatasetMBean.java Mon Dec  2 17:41:44 2013
@@ -89,6 +89,11 @@ public interface FSDatasetMBean {
   public long getCacheCapacity();
 
   /**
+   * Returns the number of blocks cached.
+   */
+  public long getNumBlocksCached();
+
+  /**
    * Returns the number of blocks that the datanode was unable to cache
    */
   public long getNumBlocksFailedToCache();

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java Mon Dec  2 17:41:44 2013
@@ -17,12 +17,12 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS_DEFAULT;
 
@@ -43,17 +43,18 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.InvalidRequestException;
 import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
+import org.apache.hadoop.fs.InvalidRequestException;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.CacheDirective;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
+import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
-import org.apache.hadoop.hdfs.protocol.CacheDirective;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor;
@@ -99,24 +100,24 @@ public final class CacheManager {
   private final BlockManager blockManager;
 
   /**
-   * Cache entries, sorted by ID.
+   * Cache directives, sorted by ID.
    *
    * listCacheDirectives relies on the ordering of elements in this map
    * to track what has already been listed by the client.
    */
-  private final TreeMap<Long, CacheDirective> entriesById =
+  private final TreeMap<Long, CacheDirective> directivesById =
       new TreeMap<Long, CacheDirective>();
 
   /**
-   * The entry ID to use for a new entry.  Entry IDs always increase, and are
+   * The directive ID to use for a new directive.  IDs always increase, and are
    * never reused.
    */
-  private long nextEntryId;
+  private long nextDirectiveId;
 
   /**
-   * Cache entries, sorted by path
+   * Cache directives, sorted by path
    */
-  private final TreeMap<String, List<CacheDirective>> entriesByPath =
+  private final TreeMap<String, List<CacheDirective>> directivesByPath =
       new TreeMap<String, List<CacheDirective>>();
 
   /**
@@ -177,7 +178,7 @@ public final class CacheManager {
       BlockManager blockManager) {
     this.namesystem = namesystem;
     this.blockManager = blockManager;
-    this.nextEntryId = 1;
+    this.nextDirectiveId = 1;
     this.maxListCachePoolsResponses = conf.getInt(
         DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES,
         DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES_DEFAULT);
@@ -239,7 +240,7 @@ public final class CacheManager {
 
   public TreeMap<Long, CacheDirective> getEntriesById() {
     assert namesystem.hasReadLock();
-    return entriesById;
+    return directivesById;
   }
   
   @VisibleForTesting
@@ -248,12 +249,12 @@ public final class CacheManager {
     return cachedBlocks;
   }
 
-  private long getNextEntryId() throws IOException {
+  private long getNextDirectiveId() throws IOException {
     assert namesystem.hasWriteLock();
-    if (nextEntryId >= Long.MAX_VALUE - 1) {
+    if (nextDirectiveId >= Long.MAX_VALUE - 1) {
       throw new IOException("No more available IDs.");
     }
-    return nextEntryId++;
+    return nextDirectiveId++;
   }
 
   // Helper getter / validation methods
@@ -301,7 +302,35 @@ public final class CacheManager {
   }
 
   /**
-   * Get a CacheDirective by ID, validating the ID and that the entry
+   * Calculates the absolute expiry time of the directive from the
+   * {@link CacheDirectiveInfo.Expiration}. This converts a relative Expiration
+   * into an absolute time based on the local clock.
+   * 
+   * @param directive from which to get the expiry time
+   * @param defaultValue to use if Expiration is not set
+   * @return Absolute expiry time in milliseconds since Unix epoch
+   * @throws InvalidRequestException if the Expiration is invalid
+   */
+  private static long validateExpiryTime(CacheDirectiveInfo directive,
+      long defaultValue) throws InvalidRequestException {
+    long expiryTime;
+    CacheDirectiveInfo.Expiration expiration = directive.getExpiration();
+    if (expiration != null) {
+      if (expiration.getMillis() < 0) {
+        throw new InvalidRequestException("Cannot set a negative expiration: "
+            + expiration.getMillis());
+      }
+      // Converts a relative duration into an absolute time based on the local
+      // clock
+      expiryTime = expiration.getAbsoluteMillis();
+    } else {
+      expiryTime = defaultValue;
+    }
+    return expiryTime;
+  }
+
+  /**
+   * Get a CacheDirective by ID, validating the ID and that the directive
    * exists.
    */
   private CacheDirective getById(long id) throws InvalidRequestException {
@@ -309,13 +338,13 @@ public final class CacheManager {
     if (id <= 0) {
       throw new InvalidRequestException("Invalid negative ID.");
     }
-    // Find the entry.
-    CacheDirective entry = entriesById.get(id);
-    if (entry == null) {
+    // Find the directive.
+    CacheDirective directive = directivesById.get(id);
+    if (directive == null) {
       throw new InvalidRequestException("No directive with ID " + id
           + " found.");
     }
-    return entry;
+    return directive;
   }
 
   /**
@@ -332,122 +361,134 @@ public final class CacheManager {
 
   // RPC handlers
 
-  private void addInternal(CacheDirective entry) {
-    entriesById.put(entry.getEntryId(), entry);
-    String path = entry.getPath();
-    List<CacheDirective> entryList = entriesByPath.get(path);
-    if (entryList == null) {
-      entryList = new ArrayList<CacheDirective>(1);
-      entriesByPath.put(path, entryList);
+  private void addInternal(CacheDirective directive, CachePool pool) {
+    boolean addedDirective = pool.getDirectiveList().add(directive);
+    assert addedDirective;
+    directivesById.put(directive.getId(), directive);
+    String path = directive.getPath();
+    List<CacheDirective> directives = directivesByPath.get(path);
+    if (directives == null) {
+      directives = new ArrayList<CacheDirective>(1);
+      directivesByPath.put(path, directives);
     }
-    entryList.add(entry);
+    directives.add(directive);
+  }
+
+  /**
+   * To be called only from the edit log loading code
+   */
+  CacheDirectiveInfo addDirectiveFromEditLog(CacheDirectiveInfo directive)
+      throws InvalidRequestException {
+    long id = directive.getId();
+    CacheDirective entry =
+        new CacheDirective(
+            directive.getId(),
+            directive.getPath().toUri().getPath(),
+            directive.getReplication(),
+            directive.getExpiration().getAbsoluteMillis());
+    CachePool pool = cachePools.get(directive.getPool());
+    addInternal(entry, pool);
+    if (nextDirectiveId <= id) {
+      nextDirectiveId = id + 1;
+    }
+    return entry.toInfo();
   }
 
   public CacheDirectiveInfo addDirective(
-      CacheDirectiveInfo directive, FSPermissionChecker pc)
+      CacheDirectiveInfo info, FSPermissionChecker pc)
       throws IOException {
     assert namesystem.hasWriteLock();
-    CacheDirective entry;
+    CacheDirective directive;
     try {
-      CachePool pool = getCachePool(validatePoolName(directive));
+      CachePool pool = getCachePool(validatePoolName(info));
       checkWritePermission(pc, pool);
-      String path = validatePath(directive);
-      short replication = validateReplication(directive, (short)1);
-      long id;
-      if (directive.getId() != null) {
-        // We are loading an entry from the edit log.
-        // Use the ID from the edit log.
-        id = directive.getId();
-        if (id <= 0) {
-          throw new InvalidRequestException("can't add an ID " +
-              "of " + id + ": it is not positive.");
-        }
-        if (id >= Long.MAX_VALUE) {
-          throw new InvalidRequestException("can't add an ID " +
-              "of " + id + ": it is too big.");
-        }
-        if (nextEntryId <= id) {
-          nextEntryId = id + 1;
-        }
-      } else {
-        // Add a new entry with the next available ID.
-        id = getNextEntryId();
-      }
-      entry = new CacheDirective(id, path, replication, pool);
-      addInternal(entry);
+      String path = validatePath(info);
+      short replication = validateReplication(info, (short)1);
+      long expiryTime = validateExpiryTime(info,
+          CacheDirectiveInfo.Expiration.EXPIRY_NEVER);
+      // All validation passed
+      // Add a new entry with the next available ID.
+      long id = getNextDirectiveId();
+      directive = new CacheDirective(id, path, replication, expiryTime);
+      addInternal(directive, pool);
     } catch (IOException e) {
-      LOG.warn("addDirective of " + directive + " failed: ", e);
+      LOG.warn("addDirective of " + info + " failed: ", e);
       throw e;
     }
-    LOG.info("addDirective of " + directive + " successful.");
+    LOG.info("addDirective of " + info + " successful.");
     if (monitor != null) {
       monitor.kick();
     }
-    return entry.toDirective();
+    return directive.toInfo();
   }
 
-  public void modifyDirective(CacheDirectiveInfo directive,
+  public void modifyDirective(CacheDirectiveInfo info,
       FSPermissionChecker pc) throws IOException {
     assert namesystem.hasWriteLock();
     String idString =
-        (directive.getId() == null) ?
-            "(null)" : directive.getId().toString();
+        (info.getId() == null) ?
+            "(null)" : info.getId().toString();
     try {
       // Check for invalid IDs.
-      Long id = directive.getId();
+      Long id = info.getId();
       if (id == null) {
         throw new InvalidRequestException("Must supply an ID.");
       }
       CacheDirective prevEntry = getById(id);
       checkWritePermission(pc, prevEntry.getPool());
       String path = prevEntry.getPath();
-      if (directive.getPath() != null) {
-        path = validatePath(directive);
+      if (info.getPath() != null) {
+        path = validatePath(info);
       }
+
       short replication = prevEntry.getReplication();
-      if (directive.getReplication() != null) {
-        replication = validateReplication(directive, replication);
-      }
+      replication = validateReplication(info, replication);
+
+      long expiryTime = prevEntry.getExpiryTime();
+      expiryTime = validateExpiryTime(info, expiryTime);
+
       CachePool pool = prevEntry.getPool();
-      if (directive.getPool() != null) {
-        pool = getCachePool(validatePoolName(directive));
+      if (info.getPool() != null) {
+        pool = getCachePool(validatePoolName(info));
         checkWritePermission(pc, pool);
       }
       removeInternal(prevEntry);
       CacheDirective newEntry =
-          new CacheDirective(id, path, replication, pool);
-      addInternal(newEntry);
+          new CacheDirective(id, path, replication, expiryTime);
+      addInternal(newEntry, pool);
     } catch (IOException e) {
       LOG.warn("modifyDirective of " + idString + " failed: ", e);
       throw e;
     }
     LOG.info("modifyDirective of " + idString + " successfully applied " +
-        directive + ".");
+        info+ ".");
   }
 
-  public void removeInternal(CacheDirective existing)
+  public void removeInternal(CacheDirective directive)
       throws InvalidRequestException {
     assert namesystem.hasWriteLock();
-    // Remove the corresponding entry in entriesByPath.
-    String path = existing.getPath();
-    List<CacheDirective> entries = entriesByPath.get(path);
-    if (entries == null || !entries.remove(existing)) {
+    // Remove the corresponding entry in directivesByPath.
+    String path = directive.getPath();
+    List<CacheDirective> directives = directivesByPath.get(path);
+    if (directives == null || !directives.remove(directive)) {
       throw new InvalidRequestException("Failed to locate entry " +
-          existing.getEntryId() + " by path " + existing.getPath());
+          directive.getId() + " by path " + directive.getPath());
     }
-    if (entries.size() == 0) {
-      entriesByPath.remove(path);
+    if (directives.size() == 0) {
+      directivesByPath.remove(path);
     }
-    entriesById.remove(existing.getEntryId());
+    directivesById.remove(directive.getId());
+    directive.getPool().getDirectiveList().remove(directive);
+    assert directive.getPool() == null;
   }
 
   public void removeDirective(long id, FSPermissionChecker pc)
       throws IOException {
     assert namesystem.hasWriteLock();
     try {
-      CacheDirective existing = getById(id);
-      checkWritePermission(pc, existing.getPool());
-      removeInternal(existing);
+      CacheDirective directive = getById(id);
+      checkWritePermission(pc, directive.getPool());
+      removeInternal(directive);
     } catch (IOException e) {
       LOG.warn("removeDirective of " + id + " failed: ", e);
       throw e;
@@ -478,13 +519,13 @@ public final class CacheManager {
         new ArrayList<CacheDirectiveEntry>(NUM_PRE_ALLOCATED_ENTRIES);
     int numReplies = 0;
     SortedMap<Long, CacheDirective> tailMap =
-      entriesById.tailMap(prevId + 1);
+      directivesById.tailMap(prevId + 1);
     for (Entry<Long, CacheDirective> cur : tailMap.entrySet()) {
       if (numReplies >= maxListCacheDirectivesNumResponses) {
         return new BatchedListEntries<CacheDirectiveEntry>(replies, true);
       }
-      CacheDirective curEntry = cur.getValue();
-      CacheDirectiveInfo info = cur.getValue().toDirective();
+      CacheDirective curDirective = cur.getValue();
+      CacheDirectiveInfo info = cur.getValue().toInfo();
       if (filter.getPool() != null && 
           !info.getPool().equals(filter.getPool())) {
         continue;
@@ -496,7 +537,7 @@ public final class CacheManager {
       boolean hasPermission = true;
       if (pc != null) {
         try {
-          pc.checkPermission(curEntry.getPool(), FsAction.READ);
+          pc.checkPermission(curDirective.getPool(), FsAction.READ);
         } catch (AccessControlException e) {
           hasPermission = false;
         }
@@ -530,7 +571,7 @@ public final class CacheManager {
     pool = CachePool.createFromInfoAndDefaults(info);
     cachePools.put(pool.getPoolName(), pool);
     LOG.info("Created new cache pool " + pool);
-    return pool.getInfo(null);
+    return pool.getInfo(true);
   }
 
   /**
@@ -599,39 +640,34 @@ public final class CacheManager {
       throw new InvalidRequestException(
           "Cannot remove non-existent cache pool " + poolName);
     }
-    
-    // Remove entries using this pool
-    // TODO: could optimize this somewhat to avoid the need to iterate
-    // over all entries in entriesById
-    Iterator<Entry<Long, CacheDirective>> iter = 
-        entriesById.entrySet().iterator();
+    // Remove all directives in this pool.
+    Iterator<CacheDirective> iter = pool.getDirectiveList().iterator();
     while (iter.hasNext()) {
-      Entry<Long, CacheDirective> entry = iter.next();
-      if (entry.getValue().getPool() == pool) {
-        entriesByPath.remove(entry.getValue().getPath());
-        iter.remove();
-      }
+      CacheDirective directive = iter.next();
+      directivesByPath.remove(directive.getPath());
+      directivesById.remove(directive.getId());
+      iter.remove();
     }
     if (monitor != null) {
       monitor.kick();
     }
   }
 
-  public BatchedListEntries<CachePoolInfo>
+  public BatchedListEntries<CachePoolEntry>
       listCachePools(FSPermissionChecker pc, String prevKey) {
     assert namesystem.hasReadLock();
     final int NUM_PRE_ALLOCATED_ENTRIES = 16;
-    ArrayList<CachePoolInfo> results = 
-        new ArrayList<CachePoolInfo>(NUM_PRE_ALLOCATED_ENTRIES);
+    ArrayList<CachePoolEntry> results = 
+        new ArrayList<CachePoolEntry>(NUM_PRE_ALLOCATED_ENTRIES);
     SortedMap<String, CachePool> tailMap = cachePools.tailMap(prevKey, false);
     int numListed = 0;
     for (Entry<String, CachePool> cur : tailMap.entrySet()) {
       if (numListed++ >= maxListCachePoolsResponses) {
-        return new BatchedListEntries<CachePoolInfo>(results, true);
+        return new BatchedListEntries<CachePoolEntry>(results, true);
       }
-      results.add(cur.getValue().getInfo(pc));
+      results.add(cur.getValue().getEntry(pc));
     }
-    return new BatchedListEntries<CachePoolInfo>(results, false);
+    return new BatchedListEntries<CachePoolEntry>(results, false);
   }
 
   public void setCachedLocations(LocatedBlock block) {
@@ -693,13 +729,6 @@ public final class CacheManager {
     for (Iterator<Long> iter = blockIds.iterator(); iter.hasNext(); ) {
       Block block = new Block(iter.next());
       BlockInfo blockInfo = blockManager.getStoredBlock(block);
-      if (blockInfo.getGenerationStamp() < block.getGenerationStamp()) {
-        // The NameNode will eventually remove or update the out-of-date block.
-        // Until then, we pretend that it isn't cached.
-        LOG.warn("Genstamp in cache report disagrees with our genstamp for " +
-          block + ": expected genstamp " + blockInfo.getGenerationStamp());
-        continue;
-      }
       if (!blockInfo.isComplete()) {
         LOG.warn("Ignoring block id " + block.getBlockId() + ", because " +
             "it is in not complete yet.  It is in state " + 
@@ -743,9 +772,9 @@ public final class CacheManager {
    */
   public void saveState(DataOutput out, String sdPath)
       throws IOException {
-    out.writeLong(nextEntryId);
+    out.writeLong(nextDirectiveId);
     savePools(out, sdPath);
-    saveEntries(out, sdPath);
+    saveDirectives(out, sdPath);
   }
 
   /**
@@ -755,10 +784,10 @@ public final class CacheManager {
    * @throws IOException
    */
   public void loadState(DataInput in) throws IOException {
-    nextEntryId = in.readLong();
-    // pools need to be loaded first since entries point to their parent pool
+    nextDirectiveId = in.readLong();
+    // pools need to be loaded first since directives point to their parent pool
     loadPools(in);
-    loadEntries(in);
+    loadDirectives(in);
   }
 
   /**
@@ -773,7 +802,7 @@ public final class CacheManager {
     Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step);
     out.writeInt(cachePools.size());
     for (CachePool pool: cachePools.values()) {
-      pool.getInfo(null).writeTo(out);
+      pool.getInfo(true).writeTo(out);
       counter.increment();
     }
     prog.endStep(Phase.SAVING_CHECKPOINT, step);
@@ -782,19 +811,20 @@ public final class CacheManager {
   /*
    * Save cache entries to fsimage
    */
-  private void saveEntries(DataOutput out, String sdPath)
+  private void saveDirectives(DataOutput out, String sdPath)
       throws IOException {
     StartupProgress prog = NameNode.getStartupProgress();
     Step step = new Step(StepType.CACHE_ENTRIES, sdPath);
     prog.beginStep(Phase.SAVING_CHECKPOINT, step);
-    prog.setTotal(Phase.SAVING_CHECKPOINT, step, entriesById.size());
+    prog.setTotal(Phase.SAVING_CHECKPOINT, step, directivesById.size());
     Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step);
-    out.writeInt(entriesById.size());
-    for (CacheDirective entry: entriesById.values()) {
-      out.writeLong(entry.getEntryId());
-      Text.writeString(out, entry.getPath());
-      out.writeShort(entry.getReplication());
-      Text.writeString(out, entry.getPool().getPoolName());
+    out.writeInt(directivesById.size());
+    for (CacheDirective directive : directivesById.values()) {
+      out.writeLong(directive.getId());
+      Text.writeString(out, directive.getPath());
+      out.writeShort(directive.getReplication());
+      Text.writeString(out, directive.getPool().getPoolName());
+      out.writeLong(directive.getExpiryTime());
       counter.increment();
     }
     prog.endStep(Phase.SAVING_CHECKPOINT, step);
@@ -819,38 +849,42 @@ public final class CacheManager {
   }
 
   /**
-   * Load cache entries from the fsimage
+   * Load cache directives from the fsimage
    */
-  private void loadEntries(DataInput in) throws IOException {
+  private void loadDirectives(DataInput in) throws IOException {
     StartupProgress prog = NameNode.getStartupProgress();
     Step step = new Step(StepType.CACHE_ENTRIES);
     prog.beginStep(Phase.LOADING_FSIMAGE, step);
-    int numberOfEntries = in.readInt();
-    prog.setTotal(Phase.LOADING_FSIMAGE, step, numberOfEntries);
+    int numDirectives = in.readInt();
+    prog.setTotal(Phase.LOADING_FSIMAGE, step, numDirectives);
     Counter counter = prog.getCounter(Phase.LOADING_FSIMAGE, step);
-    for (int i = 0; i < numberOfEntries; i++) {
-      long entryId = in.readLong();
+    for (int i = 0; i < numDirectives; i++) {
+      long directiveId = in.readLong();
       String path = Text.readString(in);
       short replication = in.readShort();
       String poolName = Text.readString(in);
+      long expiryTime = in.readLong();
       // Get pool reference by looking it up in the map
       CachePool pool = cachePools.get(poolName);
       if (pool == null) {
-        throw new IOException("Entry refers to pool " + poolName +
+        throw new IOException("Directive refers to pool " + poolName +
             ", which does not exist.");
       }
-      CacheDirective entry =
-          new CacheDirective(entryId, path, replication, pool);
-      if (entriesById.put(entry.getEntryId(), entry) != null) {
-        throw new IOException("An entry with ID " + entry.getEntryId() +
+      CacheDirective directive =
+          new CacheDirective(directiveId, path, replication, expiryTime);
+      boolean addedDirective = pool.getDirectiveList().add(directive);
+      assert addedDirective;
+      if (directivesById.put(directive.getId(), directive) != null) {
+        throw new IOException("A directive with ID " + directive.getId() +
             " already exists");
       }
-      List<CacheDirective> entries = entriesByPath.get(entry.getPath());
-      if (entries == null) {
-        entries = new LinkedList<CacheDirective>();
-        entriesByPath.put(entry.getPath(), entries);
+      List<CacheDirective> directives =
+          directivesByPath.get(directive.getPath());
+      if (directives == null) {
+        directives = new LinkedList<CacheDirective>();
+        directivesByPath.put(directive.getPath(), directives);
       }
-      entries.add(entry);
+      directives.add(directive);
       counter.increment();
     }
     prog.endStep(Phase.LOADING_FSIMAGE, step);

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CachePool.java Mon Dec  2 17:41:44 2013
@@ -26,9 +26,13 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.protocol.CacheDirective;
+import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
+import org.apache.hadoop.hdfs.protocol.CachePoolStats;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.IntrusiveCollection;
 
 import com.google.common.base.Preconditions;
 
@@ -69,6 +73,22 @@ public final class CachePool {
   
   private int weight;
 
+  public final static class DirectiveList
+      extends IntrusiveCollection<CacheDirective> {
+    private CachePool cachePool;
+
+    private DirectiveList(CachePool cachePool) {
+      this.cachePool = cachePool;
+    }
+
+    public CachePool getCachePool() {
+      return cachePool;
+    }
+  }
+
+  @Nonnull
+  private final DirectiveList directiveList = new DirectiveList(this);
+
   /**
    * Create a new cache pool based on a CachePoolInfo object and the defaults.
    * We will fill in information that was not supplied according to the
@@ -171,7 +191,7 @@ public final class CachePool {
    * @return
    *          Cache pool information.
    */
-  private CachePoolInfo getInfo(boolean fullInfo) {
+  CachePoolInfo getInfo(boolean fullInfo) {
     CachePoolInfo info = new CachePoolInfo(poolName);
     if (!fullInfo) {
       return info;
@@ -183,15 +203,28 @@ public final class CachePool {
   }
 
   /**
+   * Get statistics about this CachePool.
+   *
+   * @return   Cache pool statistics.
+   */
+  private CachePoolStats getStats() {
+    return new CachePoolStats.Builder().
+        setBytesNeeded(0).
+        setBytesCached(0).
+        setFilesAffected(0).
+        build();
+  }
+
+  /**
    * Returns a CachePoolInfo describing this CachePool based on the permissions
    * of the calling user. Unprivileged users will see only minimal descriptive
    * information about the pool.
    * 
    * @param pc Permission checker to be used to validate the user's permissions,
    *          or null
-   * @return CachePoolInfo describing this CachePool
+   * @return CachePoolEntry describing this CachePool
    */
-  public CachePoolInfo getInfo(FSPermissionChecker pc) {
+  public CachePoolEntry getEntry(FSPermissionChecker pc) {
     boolean hasPermission = true;
     if (pc != null) {
       try {
@@ -200,7 +233,8 @@ public final class CachePool {
         hasPermission = false;
       }
     }
-    return getInfo(hasPermission);
+    return new CachePoolEntry(getInfo(hasPermission), 
+        hasPermission ? getStats() : new CachePoolStats.Builder().build());
   }
 
   public String toString() {
@@ -212,4 +246,8 @@ public final class CachePool {
         append(", weight:").append(weight).
         append(" }").toString();
   }
+
+  public DirectiveList getDirectiveList() {
+    return directiveList;
+  }
 }

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogFileInputStream.java Mon Dec  2 17:41:44 2013
@@ -36,8 +36,11 @@ import org.apache.hadoop.hdfs.DFSConfigK
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.namenode.TransferFsImage.HttpGetFailedException;
+import org.apache.hadoop.hdfs.web.URLConnectionFactory;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
@@ -100,15 +103,22 @@ public class EditLogFileInputStream exte
   /**
    * Open an EditLogInputStream for the given URL.
    *
-   * @param url the url hosting the log
-   * @param startTxId the expected starting txid
-   * @param endTxId the expected ending txid
-   * @param inProgress whether the log is in-progress
+   * @param connectionFactory
+   *          the URLConnectionFactory used to create the connection.
+   * @param url
+   *          the url hosting the log
+   * @param startTxId
+   *          the expected starting txid
+   * @param endTxId
+   *          the expected ending txid
+   * @param inProgress
+   *          whether the log is in-progress
    * @return a stream from which edits may be read
    */
-  public static EditLogInputStream fromUrl(URL url, long startTxId,
-      long endTxId, boolean inProgress) {
-    return new EditLogFileInputStream(new URLLog(url),
+  public static EditLogInputStream fromUrl(
+      URLConnectionFactory connectionFactory, URL url, long startTxId,
+ long endTxId, boolean inProgress) {
+    return new EditLogFileInputStream(new URLLog(connectionFactory, url),
         startTxId, endTxId, inProgress);
   }
   
@@ -365,8 +375,12 @@ public class EditLogFileInputStream exte
     private long advertisedSize = -1;
 
     private final static String CONTENT_LENGTH = "Content-Length";
+    private final URLConnectionFactory connectionFactory;
+    private final boolean isSpnegoEnabled;
 
-    public URLLog(URL url) {
+    public URLLog(URLConnectionFactory connectionFactory, URL url) {
+      this.connectionFactory = connectionFactory;
+      this.isSpnegoEnabled = UserGroupInformation.isSecurityEnabled();
       this.url = url;
     }
 
@@ -376,8 +390,13 @@ public class EditLogFileInputStream exte
           new PrivilegedExceptionAction<InputStream>() {
             @Override
             public InputStream run() throws IOException {
-              HttpURLConnection connection = (HttpURLConnection)
-                  SecurityUtil.openSecureHttpConnection(url);
+              HttpURLConnection connection;
+              try {
+                connection = (HttpURLConnection)
+                    connectionFactory.openConnection(url, isSpnegoEnabled);
+              } catch (AuthenticationException e) {
+                throw new IOException(e);
+              }
               
               if (connection.getResponseCode() != HttpURLConnection.HTTP_OK) {
                 throw new HttpGetFailedException(

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogOutputStream.java Mon Dec  2 17:41:44 2013
@@ -24,7 +24,6 @@ import static org.apache.hadoop.util.Tim
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.jasper.compiler.JspUtil;
 
 /**
  * A generic abstract class to support journaling of edits logs into 
@@ -141,10 +140,10 @@ public abstract class EditLogOutputStrea
   }
 
   /**
-   * @return a short HTML snippet suitable for describing the current
+   * @return a short text snippet suitable for describing the current
    * status of the stream
    */
-  public String generateHtmlReport() {
-    return JspUtil.escapeXml(this.toString());
+  public String generateReport() {
+    return toString();
   }
 }

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java Mon Dec  2 17:41:44 2013
@@ -87,11 +87,15 @@ import com.google.common.base.Preconditi
  * 
  *************************************************/
 public class FSDirectory implements Closeable {
-  private static INodeDirectoryWithQuota createRoot(FSNamesystem namesystem) {
-    final INodeDirectoryWithQuota r = new INodeDirectoryWithQuota(
+  private static INodeDirectorySnapshottable createRoot(FSNamesystem namesystem) {
+    final INodeDirectory r = new INodeDirectory(
         INodeId.ROOT_INODE_ID,
         INodeDirectory.ROOT_NAME,
-        namesystem.createFsOwnerPermissions(new FsPermission((short) 0755)));
+        namesystem.createFsOwnerPermissions(new FsPermission((short) 0755)),
+        0L);
+    r.addDirectoryWithQuotaFeature(
+        DirectoryWithQuotaFeature.DEFAULT_NAMESPACE_QUOTA,
+        DirectoryWithQuotaFeature.DEFAULT_DISKSPACE_QUOTA);
     final INodeDirectorySnapshottable s = new INodeDirectorySnapshottable(r);
     s.setSnapshotQuota(0);
     return s;
@@ -107,7 +111,7 @@ public class FSDirectory implements Clos
   public final static String DOT_INODES_STRING = ".inodes";
   public final static byte[] DOT_INODES = 
       DFSUtil.string2Bytes(DOT_INODES_STRING);
-  INodeDirectoryWithQuota rootDir;
+  INodeDirectory rootDir;
   FSImage fsImage;  
   private final FSNamesystem namesystem;
   private volatile boolean ready = false;
@@ -202,7 +206,7 @@ public class FSDirectory implements Clos
   }
 
   /** @return the root directory inode. */
-  public INodeDirectoryWithQuota getRoot() {
+  public INodeDirectory getRoot() {
     return rootDir;
   }
 
@@ -452,8 +456,8 @@ public class FSDirectory implements Clos
   
   boolean unprotectedRemoveBlock(String path,
       INodeFile fileNode, Block block) throws IOException {
-    Preconditions.checkArgument(fileNode.isUnderConstruction());
     // modify file-> block and blocksMap
+    // fileNode should be under construction
     boolean removed = fileNode.removeLastBlock(block);
     if (!removed) {
       return false;
@@ -1800,9 +1804,8 @@ public class FSDirectory implements Clos
     final INode[] inodes = inodesInPath.getINodes();
     for(int i=0; i < numOfINodes; i++) {
       if (inodes[i].isQuotaSet()) { // a directory with quota
-        INodeDirectoryWithQuota node = (INodeDirectoryWithQuota) inodes[i]
-            .asDirectory(); 
-        node.addSpaceConsumed2Cache(nsDelta, dsDelta);
+        inodes[i].asDirectory().getDirectoryWithQuotaFeature()
+            .addSpaceConsumed2Cache(nsDelta, dsDelta);
       }
     }
   }
@@ -2035,10 +2038,11 @@ public class FSDirectory implements Clos
         // Stop checking for quota when common ancestor is reached
         return;
       }
-      if (inodes[i].isQuotaSet()) { // a directory with quota
+      final DirectoryWithQuotaFeature q
+          = inodes[i].asDirectory().getDirectoryWithQuotaFeature();
+      if (q != null) { // a directory with quota
         try {
-          ((INodeDirectoryWithQuota) inodes[i].asDirectory()).verifyQuota(
-              nsDelta, dsDelta);
+          q.verifyQuota(nsDelta, dsDelta);
         } catch (QuotaExceededException e) {
           e.setPathName(getFullPathName(inodes, i));
           throw e;
@@ -2385,35 +2389,14 @@ public class FSDirectory implements Clos
       if (dsQuota == HdfsConstants.QUOTA_DONT_SET) {
         dsQuota = oldDsQuota;
       }        
+      if (oldNsQuota == nsQuota && oldDsQuota == dsQuota) {
+        return null;
+      }
 
       final Snapshot latest = iip.getLatestSnapshot();
-      if (dirNode instanceof INodeDirectoryWithQuota) {
-        INodeDirectoryWithQuota quotaNode = (INodeDirectoryWithQuota) dirNode;
-        Quota.Counts counts = null;
-        if (!quotaNode.isQuotaSet()) {
-          // dirNode must be an INodeDirectoryWithSnapshot whose quota has not
-          // been set yet
-          counts = quotaNode.computeQuotaUsage();
-        }
-        // a directory with quota; so set the quota to the new value
-        quotaNode.setQuota(nsQuota, dsQuota);
-        if (quotaNode.isQuotaSet() && counts != null) {
-          quotaNode.setSpaceConsumed(counts.get(Quota.NAMESPACE),
-              counts.get(Quota.DISKSPACE));
-        } else if (!quotaNode.isQuotaSet() && latest == null) {
-          // do not replace the node if the node is a snapshottable directory
-          // without snapshots
-          if (!(quotaNode instanceof INodeDirectoryWithSnapshot)) {
-            // will not come here for root because root is snapshottable and
-            // root's nsQuota is always set
-            return quotaNode.replaceSelf4INodeDirectory(inodeMap);
-          }
-        }
-      } else {
-        // a non-quota directory; so replace it with a directory with quota
-        return dirNode.replaceSelf4Quota(latest, nsQuota, dsQuota, inodeMap);
-      }
-      return (oldNsQuota != nsQuota || oldDsQuota != dsQuota) ? dirNode : null;
+      dirNode = dirNode.recordModification(latest, inodeMap);
+      dirNode.setQuota(nsQuota, dsQuota);
+      return dirNode;
     }
   }
   
@@ -2442,7 +2425,8 @@ public class FSDirectory implements Clos
   long totalInodes() {
     readLock();
     try {
-      return rootDir.numItemsInTree();
+      return rootDir.getDirectoryWithQuotaFeature().getSpaceConsumed()
+          .get(Quota.NAMESPACE);
     } finally {
       readUnlock();
     }

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Mon Dec  2 17:41:44 2013
@@ -953,7 +953,11 @@ public class FSEditLog implements LogsPu
         .setSnapshotRoot(path);
     logEdit(op);
   }
-  
+
+  /**
+   * Log a CacheDirectiveInfo returned from
+   * {@link CacheManager#addDirective(CacheDirectiveInfo, FSPermissionChecker)}
+   */
   void logAddCacheDirectiveInfo(CacheDirectiveInfo directive,
       boolean toLogRpcIds) {
     AddCacheDirectiveInfoOp op =

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Mon Dec  2 17:41:44 2013
@@ -636,17 +636,17 @@ public class FSEditLogLoader {
       fsNamesys.setLastAllocatedBlockId(allocateBlockIdOp.blockId);
       break;
     }
-    case OP_ADD_PATH_BASED_CACHE_DIRECTIVE: {
+    case OP_ADD_CACHE_DIRECTIVE: {
       AddCacheDirectiveInfoOp addOp = (AddCacheDirectiveInfoOp) op;
       CacheDirectiveInfo result = fsNamesys.
-          getCacheManager().addDirective(addOp.directive, null);
+          getCacheManager().addDirectiveFromEditLog(addOp.directive);
       if (toAddRetryCache) {
         Long id = result.getId();
         fsNamesys.addCacheEntryWithPayload(op.rpcClientId, op.rpcCallId, id);
       }
       break;
     }
-    case OP_MODIFY_PATH_BASED_CACHE_DIRECTIVE: {
+    case OP_MODIFY_CACHE_DIRECTIVE: {
       ModifyCacheDirectiveInfoOp modifyOp =
           (ModifyCacheDirectiveInfoOp) op;
       fsNamesys.getCacheManager().modifyDirective(
@@ -656,7 +656,7 @@ public class FSEditLogLoader {
       }
       break;
     }
-    case OP_REMOVE_PATH_BASED_CACHE_DIRECTIVE: {
+    case OP_REMOVE_CACHE_DIRECTIVE: {
       RemoveCacheDirectiveInfoOp removeOp =
           (RemoveCacheDirectiveInfoOp) op;
       fsNamesys.getCacheManager().removeDirective(removeOp.id, null);

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java Mon Dec  2 17:41:44 2013
@@ -18,9 +18,8 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD_CACHE_DIRECTIVE;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD_CACHE_POOL;
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD_PATH_BASED_CACHE_DIRECTIVE;
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_MODIFY_PATH_BASED_CACHE_DIRECTIVE;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ALLOCATE_BLOCK_ID;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ALLOW_SNAPSHOT;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_CANCEL_DELEGATION_TOKEN;
@@ -35,10 +34,11 @@ import static org.apache.hadoop.hdfs.ser
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_GET_DELEGATION_TOKEN;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_INVALID;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_MKDIR;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_MODIFY_CACHE_DIRECTIVE;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_MODIFY_CACHE_POOL;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REASSIGN_LEASE;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REMOVE_CACHE_DIRECTIVE;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REMOVE_CACHE_POOL;
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REMOVE_PATH_BASED_CACHE_DIRECTIVE;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME_OLD;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME_SNAPSHOT;
@@ -64,6 +64,7 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Date;
 import java.util.EnumMap;
 import java.util.List;
 import java.util.zip.CheckedInputStream;
@@ -81,12 +82,12 @@ import org.apache.hadoop.fs.permission.P
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DeprecatedUTF8;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
-import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.util.XMLUtils;
 import org.apache.hadoop.hdfs.util.XMLUtils.InvalidXmlException;
@@ -109,7 +110,6 @@ import org.xml.sax.helpers.AttributesImp
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
 
 /**
  * Helper classes for reading the ops from an InputStream.
@@ -165,11 +165,11 @@ public abstract class FSEditLogOp {
       inst.put(OP_RENAME_SNAPSHOT, new RenameSnapshotOp());
       inst.put(OP_SET_GENSTAMP_V2, new SetGenstampV2Op());
       inst.put(OP_ALLOCATE_BLOCK_ID, new AllocateBlockIdOp());
-      inst.put(OP_ADD_PATH_BASED_CACHE_DIRECTIVE,
+      inst.put(OP_ADD_CACHE_DIRECTIVE,
           new AddCacheDirectiveInfoOp());
-      inst.put(OP_MODIFY_PATH_BASED_CACHE_DIRECTIVE,
+      inst.put(OP_MODIFY_CACHE_DIRECTIVE,
           new ModifyCacheDirectiveInfoOp());
-      inst.put(OP_REMOVE_PATH_BASED_CACHE_DIRECTIVE,
+      inst.put(OP_REMOVE_CACHE_DIRECTIVE,
           new RemoveCacheDirectiveInfoOp());
       inst.put(OP_ADD_CACHE_POOL, new AddCachePoolOp());
       inst.put(OP_MODIFY_CACHE_POOL, new ModifyCachePoolOp());
@@ -2874,12 +2874,12 @@ public abstract class FSEditLogOp {
     CacheDirectiveInfo directive;
 
     public AddCacheDirectiveInfoOp() {
-      super(OP_ADD_PATH_BASED_CACHE_DIRECTIVE);
+      super(OP_ADD_CACHE_DIRECTIVE);
     }
 
     static AddCacheDirectiveInfoOp getInstance(OpInstanceCache cache) {
       return (AddCacheDirectiveInfoOp) cache
-          .get(OP_ADD_PATH_BASED_CACHE_DIRECTIVE);
+          .get(OP_ADD_CACHE_DIRECTIVE);
     }
 
     public AddCacheDirectiveInfoOp setDirective(
@@ -2889,6 +2889,7 @@ public abstract class FSEditLogOp {
       assert(directive.getPath() != null);
       assert(directive.getReplication() != null);
       assert(directive.getPool() != null);
+      assert(directive.getExpiration() != null);
       return this;
     }
 
@@ -2898,11 +2899,13 @@ public abstract class FSEditLogOp {
       String path = FSImageSerialization.readString(in);
       short replication = FSImageSerialization.readShort(in);
       String pool = FSImageSerialization.readString(in);
+      long expiryTime = FSImageSerialization.readLong(in);
       directive = new CacheDirectiveInfo.Builder().
           setId(id).
           setPath(new Path(path)).
           setReplication(replication).
           setPool(pool).
+          setExpiration(CacheDirectiveInfo.Expiration.newAbsolute(expiryTime)).
           build();
       readRpcIds(in, logVersion);
     }
@@ -2913,6 +2916,8 @@ public abstract class FSEditLogOp {
       FSImageSerialization.writeString(directive.getPath().toUri().getPath(), out);
       FSImageSerialization.writeShort(directive.getReplication(), out);
       FSImageSerialization.writeString(directive.getPool(), out);
+      FSImageSerialization.writeLong(
+          directive.getExpiration().getMillis(), out);
       writeRpcIds(rpcClientId, rpcCallId, out);
     }
 
@@ -2925,6 +2930,8 @@ public abstract class FSEditLogOp {
       XMLUtils.addSaxString(contentHandler, "REPLICATION",
           Short.toString(directive.getReplication()));
       XMLUtils.addSaxString(contentHandler, "POOL", directive.getPool());
+      XMLUtils.addSaxString(contentHandler, "EXPIRATION",
+          "" + directive.getExpiration().getMillis());
       appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
     }
 
@@ -2935,6 +2942,8 @@ public abstract class FSEditLogOp {
           setPath(new Path(st.getValue("PATH"))).
           setReplication(Short.parseShort(st.getValue("REPLICATION"))).
           setPool(st.getValue("POOL")).
+          setExpiration(CacheDirectiveInfo.Expiration.newAbsolute(
+              Long.parseLong(st.getValue("EXPIRATION")))).
           build();
       readRpcIdsFromXml(st);
     }
@@ -2946,7 +2955,8 @@ public abstract class FSEditLogOp {
       builder.append("id=" + directive.getId() + ",");
       builder.append("path=" + directive.getPath().toUri().getPath() + ",");
       builder.append("replication=" + directive.getReplication() + ",");
-      builder.append("pool=" + directive.getPool());
+      builder.append("pool=" + directive.getPool() + ",");
+      builder.append("expiration=" + directive.getExpiration().getMillis());
       appendRpcIdsToString(builder, rpcClientId, rpcCallId);
       builder.append("]");
       return builder.toString();
@@ -2961,12 +2971,12 @@ public abstract class FSEditLogOp {
     CacheDirectiveInfo directive;
 
     public ModifyCacheDirectiveInfoOp() {
-      super(OP_MODIFY_PATH_BASED_CACHE_DIRECTIVE);
+      super(OP_MODIFY_CACHE_DIRECTIVE);
     }
 
     static ModifyCacheDirectiveInfoOp getInstance(OpInstanceCache cache) {
       return (ModifyCacheDirectiveInfoOp) cache
-          .get(OP_MODIFY_PATH_BASED_CACHE_DIRECTIVE);
+          .get(OP_MODIFY_CACHE_DIRECTIVE);
     }
 
     public ModifyCacheDirectiveInfoOp setDirective(
@@ -2991,7 +3001,12 @@ public abstract class FSEditLogOp {
       if ((flags & 0x4) != 0) {
         builder.setPool(FSImageSerialization.readString(in));
       }
-      if ((flags & ~0x7) != 0) {
+      if ((flags & 0x8) != 0) {
+        builder.setExpiration(
+            CacheDirectiveInfo.Expiration.newAbsolute(
+                FSImageSerialization.readLong(in)));
+      }
+      if ((flags & ~0xF) != 0) {
         throw new IOException("unknown flags set in " +
             "ModifyCacheDirectiveInfoOp: " + flags);
       }
@@ -3005,7 +3020,8 @@ public abstract class FSEditLogOp {
       byte flags = (byte)(
           ((directive.getPath() != null) ? 0x1 : 0) |
           ((directive.getReplication() != null) ? 0x2 : 0) |
-          ((directive.getPool() != null) ? 0x4 : 0)
+          ((directive.getPool() != null) ? 0x4 : 0) |
+          ((directive.getExpiration() != null) ? 0x8 : 0)
         );
       out.writeByte(flags);
       if (directive.getPath() != null) {
@@ -3018,6 +3034,10 @@ public abstract class FSEditLogOp {
       if (directive.getPool() != null) {
         FSImageSerialization.writeString(directive.getPool(), out);
       }
+      if (directive.getExpiration() != null) {
+        FSImageSerialization.writeLong(directive.getExpiration().getMillis(),
+            out);
+      }
       writeRpcIds(rpcClientId, rpcCallId, out);
     }
 
@@ -3036,6 +3056,10 @@ public abstract class FSEditLogOp {
       if (directive.getPool() != null) {
         XMLUtils.addSaxString(contentHandler, "POOL", directive.getPool());
       }
+      if (directive.getExpiration() != null) {
+        XMLUtils.addSaxString(contentHandler, "EXPIRATION",
+            "" + directive.getExpiration().getMillis());
+      }
       appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
     }
 
@@ -3056,6 +3080,11 @@ public abstract class FSEditLogOp {
       if (pool != null) {
         builder.setPool(pool);
       }
+      String expiryTime = st.getValueOrNull("EXPIRATION");
+      if (expiryTime != null) {
+        builder.setExpiration(CacheDirectiveInfo.Expiration.newAbsolute(
+            Long.parseLong(expiryTime)));
+      }
       this.directive = builder.build();
       readRpcIdsFromXml(st);
     }
@@ -3075,6 +3104,10 @@ public abstract class FSEditLogOp {
       if (directive.getPool() != null) {
         builder.append(",").append("pool=").append(directive.getPool());
       }
+      if (directive.getExpiration() != null) {
+        builder.append(",").append("expiration=").
+            append(directive.getExpiration().getMillis());
+      }
       appendRpcIdsToString(builder, rpcClientId, rpcCallId);
       builder.append("]");
       return builder.toString();
@@ -3089,12 +3122,12 @@ public abstract class FSEditLogOp {
     long id;
 
     public RemoveCacheDirectiveInfoOp() {
-      super(OP_REMOVE_PATH_BASED_CACHE_DIRECTIVE);
+      super(OP_REMOVE_CACHE_DIRECTIVE);
     }
 
     static RemoveCacheDirectiveInfoOp getInstance(OpInstanceCache cache) {
       return (RemoveCacheDirectiveInfoOp) cache
-          .get(OP_REMOVE_PATH_BASED_CACHE_DIRECTIVE);
+          .get(OP_REMOVE_CACHE_DIRECTIVE);
     }
 
     public RemoveCacheDirectiveInfoOp setId(long id) {
@@ -3162,7 +3195,7 @@ public abstract class FSEditLogOp {
 
     @Override
     public void writeFields(DataOutputStream out) throws IOException {
-      info .writeTo(out);
+      info.writeTo(out);
       writeRpcIds(rpcClientId, rpcCallId, out);
     }
 

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java Mon Dec  2 17:41:44 2013
@@ -64,12 +64,12 @@ public enum FSEditLogOpCodes {
   OP_DISALLOW_SNAPSHOT          ((byte) 30),
   OP_SET_GENSTAMP_V2            ((byte) 31),
   OP_ALLOCATE_BLOCK_ID          ((byte) 32),
-  OP_ADD_PATH_BASED_CACHE_DIRECTIVE       ((byte) 33),
-  OP_REMOVE_PATH_BASED_CACHE_DIRECTIVE    ((byte) 34),
+  OP_ADD_CACHE_DIRECTIVE       ((byte) 33),
+  OP_REMOVE_CACHE_DIRECTIVE    ((byte) 34),
   OP_ADD_CACHE_POOL                       ((byte) 35),
   OP_MODIFY_CACHE_POOL                    ((byte) 36),
   OP_REMOVE_CACHE_POOL                    ((byte) 37),
-  OP_MODIFY_PATH_BASED_CACHE_DIRECTIVE    ((byte) 38);
+  OP_MODIFY_CACHE_DIRECTIVE    ((byte) 38);
 
   private byte opCode;
 

Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java?rev=1547122&r1=1547121&r2=1547122&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java Mon Dec  2 17:41:44 2013
@@ -755,7 +755,7 @@ public class FSImage implements Closeabl
    * This is an update of existing state of the filesystem and does not
    * throw QuotaExceededException.
    */
-  static void updateCountForQuota(INodeDirectoryWithQuota root) {
+  static void updateCountForQuota(INodeDirectory root) {
     updateCountForQuotaRecursively(root, Quota.Counts.newInstance());
   }
   
@@ -795,7 +795,7 @@ public class FSImage implements Closeabl
             + " quota = " + dsQuota + " < consumed = " + diskspace);
       }
 
-      ((INodeDirectoryWithQuota)dir).setSpaceConsumed(namespace, diskspace);
+      dir.getDirectoryWithQuotaFeature().setSpaceConsumed(namespace, diskspace);
     }
   }