Posted to hdfs-commits@hadoop.apache.org by wa...@apache.org on 2013/11/28 08:20:22 UTC

svn commit: r1546301 [1/2] - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/protocol/ src/main/java/org/apache/hadoop/hdfs/protocolPB/ src/main/java/org/apache/hado...

Author: wang
Date: Thu Nov 28 07:20:21 2013
New Revision: 1546301

URL: http://svn.apache.org/r1546301
Log:
HDFS-5430. Support TTL on CacheDirectives. Contributed by Andrew Wang.

Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirective.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirectiveInfo.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirectiveStats.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/CacheAdmin.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored.xml
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/testCacheAdminConf.xml

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1546301&r1=1546300&r2=1546301&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Thu Nov 28 07:20:21 2013
@@ -226,6 +226,8 @@ Trunk (Unreleased)
 
     HDFS-5537. Remove FileWithSnapshot interface.  (jing9 via szetszwo)
 
+    HDFS-5430. Support TTL on CacheDirectives. (wang)
+
   OPTIMIZATIONS
     HDFS-5349. DNA_CACHE and DNA_UNCACHE should be by blockId only. (cmccabe)
 

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java?rev=1546301&r1=1546300&r2=1546301&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java Thu Nov 28 07:20:21 2013
@@ -38,12 +38,15 @@ import java.net.InetSocketAddress;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.security.SecureRandom;
+import java.text.SimpleDateFormat;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.Date;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Locale;
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
@@ -1426,4 +1429,64 @@ public class DFSUtil {
             sslConf.get("ssl.server.truststore.password"),
             sslConf.get("ssl.server.truststore.type", "jks"));
   }
-}
\ No newline at end of file
+
+  /**
+   * Converts a Date into an ISO-8601 formatted datetime string.
+   */
+  public static String dateToIso8601String(Date date) {
+    SimpleDateFormat df =
+        new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZ", Locale.ENGLISH);
+    return df.format(date);
+  }
+
+  /**
+   * Converts a time duration in milliseconds into DDD:HH:MM:SS format.
+   */
+  public static String durationToString(long durationMs) {
+    Preconditions.checkArgument(durationMs >= 0, "Invalid negative duration");
+    // Chop off the milliseconds
+    long durationSec = durationMs / 1000;
+    final int secondsPerMinute = 60;
+    final int secondsPerHour = 60*60;
+    final int secondsPerDay = 60*60*24;
+    final long days = durationSec / secondsPerDay;
+    durationSec -= days * secondsPerDay;
+    final long hours = durationSec / secondsPerHour;
+    durationSec -= hours * secondsPerHour;
+    final long minutes = durationSec / secondsPerMinute;
+    durationSec -= minutes * secondsPerMinute;
+    final long seconds = durationSec;
+    return String.format("%03d:%02d:%02d:%02d", days, hours, minutes, seconds);
+  }
+
+  /**
+   * Converts a relative time string into a duration in milliseconds.
+   */
+  public static long parseRelativeTime(String relTime) throws IOException {
+    if (relTime.length() < 2) {
+      throw new IOException("Unable to parse relative time value of " + relTime
+          + ": too short");
+    }
+    String ttlString = relTime.substring(0, relTime.length()-1);
+    long ttl;
+    try {
+      ttl = Long.parseLong(ttlString);
+    } catch (NumberFormatException e) {
+      throw new IOException("Unable to parse relative time value of " + relTime
+          + ": " + ttlString + " is not a number");
+    }
+    if (relTime.endsWith("s")) {
+      // pass
+    } else if (relTime.endsWith("m")) {
+      ttl *= 60;
+    } else if (relTime.endsWith("h")) {
+      ttl *= 60*60;
+    } else if (relTime.endsWith("d")) {
+      ttl *= 60*60*24;
+    } else {
+      throw new IOException("Unable to parse relative time value of " + relTime
+          + ": unknown time unit " + relTime.charAt(relTime.length() - 1));
+    }
+    return ttl*1000;
+  }
+}
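
A quick sketch of how the new DFSUtil helpers compose (values worked out
from the code above; the snippet itself is illustrative, not part of the
patch):

    // "30m" -> 30 minutes -> 1,800,000 ms
    long ttlMs = DFSUtil.parseRelativeTime("30m");        // 1800000
    // 90,061,000 ms = 1 day, 1 hour, 1 minute, 1 second
    String pretty = DFSUtil.durationToString(90061000L);  // "001:01:01:01"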

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirective.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirective.java?rev=1546301&r1=1546300&r2=1546301&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirective.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirective.java Thu Nov 28 07:20:21 2013
@@ -17,9 +17,14 @@
  */
 package org.apache.hadoop.hdfs.protocol;
 
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Date;
+
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.server.namenode.CachePool;
 import org.apache.hadoop.util.IntrusiveCollection;
 import org.apache.hadoop.util.IntrusiveCollection.Element;
@@ -27,7 +32,7 @@ import org.apache.hadoop.util.IntrusiveC
 import com.google.common.base.Preconditions;
 
 /**
- * Represents an entry in the PathBasedCache on the NameNode.
+ * Namenode class that tracks state related to a cached path.
  *
  * This is an implementation class, not part of the public API.
  */
@@ -37,6 +42,8 @@ public final class CacheDirective implem
   private final String path;
   private final short replication;
   private CachePool pool;
+  private final long expiryTime;
+
   private long bytesNeeded;
   private long bytesCached;
   private long filesAffected;
@@ -44,13 +51,13 @@ public final class CacheDirective implem
   private Element next;
 
   public CacheDirective(long id, String path,
-      short replication) {
+      short replication, long expiryTime) {
     Preconditions.checkArgument(id > 0);
     this.id = id;
+    this.path = checkNotNull(path);
     Preconditions.checkArgument(replication > 0);
-    this.path = path;
     this.replication = replication;
-    Preconditions.checkNotNull(path);
+    this.expiryTime = expiryTime;
     this.bytesNeeded = 0;
     this.bytesCached = 0;
     this.filesAffected = 0;
@@ -64,20 +71,40 @@ public final class CacheDirective implem
     return path;
   }
 
-  public CachePool getPool() {
-    return pool;
-  }
-
   public short getReplication() {
     return replication;
   }
 
+  public CachePool getPool() {
+    return pool;
+  }
+
+  /**
+   * @return When this directive expires, in milliseconds since Unix epoch
+   */
+  public long getExpiryTime() {
+    return expiryTime;
+  }
+
+  /**
+   * @return When this directive expires, as an ISO-8601 formatted string.
+   */
+  public String getExpiryTimeString() {
+    return DFSUtil.dateToIso8601String(new Date(expiryTime));
+  }
+
+  /**
+   * Returns a {@link CacheDirectiveInfo} based on this CacheDirective.
+   * <p>
+   * This always sets an absolute expiry time, never a relative TTL.
+   */
   public CacheDirectiveInfo toInfo() {
     return new CacheDirectiveInfo.Builder().
         setId(id).
         setPath(new Path(path)).
         setReplication(replication).
         setPool(pool.getPoolName()).
+        setExpiration(CacheDirectiveInfo.Expiration.newAbsolute(expiryTime)).
         build();
   }
 
@@ -86,6 +113,7 @@ public final class CacheDirective implem
         setBytesNeeded(bytesNeeded).
         setBytesCached(bytesCached).
         setFilesAffected(filesAffected).
+        setHasExpired(new Date().getTime() > expiryTime).
         build();
   }
 
@@ -100,6 +128,7 @@ public final class CacheDirective implem
       append(", path:").append(path).
       append(", replication:").append(replication).
       append(", pool:").append(pool).
+      append(", expiryTime: ").append(getExpiryTimeString()).
       append(", bytesNeeded:").append(bytesNeeded).
       append(", bytesCached:").append(bytesCached).
       append(", filesAffected:").append(filesAffected).

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirectiveInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirectiveInfo.java?rev=1546301&r1=1546300&r2=1546301&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirectiveInfo.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirectiveInfo.java Thu Nov 28 07:20:21 2013
@@ -17,11 +17,14 @@
  */
 package org.apache.hadoop.hdfs.protocol;
 
+import java.util.Date;
+
 import org.apache.commons.lang.builder.EqualsBuilder;
 import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSUtil;
 
 /**
  * Describes a path-based cache directive.
@@ -37,6 +40,7 @@ public class CacheDirectiveInfo {
     private Path path;
     private Short replication;
     private String pool;
+    private Expiration expiration;
 
     /**
      * Builds a new CacheDirectiveInfo populated with the set properties.
@@ -44,7 +48,7 @@ public class CacheDirectiveInfo {
      * @return New CacheDirectiveInfo.
      */
     public CacheDirectiveInfo build() {
-      return new CacheDirectiveInfo(id, path, replication, pool);
+      return new CacheDirectiveInfo(id, path, replication, pool, expiration);
     }
 
     /**
@@ -62,6 +66,7 @@ public class CacheDirectiveInfo {
       this.path = directive.getPath();
       this.replication = directive.getReplication();
       this.pool = directive.getPool();
+      this.expiration = directive.getExpiration();
     }
 
     /**
@@ -107,18 +112,134 @@ public class CacheDirectiveInfo {
       this.pool = pool;
       return this;
     }
+
+    /**
+     * Sets when the CacheDirective should expire. A
+     * {@link CacheDirectiveInfo.Expiration} can specify either an absolute or
+     * relative expiration time.
+     * 
+     * @param expiration when this CacheDirective should expire
+     * @return This builder, for call chaining
+     */
+    public Builder setExpiration(Expiration expiration) {
+      this.expiration = expiration;
+      return this;
+    }
+  }
+
+  /**
+   * Denotes a relative or absolute expiration time for a CacheDirective. Use
+   * factory methods {@link CacheDirectiveInfo.Expiration#newAbsolute(Date)} and
+   * {@link CacheDirectiveInfo.Expiration#newRelative(long)} to create an
+   * Expiration.
+   * <p>
+   * In either case, the server-side clock is used to determine when a
+   * CacheDirective expires.
+   */
+  public static class Expiration {
+
+    /** Denotes a CacheDirectiveInfo that never expires. */
+    public static final int EXPIRY_NEVER = -1;
+
+    /**
+     * Create a new relative Expiration.
+     * 
+     * @param ms how long until the CacheDirective expires, in milliseconds
+     * @return A relative Expiration
+     */
+    public static Expiration newRelative(long ms) {
+      return new Expiration(ms, true);
+    }
+
+    /**
+     * Create a new absolute Expiration.
+     * 
+     * @param date when the CacheDirective expires
+     * @return An absolute Expiration
+     */
+    public static Expiration newAbsolute(Date date) {
+      return new Expiration(date.getTime(), false);
+    }
+
+    /**
+     * Create a new absolute Expiration.
+     * 
+     * @param ms when the CacheDirective expires, in milliseconds since the Unix
+     *          epoch.
+     * @return An absolute Expiration
+     */
+    public static Expiration newAbsolute(long ms) {
+      return new Expiration(ms, false);
+    }
+
+    private final long ms;
+    private final boolean isRelative;
+
+    private Expiration(long ms, boolean isRelative) {
+      this.ms = ms;
+      this.isRelative = isRelative;
+    }
+
+    /**
+     * @return true if Expiration was specified as a relative duration, false if
+     *         specified as an absolute time.
+     */
+    public boolean isRelative() {
+      return isRelative;
+    }
+
+    /**
+     * @return The raw underlying millisecond value, either a relative duration
+     *         or an absolute time as milliseconds since the Unix epoch.
+     */
+    public long getMillis() {
+      return ms;
+    }
+
+    /**
+     * @return Expiration time as a {@link Date} object. This converts a
+     *         relative Expiration into an absolute Date based on the local
+     *         clock.
+     */
+    public Date getAbsoluteDate() {
+      return new Date(getAbsoluteMillis());
+    }
+
+    /**
+     * @return Expiration time in milliseconds from the Unix epoch. This
+     *         converts a relative Expiration into an absolute time based on the
+     *         local clock.
+     */
+    public long getAbsoluteMillis() {
+      if (!isRelative) {
+        return ms;
+      } else {
+        return new Date().getTime() + ms;
+      }
+    }
+
+    @Override
+    public String toString() {
+      if (isRelative) {
+        return DFSUtil.durationToString(ms);
+      }
+      return DFSUtil.dateToIso8601String(new Date(ms));
+    }
   }
 
   private final Long id;
   private final Path path;
   private final Short replication;
   private final String pool;
+  private final Expiration expiration;
 
-  CacheDirectiveInfo(Long id, Path path, Short replication, String pool) {
+  CacheDirectiveInfo(Long id, Path path, Short replication, String pool,
+      Expiration expiration) {
     this.id = id;
     this.path = path;
     this.replication = replication;
     this.pool = pool;
+    this.expiration = expiration;
   }
 
   /**
@@ -148,7 +269,14 @@ public class CacheDirectiveInfo {
   public String getPool() {
     return pool;
   }
-  
+
+  /**
+   * @return When this directive expires.
+   */
+  public Expiration getExpiration() {
+    return expiration;
+  }
+
   @Override
   public boolean equals(Object o) {
     if (o == null) {
@@ -162,6 +290,7 @@ public class CacheDirectiveInfo {
         append(getPath(), other.getPath()).
         append(getReplication(), other.getReplication()).
         append(getPool(), other.getPool()).
+        append(getExpiration(), other.getExpiration()).
         isEquals();
   }
 
@@ -171,6 +300,7 @@ public class CacheDirectiveInfo {
         append(path).
         append(replication).
         append(pool).
+        append(expiration).
         hashCode();
   }
 
@@ -181,19 +311,23 @@ public class CacheDirectiveInfo {
     String prefix = "";
     if (id != null) {
       builder.append(prefix).append("id: ").append(id);
-      prefix = ",";
+      prefix = ", ";
     }
     if (path != null) {
       builder.append(prefix).append("path: ").append(path);
-      prefix = ",";
+      prefix = ", ";
     }
     if (replication != null) {
       builder.append(prefix).append("replication: ").append(replication);
-      prefix = ",";
+      prefix = ", ";
     }
     if (pool != null) {
       builder.append(prefix).append("pool: ").append(pool);
-      prefix = ",";
+      prefix = ", ";
+    }
+    if (expiration != null) {
+      builder.append(prefix).append("expiration: ").append(expiration);
+      prefix = ", ";
     }
     builder.append("}");
     return builder.toString();
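
Putting the new Expiration API together: a minimal sketch of building a
directive with a relative TTL, using only the Builder and factory methods
defined above (path, replication, and pool values are illustrative):

    // Expires 30 minutes after the server-side clock evaluates it
    CacheDirectiveInfo info = new CacheDirectiveInfo.Builder().
        setPath(new Path("/cached/data")).
        setReplication((short) 2).
        setPool("pool1").
        setExpiration(CacheDirectiveInfo.Expiration.newRelative(30 * 60 * 1000)).
        build();
    // toString() renders a relative Expiration as DDD:HH:MM:SS ("000:00:30:00")
    // and an absolute one as an ISO-8601 datetime string.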

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirectiveStats.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirectiveStats.java?rev=1546301&r1=1546300&r2=1546301&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirectiveStats.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirectiveStats.java Thu Nov 28 07:20:21 2013
@@ -30,6 +30,7 @@ public class CacheDirectiveStats {
     private long bytesNeeded;
     private long bytesCached;
     private long filesAffected;
+    private boolean hasExpired;
 
     /**
      * Builds a new CacheDirectiveStats populated with the set properties.
@@ -37,7 +38,8 @@ public class CacheDirectiveStats {
      * @return New CacheDirectiveStats.
      */
     public CacheDirectiveStats build() {
-      return new CacheDirectiveStats(bytesNeeded, bytesCached, filesAffected);
+      return new CacheDirectiveStats(bytesNeeded, bytesCached, filesAffected,
+          hasExpired);
     }
 
     /**
@@ -52,7 +54,7 @@ public class CacheDirectiveStats {
      * @param bytesNeeded The bytes needed.
      * @return This builder, for call chaining.
      */
-    public Builder setBytesNeeded(Long bytesNeeded) {
+    public Builder setBytesNeeded(long bytesNeeded) {
       this.bytesNeeded = bytesNeeded;
       return this;
     }
@@ -63,7 +65,7 @@ public class CacheDirectiveStats {
      * @param bytesCached The bytes cached.
      * @return This builder, for call chaining.
      */
-    public Builder setBytesCached(Long bytesCached) {
+    public Builder setBytesCached(long bytesCached) {
       this.bytesCached = bytesCached;
       return this;
     }
@@ -74,21 +76,34 @@ public class CacheDirectiveStats {
      * @param filesAffected The files affected.
      * @return This builder, for call chaining.
      */
-    public Builder setFilesAffected(Long filesAffected) {
+    public Builder setFilesAffected(long filesAffected) {
       this.filesAffected = filesAffected;
       return this;
     }
+
+    /**
+     * Sets whether this directive has expired.
+     * 
+     * @param hasExpired if this directive has expired
+     * @return This builder, for call chaining.
+     */
+    public Builder setHasExpired(boolean hasExpired) {
+      this.hasExpired = hasExpired;
+      return this;
+    }
   }
 
   private final long bytesNeeded;
   private final long bytesCached;
   private final long filesAffected;
+  private final boolean hasExpired;
 
   private CacheDirectiveStats(long bytesNeeded, long bytesCached,
-      long filesAffected) {
+      long filesAffected, boolean hasExpired) {
     this.bytesNeeded = bytesNeeded;
     this.bytesCached = bytesCached;
     this.filesAffected = filesAffected;
+    this.hasExpired = hasExpired;
   }
 
   /**
@@ -112,6 +127,13 @@ public class CacheDirectiveStats {
     return filesAffected;
   }
 
+  /**
+   * @return Whether this directive has expired.
+   */
+  public boolean hasExpired() {
+    return hasExpired;
+  }
+
   @Override
   public String toString() {
     StringBuilder builder = new StringBuilder();
@@ -119,6 +141,7 @@ public class CacheDirectiveStats {
     builder.append("bytesNeeded: ").append(bytesNeeded);
     builder.append(", ").append("bytesCached: ").append(bytesCached);
     builder.append(", ").append("filesAffected: ").append(filesAffected);
+    builder.append(", ").append("hasExpired: ").append(hasExpired);
     builder.append("}");
     return builder.toString();
   }

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java?rev=1546301&r1=1546300&r2=1546301&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java Thu Nov 28 07:20:21 2013
@@ -61,6 +61,7 @@ import org.apache.hadoop.hdfs.protocol.L
 import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveEntryProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoExpirationProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveStatsProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolEntryProto;
 import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolInfoProto;
@@ -1591,6 +1592,9 @@ public class PBHelper {
     if (info.getPool() != null) {
       builder.setPool(info.getPool());
     }
+    if (info.getExpiration() != null) {
+      builder.setExpiration(convert(info.getExpiration()));
+    }
     return builder.build();
   }
 
@@ -1611,15 +1615,35 @@ public class PBHelper {
     if (proto.hasPool()) {
       builder.setPool(proto.getPool());
     }
+    if (proto.hasExpiration()) {
+      builder.setExpiration(convert(proto.getExpiration()));
+    }
     return builder.build();
   }
-  
+
+  public static CacheDirectiveInfoExpirationProto convert(
+      CacheDirectiveInfo.Expiration expiration) {
+    return CacheDirectiveInfoExpirationProto.newBuilder()
+        .setIsRelative(expiration.isRelative())
+        .setMillis(expiration.getMillis())
+        .build();
+  }
+
+  public static CacheDirectiveInfo.Expiration convert(
+      CacheDirectiveInfoExpirationProto proto) {
+    if (proto.getIsRelative()) {
+      return CacheDirectiveInfo.Expiration.newRelative(proto.getMillis());
+    }
+    return CacheDirectiveInfo.Expiration.newAbsolute(proto.getMillis());
+  }
+
   public static CacheDirectiveStatsProto convert(CacheDirectiveStats stats) {
     CacheDirectiveStatsProto.Builder builder = 
         CacheDirectiveStatsProto.newBuilder();
     builder.setBytesNeeded(stats.getBytesNeeded());
     builder.setBytesCached(stats.getBytesCached());
     builder.setFilesAffected(stats.getFilesAffected());
+    builder.setHasExpired(stats.hasExpired());
     return builder.build();
   }
   
@@ -1628,6 +1652,7 @@ public class PBHelper {
     builder.setBytesNeeded(proto.getBytesNeeded());
     builder.setBytesCached(proto.getBytesCached());
     builder.setFilesAffected(proto.getFilesAffected());
+    builder.setHasExpired(proto.getHasExpired());
     return builder.build();
   }
 

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java?rev=1546301&r1=1546300&r2=1546301&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java Thu Nov 28 07:20:21 2013
@@ -22,6 +22,7 @@ import static org.apache.hadoop.util.Exi
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.Collection;
+import java.util.Date;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
@@ -211,12 +212,24 @@ public class CacheReplicationMonitor ext
    */
   private void rescanCacheDirectives() {
     FSDirectory fsDir = namesystem.getFSDirectory();
-    for (CacheDirective pce : cacheManager.getEntriesById().values()) {
+    final long now = new Date().getTime();
+    for (CacheDirective directive : cacheManager.getEntriesById().values()) {
+      // Reset the directive
+      directive.clearBytesNeeded();
+      directive.clearBytesCached();
+      directive.clearFilesAffected();
+      // Skip processing this entry if it has expired
+      if (directive.getExpiryTime() > 0 && directive.getExpiryTime() <= now) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Skipping directive id " + directive.getId()
+              + " because it has expired (" + directive.getExpiryTime()
+              + " <= " + now + ")");
+        }
+        continue;
+      }
       scannedDirectives++;
-      pce.clearBytesNeeded();
-      pce.clearBytesCached();
-      pce.clearFilesAffected();
-      String path = pce.getPath();
+      String path = directive.getPath();
       INode node;
       try {
         node = fsDir.getINode(path);
@@ -233,11 +246,11 @@ public class CacheReplicationMonitor ext
         ReadOnlyList<INode> children = dir.getChildrenList(null);
         for (INode child : children) {
           if (child.isFile()) {
-            rescanFile(pce, child.asFile());
+            rescanFile(directive, child.asFile());
           }
         }
       } else if (node.isFile()) {
-        rescanFile(pce, node.asFile());
+        rescanFile(directive, node.asFile());
       } else {
         if (LOG.isDebugEnabled()) {
           LOG.debug("Ignoring non-directory, non-file inode " + node +
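
Note how the skip predicate above interacts with the EXPIRY_NEVER sentinel
(-1): a never-expiring directive fails the expiryTime > 0 clause, so it is
always rescanned. Condensed (a sketch, not patch code):

    // expiryTime == EXPIRY_NEVER (-1) never satisfies the first clause,
    // so such directives are never skipped
    boolean expired = directive.getExpiryTime() > 0
        && directive.getExpiryTime() <= now;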

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java?rev=1546301&r1=1546300&r2=1546301&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java Thu Nov 28 07:20:21 2013
@@ -17,12 +17,12 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS_DEFAULT;
 
@@ -43,18 +43,18 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.InvalidRequestException;
 import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
+import org.apache.hadoop.fs.InvalidRequestException;
 import org.apache.hadoop.fs.permission.FsAction;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.CacheDirective;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
 import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
-import org.apache.hadoop.hdfs.protocol.CacheDirective;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
 import org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor;
@@ -249,7 +249,7 @@ public final class CacheManager {
     return cachedBlocks;
   }
 
-  private long getNextEntryId() throws IOException {
+  private long getNextDirectiveId() throws IOException {
     assert namesystem.hasWriteLock();
     if (nextDirectiveId >= Long.MAX_VALUE - 1) {
       throw new IOException("No more available IDs.");
@@ -302,6 +302,34 @@ public final class CacheManager {
   }
 
   /**
+   * Calculates the absolute expiry time of the directive from the
+   * {@link CacheDirectiveInfo.Expiration}. This converts a relative Expiration
+   * into an absolute time based on the local clock.
+   * 
+   * @param directive from which to get the expiry time
+   * @param defaultValue to use if Expiration is not set
+   * @return Absolute expiry time in milliseconds since Unix epoch
+   * @throws InvalidRequestException if the Expiration is invalid
+   */
+  private static long validateExpiryTime(CacheDirectiveInfo directive,
+      long defaultValue) throws InvalidRequestException {
+    long expiryTime;
+    CacheDirectiveInfo.Expiration expiration = directive.getExpiration();
+    if (expiration != null) {
+      if (expiration.getMillis() < 0) {
+        throw new InvalidRequestException("Cannot set a negative expiration: "
+            + expiration.getMillis());
+      }
+      // Converts a relative duration into an absolute time based on the local
+      // clock
+      expiryTime = expiration.getAbsoluteMillis();
+    } else {
+      expiryTime = defaultValue;
+    }
+    return expiryTime;
+  }
+
+  /**
    * Get a CacheDirective by ID, validating the ID and that the directive
    * exists.
    */
@@ -346,6 +374,26 @@ public final class CacheManager {
     directives.add(directive);
   }
 
+  /**
+   * To be called only from the edit log loading code
+   */
+  CacheDirectiveInfo addDirectiveFromEditLog(CacheDirectiveInfo directive)
+      throws InvalidRequestException {
+    long id = directive.getId();
+    CacheDirective entry =
+        new CacheDirective(
+            directive.getId(),
+            directive.getPath().toUri().getPath(),
+            directive.getReplication(),
+            directive.getExpiration().getAbsoluteMillis());
+    CachePool pool = cachePools.get(directive.getPool());
+    addInternal(entry, pool);
+    if (nextDirectiveId <= id) {
+      nextDirectiveId = id + 1;
+    }
+    return entry.toInfo();
+  }
+
   public CacheDirectiveInfo addDirective(
       CacheDirectiveInfo info, FSPermissionChecker pc)
       throws IOException {
@@ -356,27 +404,12 @@ public final class CacheManager {
       checkWritePermission(pc, pool);
       String path = validatePath(info);
       short replication = validateReplication(info, (short)1);
-      long id;
-      if (info.getId() != null) {
-        // We are loading a directive from the edit log.
-        // Use the ID from the edit log.
-        id = info.getId();
-        if (id <= 0) {
-          throw new InvalidRequestException("can't add an ID " +
-              "of " + id + ": it is not positive.");
-        }
-        if (id >= Long.MAX_VALUE) {
-          throw new InvalidRequestException("can't add an ID " +
-              "of " + id + ": it is too big.");
-        }
-        if (nextDirectiveId <= id) {
-          nextDirectiveId = id + 1;
-        }
-      } else {
-        // Add a new directive with the next available ID.
-        id = getNextEntryId();
-      }
-      directive = new CacheDirective(id, path, replication);
+      long expiryTime = validateExpiryTime(info,
+          CacheDirectiveInfo.Expiration.EXPIRY_NEVER);
+      // All validation passed
+      // Add a new entry with the next available ID.
+      long id = getNextDirectiveId();
+      directive = new CacheDirective(id, path, replication, expiryTime);
       addInternal(directive, pool);
     } catch (IOException e) {
       LOG.warn("addDirective of " + info + " failed: ", e);
@@ -407,10 +440,13 @@ public final class CacheManager {
       if (info.getPath() != null) {
         path = validatePath(info);
       }
+
       short replication = prevEntry.getReplication();
-      if (info.getReplication() != null) {
-        replication = validateReplication(info, replication);
-      }
+      replication = validateReplication(info, replication);
+
+      long expiryTime = prevEntry.getExpiryTime();
+      expiryTime = validateExpiryTime(info, expiryTime);
+
       CachePool pool = prevEntry.getPool();
       if (info.getPool() != null) {
         pool = getCachePool(validatePoolName(info));
@@ -418,7 +454,7 @@ public final class CacheManager {
       }
       removeInternal(prevEntry);
       CacheDirective newEntry =
-          new CacheDirective(id, path, replication);
+          new CacheDirective(id, path, replication, expiryTime);
       addInternal(newEntry, pool);
     } catch (IOException e) {
       LOG.warn("modifyDirective of " + idString + " failed: ", e);
@@ -788,6 +824,7 @@ public final class CacheManager {
       Text.writeString(out, directive.getPath());
       out.writeShort(directive.getReplication());
       Text.writeString(out, directive.getPool().getPoolName());
+      out.writeLong(directive.getExpiryTime());
       counter.increment();
     }
     prog.endStep(Phase.SAVING_CHECKPOINT, step);
@@ -826,6 +863,7 @@ public final class CacheManager {
       String path = Text.readString(in);
       short replication = in.readShort();
       String poolName = Text.readString(in);
+      long expiryTime = in.readLong();
       // Get pool reference by looking it up in the map
       CachePool pool = cachePools.get(poolName);
       if (pool == null) {
@@ -833,7 +871,7 @@ public final class CacheManager {
             ", which does not exist.");
       }
       CacheDirective directive =
-          new CacheDirective(directiveId, path, replication);
+          new CacheDirective(directiveId, path, replication, expiryTime);
       boolean addedDirective = pool.getDirectiveList().add(directive);
       assert addedDirective;
       if (directivesById.put(directive.getId(), directive) != null) {
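
Net effect of validateExpiryTime: on add, a missing Expiration defaults to
EXPIRY_NEVER; on modify, it defaults to the directive's previous expiry
time. A minimal sketch of how a relative TTL is resolved, using only
methods defined in this patch (the 4-hour value is illustrative):

    // A 4-hour relative Expiration (14,400,000 ms), resolved against the
    // NameNode's local clock at add/modify time:
    long expiryTime = CacheDirectiveInfo.Expiration
        .newRelative(4 * 60 * 60 * 1000).getAbsoluteMillis();
    // equivalent to: new Date().getTime() + 14400000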

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1546301&r1=1546300&r2=1546301&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Thu Nov 28 07:20:21 2013
@@ -953,7 +953,11 @@ public class FSEditLog implements LogsPu
         .setSnapshotRoot(path);
     logEdit(op);
   }
-  
+
+  /**
+   * Log a CacheDirectiveInfo returned from
+   * {@link CacheManager#addDirective(CacheDirectiveInfo, FSPermissionChecker)}
+   */
   void logAddCacheDirectiveInfo(CacheDirectiveInfo directive,
       boolean toLogRpcIds) {
     AddCacheDirectiveInfoOp op =

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1546301&r1=1546300&r2=1546301&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Thu Nov 28 07:20:21 2013
@@ -636,17 +636,17 @@ public class FSEditLogLoader {
       fsNamesys.setLastAllocatedBlockId(allocateBlockIdOp.blockId);
       break;
     }
-    case OP_ADD_PATH_BASED_CACHE_DIRECTIVE: {
+    case OP_ADD_CACHE_DIRECTIVE: {
       AddCacheDirectiveInfoOp addOp = (AddCacheDirectiveInfoOp) op;
       CacheDirectiveInfo result = fsNamesys.
-          getCacheManager().addDirective(addOp.directive, null);
+          getCacheManager().addDirectiveFromEditLog(addOp.directive);
       if (toAddRetryCache) {
         Long id = result.getId();
         fsNamesys.addCacheEntryWithPayload(op.rpcClientId, op.rpcCallId, id);
       }
       break;
     }
-    case OP_MODIFY_PATH_BASED_CACHE_DIRECTIVE: {
+    case OP_MODIFY_CACHE_DIRECTIVE: {
       ModifyCacheDirectiveInfoOp modifyOp =
           (ModifyCacheDirectiveInfoOp) op;
       fsNamesys.getCacheManager().modifyDirective(
@@ -656,7 +656,7 @@ public class FSEditLogLoader {
       }
       break;
     }
-    case OP_REMOVE_PATH_BASED_CACHE_DIRECTIVE: {
+    case OP_REMOVE_CACHE_DIRECTIVE: {
       RemoveCacheDirectiveInfoOp removeOp =
           (RemoveCacheDirectiveInfoOp) op;
       fsNamesys.getCacheManager().removeDirective(removeOp.id, null);

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java?rev=1546301&r1=1546300&r2=1546301&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java Thu Nov 28 07:20:21 2013
@@ -18,9 +18,8 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD_CACHE_DIRECTIVE;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD_CACHE_POOL;
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD_PATH_BASED_CACHE_DIRECTIVE;
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_MODIFY_PATH_BASED_CACHE_DIRECTIVE;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ALLOCATE_BLOCK_ID;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ALLOW_SNAPSHOT;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_CANCEL_DELEGATION_TOKEN;
@@ -35,10 +34,11 @@ import static org.apache.hadoop.hdfs.ser
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_GET_DELEGATION_TOKEN;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_INVALID;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_MKDIR;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_MODIFY_CACHE_DIRECTIVE;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_MODIFY_CACHE_POOL;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REASSIGN_LEASE;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REMOVE_CACHE_DIRECTIVE;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REMOVE_CACHE_POOL;
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REMOVE_PATH_BASED_CACHE_DIRECTIVE;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME_OLD;
 import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME_SNAPSHOT;
@@ -64,6 +64,7 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Date;
 import java.util.EnumMap;
 import java.util.List;
 import java.util.zip.CheckedInputStream;
@@ -81,12 +82,12 @@ import org.apache.hadoop.fs.permission.P
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DeprecatedUTF8;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion;
 import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
-import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.util.XMLUtils;
 import org.apache.hadoop.hdfs.util.XMLUtils.InvalidXmlException;
@@ -109,7 +110,6 @@ import org.xml.sax.helpers.AttributesImp
 
 import com.google.common.base.Joiner;
 import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
 
 /**
  * Helper classes for reading the ops from an InputStream.
@@ -165,11 +165,11 @@ public abstract class FSEditLogOp {
       inst.put(OP_RENAME_SNAPSHOT, new RenameSnapshotOp());
       inst.put(OP_SET_GENSTAMP_V2, new SetGenstampV2Op());
       inst.put(OP_ALLOCATE_BLOCK_ID, new AllocateBlockIdOp());
-      inst.put(OP_ADD_PATH_BASED_CACHE_DIRECTIVE,
+      inst.put(OP_ADD_CACHE_DIRECTIVE,
           new AddCacheDirectiveInfoOp());
-      inst.put(OP_MODIFY_PATH_BASED_CACHE_DIRECTIVE,
+      inst.put(OP_MODIFY_CACHE_DIRECTIVE,
           new ModifyCacheDirectiveInfoOp());
-      inst.put(OP_REMOVE_PATH_BASED_CACHE_DIRECTIVE,
+      inst.put(OP_REMOVE_CACHE_DIRECTIVE,
           new RemoveCacheDirectiveInfoOp());
       inst.put(OP_ADD_CACHE_POOL, new AddCachePoolOp());
       inst.put(OP_MODIFY_CACHE_POOL, new ModifyCachePoolOp());
@@ -2874,12 +2874,12 @@ public abstract class FSEditLogOp {
     CacheDirectiveInfo directive;
 
     public AddCacheDirectiveInfoOp() {
-      super(OP_ADD_PATH_BASED_CACHE_DIRECTIVE);
+      super(OP_ADD_CACHE_DIRECTIVE);
     }
 
     static AddCacheDirectiveInfoOp getInstance(OpInstanceCache cache) {
       return (AddCacheDirectiveInfoOp) cache
-          .get(OP_ADD_PATH_BASED_CACHE_DIRECTIVE);
+          .get(OP_ADD_CACHE_DIRECTIVE);
     }
 
     public AddCacheDirectiveInfoOp setDirective(
@@ -2889,6 +2889,7 @@ public abstract class FSEditLogOp {
       assert(directive.getPath() != null);
       assert(directive.getReplication() != null);
       assert(directive.getPool() != null);
+      assert(directive.getExpiration() != null);
       return this;
     }
 
@@ -2898,11 +2899,13 @@ public abstract class FSEditLogOp {
       String path = FSImageSerialization.readString(in);
       short replication = FSImageSerialization.readShort(in);
       String pool = FSImageSerialization.readString(in);
+      long expiryTime = FSImageSerialization.readLong(in);
       directive = new CacheDirectiveInfo.Builder().
           setId(id).
           setPath(new Path(path)).
           setReplication(replication).
           setPool(pool).
+          setExpiration(CacheDirectiveInfo.Expiration.newAbsolute(expiryTime)).
           build();
       readRpcIds(in, logVersion);
     }
@@ -2913,6 +2916,8 @@ public abstract class FSEditLogOp {
       FSImageSerialization.writeString(directive.getPath().toUri().getPath(), out);
       FSImageSerialization.writeShort(directive.getReplication(), out);
       FSImageSerialization.writeString(directive.getPool(), out);
+      FSImageSerialization.writeLong(
+          directive.getExpiration().getMillis(), out);
       writeRpcIds(rpcClientId, rpcCallId, out);
     }
 
@@ -2925,6 +2930,8 @@ public abstract class FSEditLogOp {
       XMLUtils.addSaxString(contentHandler, "REPLICATION",
           Short.toString(directive.getReplication()));
       XMLUtils.addSaxString(contentHandler, "POOL", directive.getPool());
+      XMLUtils.addSaxString(contentHandler, "EXPIRATION",
+          "" + directive.getExpiration().getMillis());
       appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
     }
 
@@ -2935,6 +2942,8 @@ public abstract class FSEditLogOp {
           setPath(new Path(st.getValue("PATH"))).
           setReplication(Short.parseShort(st.getValue("REPLICATION"))).
           setPool(st.getValue("POOL")).
+          setExpiration(CacheDirectiveInfo.Expiration.newAbsolute(
+              Long.parseLong(st.getValue("EXPIRATION")))).
           build();
       readRpcIdsFromXml(st);
     }
@@ -2946,7 +2955,8 @@ public abstract class FSEditLogOp {
       builder.append("id=" + directive.getId() + ",");
       builder.append("path=" + directive.getPath().toUri().getPath() + ",");
       builder.append("replication=" + directive.getReplication() + ",");
-      builder.append("pool=" + directive.getPool());
+      builder.append("pool=" + directive.getPool() + ",");
+      builder.append("expiration=" + directive.getExpiration().getMillis());
       appendRpcIdsToString(builder, rpcClientId, rpcCallId);
       builder.append("]");
       return builder.toString();
@@ -2961,12 +2971,12 @@ public abstract class FSEditLogOp {
     CacheDirectiveInfo directive;
 
     public ModifyCacheDirectiveInfoOp() {
-      super(OP_MODIFY_PATH_BASED_CACHE_DIRECTIVE);
+      super(OP_MODIFY_CACHE_DIRECTIVE);
     }
 
     static ModifyCacheDirectiveInfoOp getInstance(OpInstanceCache cache) {
       return (ModifyCacheDirectiveInfoOp) cache
-          .get(OP_MODIFY_PATH_BASED_CACHE_DIRECTIVE);
+          .get(OP_MODIFY_CACHE_DIRECTIVE);
     }
 
     public ModifyCacheDirectiveInfoOp setDirective(
@@ -2991,7 +3001,12 @@ public abstract class FSEditLogOp {
       if ((flags & 0x4) != 0) {
         builder.setPool(FSImageSerialization.readString(in));
       }
-      if ((flags & ~0x7) != 0) {
+      if ((flags & 0x8) != 0) {
+        builder.setExpiration(
+            CacheDirectiveInfo.Expiration.newAbsolute(
+                FSImageSerialization.readLong(in)));
+      }
+      if ((flags & ~0xF) != 0) {
         throw new IOException("unknown flags set in " +
             "ModifyCacheDirectiveInfoOp: " + flags);
       }
@@ -3005,7 +3020,8 @@ public abstract class FSEditLogOp {
       byte flags = (byte)(
           ((directive.getPath() != null) ? 0x1 : 0) |
           ((directive.getReplication() != null) ? 0x2 : 0) |
-          ((directive.getPool() != null) ? 0x4 : 0)
+          ((directive.getPool() != null) ? 0x4 : 0) |
+          ((directive.getExpiration() != null) ? 0x8 : 0)
         );
       out.writeByte(flags);
       if (directive.getPath() != null) {
@@ -3018,6 +3034,10 @@ public abstract class FSEditLogOp {
       if (directive.getPool() != null) {
         FSImageSerialization.writeString(directive.getPool(), out);
       }
+      if (directive.getExpiration() != null) {
+        FSImageSerialization.writeLong(directive.getExpiration().getMillis(),
+            out);
+      }
       writeRpcIds(rpcClientId, rpcCallId, out);
     }
 
@@ -3036,6 +3056,10 @@ public abstract class FSEditLogOp {
       if (directive.getPool() != null) {
         XMLUtils.addSaxString(contentHandler, "POOL", directive.getPool());
       }
+      if (directive.getExpiration() != null) {
+        XMLUtils.addSaxString(contentHandler, "EXPIRATION",
+            "" + directive.getExpiration().getMillis());
+      }
       appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
     }
 
@@ -3056,6 +3080,11 @@ public abstract class FSEditLogOp {
       if (pool != null) {
         builder.setPool(pool);
       }
+      String expiryTime = st.getValueOrNull("EXPIRATION");
+      if (expiryTime != null) {
+        builder.setExpiration(CacheDirectiveInfo.Expiration.newAbsolute(
+            Long.parseLong(expiryTime)));
+      }
       this.directive = builder.build();
       readRpcIdsFromXml(st);
     }
@@ -3075,6 +3104,10 @@ public abstract class FSEditLogOp {
       if (directive.getPool() != null) {
         builder.append(",").append("pool=").append(directive.getPool());
       }
+      if (directive.getExpiration() != null) {
+        builder.append(",").append("expiration=").
+            append(directive.getExpiration().getMillis());
+      }
       appendRpcIdsToString(builder, rpcClientId, rpcCallId);
       builder.append("]");
       return builder.toString();
@@ -3089,12 +3122,12 @@ public abstract class FSEditLogOp {
     long id;
 
     public RemoveCacheDirectiveInfoOp() {
-      super(OP_REMOVE_PATH_BASED_CACHE_DIRECTIVE);
+      super(OP_REMOVE_CACHE_DIRECTIVE);
     }
 
     static RemoveCacheDirectiveInfoOp getInstance(OpInstanceCache cache) {
       return (RemoveCacheDirectiveInfoOp) cache
-          .get(OP_REMOVE_PATH_BASED_CACHE_DIRECTIVE);
+          .get(OP_REMOVE_CACHE_DIRECTIVE);
     }
 
     public RemoveCacheDirectiveInfoOp setId(long id) {
@@ -3162,7 +3195,7 @@ public abstract class FSEditLogOp {
 
     @Override
     public void writeFields(DataOutputStream out) throws IOException {
-      info .writeTo(out);
+      info.writeTo(out);
       writeRpcIds(rpcClientId, rpcCallId, out);
     }
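
For the modify op, the flags byte is a simple presence bitmask (0x1 path,
0x2 replication, 0x4 pool, 0x8 expiration). A worked check for a
hypothetical op that changes only the TTL:

    byte flags = 0x8;            // only expiration present
    assert (flags & 0x8) != 0;   // Expiration branch taken on read
    assert (flags & ~0xF) == 0;  // passes the unknown-flags check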
 

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java?rev=1546301&r1=1546300&r2=1546301&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java Thu Nov 28 07:20:21 2013
@@ -64,12 +64,12 @@ public enum FSEditLogOpCodes {
   OP_DISALLOW_SNAPSHOT          ((byte) 30),
   OP_SET_GENSTAMP_V2            ((byte) 31),
   OP_ALLOCATE_BLOCK_ID          ((byte) 32),
-  OP_ADD_PATH_BASED_CACHE_DIRECTIVE       ((byte) 33),
-  OP_REMOVE_PATH_BASED_CACHE_DIRECTIVE    ((byte) 34),
+  OP_ADD_CACHE_DIRECTIVE                  ((byte) 33),
+  OP_REMOVE_CACHE_DIRECTIVE               ((byte) 34),
   OP_ADD_CACHE_POOL                       ((byte) 35),
   OP_MODIFY_CACHE_POOL                    ((byte) 36),
   OP_REMOVE_CACHE_POOL                    ((byte) 37),
-  OP_MODIFY_PATH_BASED_CACHE_DIRECTIVE    ((byte) 38);
+  OP_MODIFY_CACHE_DIRECTIVE               ((byte) 38);
 
   private byte opCode;
 

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/CacheAdmin.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/CacheAdmin.java?rev=1546301&r1=1546300&r2=1546301&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/CacheAdmin.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/CacheAdmin.java Thu Nov 28 07:20:21 2013
@@ -29,12 +29,13 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveStats;
 import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
-import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
 import org.apache.hadoop.hdfs.server.namenode.CachePool;
 import org.apache.hadoop.hdfs.tools.TableListing.Justification;
 import org.apache.hadoop.ipc.RemoteException;
@@ -132,7 +133,8 @@ public class CacheAdmin extends Configur
     @Override
     public String getShortUsage() {
       return "[" + getName() +
-          " -path <path> -replication <replication> -pool <pool-name>]\n";
+          " -path <path> -pool <pool-name> " +
+          "[-replication <replication>] [-ttl <time-to-live>]]\n";
     }
 
     @Override
@@ -140,11 +142,15 @@ public class CacheAdmin extends Configur
       TableListing listing = getOptionDescriptionListing();
       listing.addRow("<path>", "A path to cache. The path can be " +
           "a directory or a file.");
-      listing.addRow("<replication>", "The cache replication factor to use. " +
-          "Defaults to 1.");
       listing.addRow("<pool-name>", "The pool to which the directive will be " +
           "added. You must have write permission on the cache pool "
           + "in order to add new directives.");
+      listing.addRow("<replication>", "The cache replication factor to use. " +
+          "Defaults to 1.");
+      listing.addRow("<time-to-live>", "How long the directive is " +
+          "valid. Can be specified in seconds, minutes, hours, and days, " +
+          "e.g. 120s, 30m, 4h, 2d. Valid units are [smhd]. " +
+          "If unspecified, the directive never expires.");
       return getShortUsage() + "\n" +
         "Add a new cache directive.\n\n" +
         listing.toString();
@@ -152,33 +158,48 @@ public class CacheAdmin extends Configur
 
     @Override
     public int run(Configuration conf, List<String> args) throws IOException {
+      CacheDirectiveInfo.Builder builder = new CacheDirectiveInfo.Builder();
+
       String path = StringUtils.popOptionWithArgument("-path", args);
       if (path == null) {
         System.err.println("You must specify a path with -path.");
         return 1;
       }
-      short replication = 1;
-      String replicationString =
-          StringUtils.popOptionWithArgument("-replication", args);
-      if (replicationString != null) {
-        replication = Short.parseShort(replicationString);
-      }
+      builder.setPath(new Path(path));
+
       String poolName = StringUtils.popOptionWithArgument("-pool", args);
       if (poolName == null) {
         System.err.println("You must specify a pool name with -pool.");
         return 1;
       }
+      builder.setPool(poolName);
+
+      String replicationString =
+          StringUtils.popOptionWithArgument("-replication", args);
+      if (replicationString != null) {
+        Short replication = Short.parseShort(replicationString);
+        builder.setReplication(replication);
+      }
+
+      String ttlString = StringUtils.popOptionWithArgument("-ttl", args);
+      if (ttlString != null) {
+        try {
+          long ttl = DFSUtil.parseRelativeTime(ttlString);
+          builder.setExpiration(CacheDirectiveInfo.Expiration.newRelative(ttl));
+        } catch (IOException e) {
+          System.err.println(
+              "Error while parsing ttl value: " + e.getMessage());
+          return 1;
+        }
+      }
+
       if (!args.isEmpty()) {
         System.err.println("Can't understand argument: " + args.get(0));
         return 1;
       }
         
       DistributedFileSystem dfs = getDFS(conf);
-      CacheDirectiveInfo directive = new CacheDirectiveInfo.Builder().
-          setPath(new Path(path)).
-          setReplication(replication).
-          setPool(poolName).
-          build();
+      CacheDirectiveInfo directive = builder.build();
       try {
         long id = dfs.addCacheDirective(directive);
         System.out.println("Added cache directive " + id);
@@ -261,7 +282,7 @@ public class CacheAdmin extends Configur
     public String getShortUsage() {
       return "[" + getName() +
           " -id <id> [-path <path>] [-replication <replication>] " +
-          "[-pool <pool-name>] ]\n";
+          "[-pool <pool-name>] [-ttl <time-to-live>]]\n";
     }
 
     @Override
@@ -275,6 +296,10 @@ public class CacheAdmin extends Configur
       listing.addRow("<pool-name>", "The pool to which the directive will be " +
           "added. You must have write permission on the cache pool "
           + "in order to move a directive into it. (optional)");
+      listing.addRow("<time-to-live>", "How long the directive is " +
+          "valid. Can be specified in minutes, hours, and days via e.g. " +
+          "30m, 4h, 2d. Valid units are [smhd]." +
+          " If unspecified, the directive never expires.");
       return getShortUsage() + "\n" +
         "Modify a cache directive.\n\n" +
         listing.toString();
@@ -308,6 +333,19 @@ public class CacheAdmin extends Configur
         builder.setPool(poolName);
         modified = true;
       }
+      String ttlString = StringUtils.popOptionWithArgument("-ttl", args);
+      if (ttlString != null) {
+        long ttl;
+        try {
+          ttl = DFSUtil.parseRelativeTime(ttlString);
+        } catch (IOException e) {
+          System.err.println(
+              "Error while parsing ttl value: " + e.getMessage());
+          return 1;
+        }
+        builder.setExpiration(CacheDirectiveInfo.Expiration.newRelative(ttl));
+        modified = true;
+      }
       if (!args.isEmpty()) {
         System.err.println("Can't understand argument: " + args.get(0));
         System.err.println("Usage is " + getShortUsage());
@@ -435,7 +473,8 @@ public class CacheAdmin extends Configur
       TableListing.Builder tableBuilder = new TableListing.Builder().
           addField("ID", Justification.RIGHT).
           addField("POOL", Justification.LEFT).
-          addField("REPLICATION", Justification.RIGHT).
+          addField("REPL", Justification.RIGHT).
+          addField("EXPIRY", Justification.LEFT).
           addField("PATH", Justification.LEFT);
       if (printStats) {
         tableBuilder.addField("NEEDED", Justification.RIGHT).
@@ -456,6 +495,14 @@ public class CacheAdmin extends Configur
         row.add("" + directive.getId());
         row.add(directive.getPool());
         row.add("" + directive.getReplication());
+        String expiry;
+        if (directive.getExpiration().getMillis() ==
+            CacheDirectiveInfo.Expiration.EXPIRY_NEVER) {
+          expiry = "never";
+        } else {
+          expiry = directive.getExpiration().toString();
+        }
+        row.add(expiry);
         row.add(directive.getPath().toUri().getPath());
         if (printStats) {
           row.add("" + stats.getBytesNeeded());

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto?rev=1546301&r1=1546300&r2=1546301&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto Thu Nov 28 07:20:21 2013
@@ -368,12 +368,19 @@ message CacheDirectiveInfoProto {
   optional string path = 2;
   optional uint32 replication = 3;
   optional string pool = 4;
+  optional CacheDirectiveInfoExpirationProto expiration = 5;
+}
+
+message CacheDirectiveInfoExpirationProto {
+  required int64 millis = 1;
+  required bool isRelative = 2;
 }
 
 message CacheDirectiveStatsProto {
   required int64 bytesNeeded = 1;
   required int64 bytesCached = 2;
   required int64 filesAffected = 3;
+  required bool hasExpired = 4;
 }
 
 message AddCacheDirectiveRequestProto {
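
The millis/isRelative pair lets a client send either an absolute wall-clock expiry or a relative TTL; presumably the NameNode resolves relative values against its own clock when the directive is processed. A hedged client-side sketch of the two forms, using the Expiration factory methods exercised in the tests below (the path and pool names are invented):

  import org.apache.hadoop.fs.Path;
  import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;

  public class ExpirationExample {
    public static void main(String[] args) {
      // Relative: expire 30 minutes after the directive is processed.
      CacheDirectiveInfo relative = new CacheDirectiveInfo.Builder()
          .setPath(new Path("/datasets/hot"))   // hypothetical path
          .setPool("research")                  // hypothetical pool
          .setExpiration(
              CacheDirectiveInfo.Expiration.newRelative(30 * 60 * 1000L))
          .build();

      // Absolute: expire at a fixed wall-clock time, here one day from now.
      CacheDirectiveInfo absolute = new CacheDirectiveInfo.Builder()
          .setPath(new Path("/datasets/warm"))
          .setPool("research")
          .setExpiration(CacheDirectiveInfo.Expiration.newAbsolute(
              System.currentTimeMillis() + 24 * 60 * 60 * 1000L))
          .build();
      System.out.println(relative + "\n" + absolute);
    }
  }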

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java?rev=1546301&r1=1546300&r2=1546301&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java Thu Nov 28 07:20:21 2013
@@ -31,6 +31,7 @@ import static org.apache.hadoop.hdfs.DFS
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICES;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID;
+import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
 import static org.hamcrest.CoreMatchers.not;
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
@@ -62,6 +63,7 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.Shell;
 import org.junit.Assume;
 import org.junit.Before;
@@ -724,4 +726,47 @@ public class TestDFSUtil {
         DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_KEYTAB_KEY,
         DFSUtil.getSpnegoKeytabKey(conf, defaultKey));
   }
+
+  @Test(timeout=1000)
+  public void testDurationToString() throws Exception {
+    assertEquals("000:00:00:00", DFSUtil.durationToString(0));
+    try {
+      DFSUtil.durationToString(-199);
+      fail("Expected IllegalArgumentException for a negative duration");
+    } catch (IllegalArgumentException e) {
+      GenericTestUtils.assertExceptionContains("Invalid negative duration", e);
+    }
+    assertEquals("001:01:01:01",
+        DFSUtil.durationToString(((24*60*60)+(60*60)+(60)+1)*1000));
+    assertEquals("000:23:59:59",
+        DFSUtil.durationToString(((23*60*60)+(59*60)+(59))*1000));
+  }
+
+  @Test(timeout=5000)
+  public void testRelativeTimeConversion() throws Exception {
+    try {
+      DFSUtil.parseRelativeTime("1");
+      fail("Expected IOException: too short");
+    } catch (IOException e) {
+      assertExceptionContains("too short", e);
+    }
+    try {
+      DFSUtil.parseRelativeTime("1z");
+      fail("Expected IOException: unknown time unit");
+    } catch (IOException e) {
+      assertExceptionContains("unknown time unit", e);
+    }
+    try {
+      DFSUtil.parseRelativeTime("yyz");
+      fail("Expected IOException: not a number");
+    } catch (IOException e) {
+      assertExceptionContains("is not a number", e);
+    }
+    assertEquals(61*1000, DFSUtil.parseRelativeTime("61s"));
+    assertEquals(61*60*1000, DFSUtil.parseRelativeTime("61m"));
+    assertEquals(0, DFSUtil.parseRelativeTime("0s"));
+    assertEquals(25*60*60*1000, DFSUtil.parseRelativeTime("25h"));
+    assertEquals(4*24*60*60*1000, DFSUtil.parseRelativeTime("4d"));
+    assertEquals(999*24*60*60*1000L, DFSUtil.parseRelativeTime("999d"));
+  }
 }
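
The assertions above pin down the parser's observable contract: a numeric prefix, a one-character [smhd] unit suffix, and a result in milliseconds. A minimal sketch of a parser satisfying them, assuming the real DFSUtil.parseRelativeTime differs in exact wording and structure:

  import java.io.IOException;

  public class RelativeTimeParser {
    // Parse strings like "30m" or "4d" into milliseconds.
    public static long parseRelativeTime(String s) throws IOException {
      if (s.length() < 2) {
        throw new IOException("Unable to parse relative time value of " + s
            + ": too short");
      }
      String numStr = s.substring(0, s.length() - 1);
      long num;
      try {
        num = Long.parseLong(numStr);
      } catch (NumberFormatException e) {
        throw new IOException("Unable to parse relative time value of " + s
            + ": " + numStr + " is not a number");
      }
      long multiplier;
      switch (s.charAt(s.length() - 1)) {
        case 's': multiplier = 1000L; break;
        case 'm': multiplier = 60 * 1000L; break;
        case 'h': multiplier = 60 * 60 * 1000L; break;
        case 'd': multiplier = 24 * 60 * 60 * 1000L; break;
        default:
          throw new IOException("Unable to parse relative time value of " + s
              + ": unknown time unit " + s.charAt(s.length() - 1));
      }
      return num * multiplier;
    }
  }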

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java?rev=1546301&r1=1546300&r2=1546301&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java Thu Nov 28 07:20:21 2013
@@ -33,10 +33,12 @@ import static org.junit.Assert.fail;
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 
+import org.apache.commons.lang.time.DateUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -54,13 +56,13 @@ import org.apache.hadoop.hdfs.Distribute
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
 import org.apache.hadoop.hdfs.protocol.CacheDirectiveStats;
 import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
 import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
-import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo.Expiration;
 import org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor;
 import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type;
-import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheManipulator;
@@ -521,10 +523,14 @@ public class TestCacheDirectives {
     int numEntries = 10;
     String entryPrefix = "/party-";
     long prevId = -1;
+    final Date expiry = new Date();
     for (int i=0; i<numEntries; i++) {
       prevId = dfs.addCacheDirective(
           new CacheDirectiveInfo.Builder().
-            setPath(new Path(entryPrefix + i)).setPool(pool).build());
+            setPath(new Path(entryPrefix + i)).setPool(pool).
+            setExpiration(
+                CacheDirectiveInfo.Expiration.newAbsolute(expiry.getTime())).
+            build());
     }
     RemoteIterator<CacheDirectiveEntry> dit
         = dfs.listCacheDirectives(null);
@@ -558,6 +564,7 @@ public class TestCacheDirectives {
       assertEquals(i+1, cd.getId().longValue());
       assertEquals(entryPrefix + i, cd.getPath().toUri().getPath());
       assertEquals(pool, cd.getPool());
+      assertEquals(expiry.getTime(), cd.getExpiration().getMillis());
     }
     assertFalse("Unexpected # of cache directives found", dit.hasNext());
 
@@ -1001,4 +1008,59 @@ public class TestCacheDirectives {
         info.getMode().toShort());
     assertEquals("Mismatched weight", 99, (int)info.getWeight());
   }
+
+  @Test(timeout=60000)
+  public void testExpiry() throws Exception {
+    HdfsConfiguration conf = createCachingConf();
+    MiniDFSCluster cluster =
+      new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
+    try {
+      DistributedFileSystem dfs = cluster.getFileSystem();
+      String pool = "pool1";
+      dfs.addCachePool(new CachePoolInfo(pool));
+      Path p = new Path("/mypath");
+      DFSTestUtil.createFile(dfs, p, BLOCK_SIZE*2, (short)2, 0x999);
+      // Set the expiry safely past the 60-second test timeout
+      Date start = new Date();
+      Date expiry = DateUtils.addSeconds(start, 120);
+      final long id = dfs.addCacheDirective(new CacheDirectiveInfo.Builder()
+          .setPath(p)
+          .setPool(pool)
+          .setExpiration(CacheDirectiveInfo.Expiration.newAbsolute(expiry))
+          .setReplication((short)2)
+          .build());
+      waitForCachedBlocks(cluster.getNameNode(), 2, 4, "testExpiry:1");
+      // Change it to expire sooner
+      dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder().setId(id)
+          .setExpiration(Expiration.newRelative(0)).build());
+      waitForCachedBlocks(cluster.getNameNode(), 0, 0, "testExpiry:2");
+      RemoteIterator<CacheDirectiveEntry> it = dfs.listCacheDirectives(null);
+      CacheDirectiveEntry ent = it.next();
+      assertFalse(it.hasNext());
+      Date entryExpiry = new Date(ent.getInfo().getExpiration().getMillis());
+      assertTrue("Directive should have expired",
+          entryExpiry.before(new Date()));
+      // Change it back to expire later
+      dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder().setId(id)
+          .setExpiration(Expiration.newRelative(120000)).build());
+      waitForCachedBlocks(cluster.getNameNode(), 2, 4, "testExpiry:3");
+      it = dfs.listCacheDirectives(null);
+      ent = it.next();
+      assertFalse(it.hasNext());
+      entryExpiry = new Date(ent.getInfo().getExpiration().getMillis());
+      assertTrue("Directive should not have expired",
+          entryExpiry.after(new Date()));
+      // Verify that setting a negative TTL throws an error
+      try {
+        dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder().setId(id)
+            .setExpiration(Expiration.newRelative(-1)).build());
+        fail("Expected InvalidRequestException for a negative expiration");
+      } catch (InvalidRequestException e) {
+        GenericTestUtils
+            .assertExceptionContains("Cannot set a negative expiration", e);
+      }
+    } finally {
+      cluster.shutdown();
+    }
+  }
 }
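
A small helper sketch assembled from the calls testExpiry makes when deciding whether a listed directive has lapsed (a production version would also need to handle a directive with no expiration set, which the test does not exercise):

  import java.util.Date;
  import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;

  public class ExpiryCheck {
    // True if the directive's expiration time has already passed.
    static boolean isExpired(CacheDirectiveEntry entry) {
      Date expiry = new Date(entry.getInfo().getExpiration().getMillis());
      return expiry.before(new Date());
    }
  }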

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored?rev=1546301&r1=1546300&r2=1546301&view=diff
==============================================================================
Binary files - no diff available.