You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by wa...@apache.org on 2013/11/28 08:20:22 UTC
svn commit: r1546301 [1/2] - in
/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/org/apache/hadoop/hdfs/
src/main/java/org/apache/hadoop/hdfs/protocol/
src/main/java/org/apache/hadoop/hdfs/protocolPB/
src/main/java/org/apache/hado...
Author: wang
Date: Thu Nov 28 07:20:21 2013
New Revision: 1546301
URL: http://svn.apache.org/r1546301
Log:
HDFS-5430. Support TTL on CacheDirectives. Contributed by Andrew Wang.
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirective.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirectiveInfo.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirectiveStats.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/CacheAdmin.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored.xml
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/testCacheAdminConf.xml
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1546301&r1=1546300&r2=1546301&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Thu Nov 28 07:20:21 2013
@@ -226,6 +226,8 @@ Trunk (Unreleased)
HDFS-5537. Remove FileWithSnapshot interface. (jing9 via szetszwo)
+ HDFS-5430. Support TTL on CacheDirectives. (wang)
+
OPTIMIZATIONS
HDFS-5349. DNA_CACHE and DNA_UNCACHE should be by blockId only. (cmccabe)
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java?rev=1546301&r1=1546300&r2=1546301&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java Thu Nov 28 07:20:21 2013
@@ -38,12 +38,15 @@ import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.security.SecureRandom;
+import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
+import java.util.Date;
import java.util.HashSet;
import java.util.List;
+import java.util.Locale;
import java.util.Map;
import java.util.Random;
import java.util.Set;
@@ -1426,4 +1429,64 @@ public class DFSUtil {
sslConf.get("ssl.server.truststore.password"),
sslConf.get("ssl.server.truststore.type", "jks"));
}
-}
\ No newline at end of file
+
+  /**
+   * Formats a {@link Date} as an ISO-8601 style timestamp
+   * (pattern {@code yyyy-MM-dd'T'HH:mm:ssZ}).
+   *
+   * @param date the instant to format
+   * @return the formatted timestamp
+   */
+  public static String dateToIso8601String(Date date) {
+    // SimpleDateFormat is not thread-safe, so a fresh instance is built on
+    // every call rather than being shared.
+    final String pattern = "yyyy-MM-dd'T'HH:mm:ssZ";
+    return new SimpleDateFormat(pattern, Locale.ENGLISH).format(date);
+  }
+
+  /**
+   * Converts a time duration in milliseconds into DDD:HH:MM:SS format.
+   * <p>
+   * Sub-second precision is truncated. The day field is zero-padded to at
+   * least three digits and grows as needed for longer durations.
+   *
+   * @param durationMs non-negative duration in milliseconds
+   * @return the duration formatted as DDD:HH:MM:SS
+   * @throws IllegalArgumentException if durationMs is negative
+   */
+  public static String durationToString(long durationMs) {
+    Preconditions.checkArgument(durationMs >= 0, "Invalid negative duration");
+    final long secondsPerMinute = 60;
+    final long secondsPerHour = 60 * secondsPerMinute;
+    final long secondsPerDay = 24 * secondsPerHour;
+    // Chop off the milliseconds
+    final long totalSec = durationMs / 1000;
+    final long days = totalSec / secondsPerDay;
+    final long hours = (totalSec % secondsPerDay) / secondsPerHour;
+    final long minutes = (totalSec % secondsPerHour) / secondsPerMinute;
+    final long seconds = totalSec % secondsPerMinute;
+    // Pin the locale (as dateToIso8601String does) so the digits are always
+    // ASCII regardless of the JVM's default locale.
+    return String.format(Locale.ENGLISH, "%03d:%02d:%02d:%02d",
+        days, hours, minutes, seconds);
+  }
+
+  /**
+   * Converts a relative time string into a duration in milliseconds.
+   * <p>
+   * The string is an integer followed by a single unit character: "s"
+   * (seconds), "m" (minutes), "h" (hours) or "d" (days); for example "30m".
+   *
+   * @param relTime relative time string to parse
+   * @return the duration in milliseconds
+   * @throws IOException if the string is too short, the numeric part is not
+   *           a number, or the unit character is unknown
+   */
+  public static long parseRelativeTime(String relTime) throws IOException {
+    if (relTime.length() < 2) {
+      throw new IOException("Unable to parse relative time value of " + relTime
+          + ": too short");
+    }
+    String ttlString = relTime.substring(0, relTime.length()-1);
+    // Use long arithmetic throughout: with an int, any value beyond roughly
+    // 24 days (e.g. "30d") overflowed when converted to milliseconds.
+    long ttl;
+    try {
+      ttl = Long.parseLong(ttlString);
+    } catch (NumberFormatException e) {
+      throw new IOException("Unable to parse relative time value of " + relTime
+          + ": " + ttlString + " is not a number");
+    }
+    if (relTime.endsWith("s")) {
+      // pass
+    } else if (relTime.endsWith("m")) {
+      ttl *= 60;
+    } else if (relTime.endsWith("h")) {
+      ttl *= 60*60;
+    } else if (relTime.endsWith("d")) {
+      ttl *= 60*60*24;
+    } else {
+      throw new IOException("Unable to parse relative time value of " + relTime
+          + ": unknown time unit " + relTime.charAt(relTime.length() - 1));
+    }
+    return ttl*1000;
+  }
+}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirective.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirective.java?rev=1546301&r1=1546300&r2=1546301&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirective.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirective.java Thu Nov 28 07:20:21 2013
@@ -17,9 +17,14 @@
*/
package org.apache.hadoop.hdfs.protocol;
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Date;
+
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.server.namenode.CachePool;
import org.apache.hadoop.util.IntrusiveCollection;
import org.apache.hadoop.util.IntrusiveCollection.Element;
@@ -27,7 +32,7 @@ import org.apache.hadoop.util.IntrusiveC
import com.google.common.base.Preconditions;
/**
- * Represents an entry in the PathBasedCache on the NameNode.
+ * Namenode class that tracks state related to a cached path.
*
* This is an implementation class, not part of the public API.
*/
@@ -37,6 +42,8 @@ public final class CacheDirective implem
private final String path;
private final short replication;
private CachePool pool;
+ private final long expiryTime;
+
private long bytesNeeded;
private long bytesCached;
private long filesAffected;
@@ -44,13 +51,13 @@ public final class CacheDirective implem
private Element next;
public CacheDirective(long id, String path,
- short replication) {
+ short replication, long expiryTime) {
Preconditions.checkArgument(id > 0);
this.id = id;
+ this.path = checkNotNull(path);
Preconditions.checkArgument(replication > 0);
- this.path = path;
this.replication = replication;
- Preconditions.checkNotNull(path);
+ this.expiryTime = expiryTime;
this.bytesNeeded = 0;
this.bytesCached = 0;
this.filesAffected = 0;
@@ -64,20 +71,40 @@ public final class CacheDirective implem
return path;
}
- public CachePool getPool() {
- return pool;
- }
-
public short getReplication() {
return replication;
}
+ public CachePool getPool() {
+ return pool;
+ }
+
+ /**
+ * @return When this directive expires, in milliseconds since Unix epoch.
+ *         NOTE(review): CacheManager#addDirective supplies
+ *         CacheDirectiveInfo.Expiration.EXPIRY_NEVER (-1) when no expiration
+ *         is given, so a non-positive value presumably means "never
+ *         expires" -- confirm against all callers.
+ */
+ public long getExpiryTime() {
+ return expiryTime;
+ }
+
+ /**
+ * @return When this directive expires, as an ISO-8601 formatted string
+ *         produced by {@link DFSUtil#dateToIso8601String(Date)} from the
+ *         raw expiry time in milliseconds.
+ */
+ public String getExpiryTimeString() {
+ return DFSUtil.dateToIso8601String(new Date(expiryTime));
+ }
+
+ /**
+ * Returns a {@link CacheDirectiveInfo} based on this CacheDirective.
+ * <p>
+ * This always sets an absolute expiry time, never a relative TTL.
+ */
public CacheDirectiveInfo toInfo() {
return new CacheDirectiveInfo.Builder().
setId(id).
setPath(new Path(path)).
setReplication(replication).
setPool(pool.getPoolName()).
+ setExpiration(CacheDirectiveInfo.Expiration.newAbsolute(expiryTime)).
build();
}
@@ -86,6 +113,7 @@ public final class CacheDirective implem
setBytesNeeded(bytesNeeded).
setBytesCached(bytesCached).
setFilesAffected(filesAffected).
+ setHasExpired(new Date().getTime() > expiryTime).
build();
}
@@ -100,6 +128,7 @@ public final class CacheDirective implem
append(", path:").append(path).
append(", replication:").append(replication).
append(", pool:").append(pool).
+ append(", expiryTime: ").append(getExpiryTimeString()).
append(", bytesNeeded:").append(bytesNeeded).
append(", bytesCached:").append(bytesCached).
append(", filesAffected:").append(filesAffected).
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirectiveInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirectiveInfo.java?rev=1546301&r1=1546300&r2=1546301&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirectiveInfo.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirectiveInfo.java Thu Nov 28 07:20:21 2013
@@ -17,11 +17,14 @@
*/
package org.apache.hadoop.hdfs.protocol;
+import java.util.Date;
+
import org.apache.commons.lang.builder.EqualsBuilder;
import org.apache.commons.lang.builder.HashCodeBuilder;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSUtil;
/**
* Describes a path-based cache directive.
@@ -37,6 +40,7 @@ public class CacheDirectiveInfo {
private Path path;
private Short replication;
private String pool;
+ private Expiration expiration;
/**
* Builds a new CacheDirectiveInfo populated with the set properties.
@@ -44,7 +48,7 @@ public class CacheDirectiveInfo {
* @return New CacheDirectiveInfo.
*/
public CacheDirectiveInfo build() {
- return new CacheDirectiveInfo(id, path, replication, pool);
+ return new CacheDirectiveInfo(id, path, replication, pool, expiration);
}
/**
@@ -62,6 +66,7 @@ public class CacheDirectiveInfo {
this.path = directive.getPath();
this.replication = directive.getReplication();
this.pool = directive.getPool();
+ this.expiration = directive.getExpiration();
}
/**
@@ -107,18 +112,134 @@ public class CacheDirectiveInfo {
this.pool = pool;
return this;
}
+
+ /**
+ * Sets when the CacheDirective should expire. A
+ * {@link CacheDirectiveInfo.Expiration} can specify either an absolute or
+ * relative expiration time.
+ *
+ * @param expiration when this CacheDirective should expire
+ * @return This builder, for call chaining
+ */
+ public Builder setExpiration(Expiration expiration) {
+ this.expiration = expiration;
+ return this;
+ }
+ }
+
+  /**
+   * Denotes a relative or absolute expiration time for a CacheDirective. Use
+   * factory methods {@link CacheDirectiveInfo.Expiration#newAbsolute(Date)} and
+   * {@link CacheDirectiveInfo.Expiration#newRelative(long)} to create an
+   * Expiration.
+   * <p>
+   * In either case, the server-side clock is used to determine when a
+   * CacheDirective expires.
+   * <p>
+   * Instances are immutable. Two Expirations are equal when they hold the same
+   * millisecond value and are both relative or both absolute.
+   */
+  public static class Expiration {
+
+    /** Denotes a CacheDirectiveInfo that never expires */
+    public static final int EXPIRY_NEVER = -1;
+
+    /**
+     * Create a new relative Expiration.
+     *
+     * @param ms how long until the CacheDirective expires, in milliseconds
+     * @return A relative Expiration
+     */
+    public static Expiration newRelative(long ms) {
+      return new Expiration(ms, true);
+    }
+
+    /**
+     * Create a new absolute Expiration.
+     *
+     * @param date when the CacheDirective expires
+     * @return An absolute Expiration
+     */
+    public static Expiration newAbsolute(Date date) {
+      return new Expiration(date.getTime(), false);
+    }
+
+    /**
+     * Create a new absolute Expiration.
+     *
+     * @param ms when the CacheDirective expires, in milliseconds since the Unix
+     *          epoch.
+     * @return An absolute Expiration
+     */
+    public static Expiration newAbsolute(long ms) {
+      return new Expiration(ms, false);
+    }
+
+    private final long ms;
+    private final boolean isRelative;
+
+    private Expiration(long ms, boolean isRelative) {
+      this.ms = ms;
+      this.isRelative = isRelative;
+    }
+
+    /**
+     * @return true if Expiration was specified as a relative duration, false if
+     *         specified as an absolute time.
+     */
+    public boolean isRelative() {
+      return isRelative;
+    }
+
+    /**
+     * @return The raw underlying millisecond value, either a relative duration
+     *         or an absolute time as milliseconds since the Unix epoch.
+     */
+    public long getMillis() {
+      return ms;
+    }
+
+    /**
+     * @return Expiration time as a {@link Date} object. This converts a
+     *         relative Expiration into an absolute Date based on the local
+     *         clock.
+     */
+    public Date getAbsoluteDate() {
+      return new Date(getAbsoluteMillis());
+    }
+
+    /**
+     * @return Expiration time in milliseconds from the Unix epoch. This
+     *         converts a relative Expiration into an absolute time based on the
+     *         local clock. If adding the relative duration to the current time
+     *         would overflow, {@link Long#MAX_VALUE} is returned.
+     */
+    public long getAbsoluteMillis() {
+      if (!isRelative) {
+        return ms;
+      }
+      final long now = new Date().getTime();
+      // Saturate instead of wrapping around to a negative time when a very
+      // large relative duration is added to the current time.
+      if (ms > Long.MAX_VALUE - now) {
+        return Long.MAX_VALUE;
+      }
+      return now + ms;
+    }
+
+    /**
+     * Value-based equality, so that {@link CacheDirectiveInfo#equals(Object)}
+     * and {@link CacheDirectiveInfo#hashCode()}, which include the
+     * Expiration, treat equal-valued expirations as equal.
+     */
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      Expiration other = (Expiration) o;
+      return ms == other.ms && isRelative == other.isRelative;
+    }
+
+    @Override
+    public int hashCode() {
+      int result = (int) (ms ^ (ms >>> 32));
+      return 31 * result + (isRelative ? 1 : 0);
+    }
+
+    /**
+     * @return a DDD:HH:MM:SS duration for a relative Expiration, or an
+     *         ISO-8601 formatted datetime for an absolute one.
+     */
+    @Override
+    public String toString() {
+      if (isRelative) {
+        return DFSUtil.durationToString(ms);
+      }
+      return DFSUtil.dateToIso8601String(new Date(ms));
+    }
   }
private final Long id;
private final Path path;
private final Short replication;
private final String pool;
+ private final Expiration expiration;
- CacheDirectiveInfo(Long id, Path path, Short replication, String pool) {
+ CacheDirectiveInfo(Long id, Path path, Short replication, String pool,
+ Expiration expiration) {
this.id = id;
this.path = path;
this.replication = replication;
this.pool = pool;
+ this.expiration = expiration;
}
/**
@@ -148,7 +269,14 @@ public class CacheDirectiveInfo {
public String getPool() {
return pool;
}
-
+
+ /**
+ * @return When this directive expires.
+ */
+ public Expiration getExpiration() {
+ return expiration;
+ }
+
@Override
public boolean equals(Object o) {
if (o == null) {
@@ -162,6 +290,7 @@ public class CacheDirectiveInfo {
append(getPath(), other.getPath()).
append(getReplication(), other.getReplication()).
append(getPool(), other.getPool()).
+ append(getExpiration(), other.getExpiration()).
isEquals();
}
@@ -171,6 +300,7 @@ public class CacheDirectiveInfo {
append(path).
append(replication).
append(pool).
+ append(expiration).
hashCode();
}
@@ -181,19 +311,23 @@ public class CacheDirectiveInfo {
String prefix = "";
if (id != null) {
builder.append(prefix).append("id: ").append(id);
- prefix = ",";
+ prefix = ", ";
}
if (path != null) {
builder.append(prefix).append("path: ").append(path);
- prefix = ",";
+ prefix = ", ";
}
if (replication != null) {
builder.append(prefix).append("replication: ").append(replication);
- prefix = ",";
+ prefix = ", ";
}
if (pool != null) {
builder.append(prefix).append("pool: ").append(pool);
- prefix = ",";
+ prefix = ", ";
+ }
+ if (expiration != null) {
+ builder.append(prefix).append("expiration: ").append(expiration);
+ prefix = ", ";
}
builder.append("}");
return builder.toString();
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirectiveStats.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirectiveStats.java?rev=1546301&r1=1546300&r2=1546301&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirectiveStats.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/CacheDirectiveStats.java Thu Nov 28 07:20:21 2013
@@ -30,6 +30,7 @@ public class CacheDirectiveStats {
private long bytesNeeded;
private long bytesCached;
private long filesAffected;
+ private boolean hasExpired;
/**
* Builds a new CacheDirectiveStats populated with the set properties.
@@ -37,7 +38,8 @@ public class CacheDirectiveStats {
* @return New CacheDirectiveStats.
*/
public CacheDirectiveStats build() {
- return new CacheDirectiveStats(bytesNeeded, bytesCached, filesAffected);
+ return new CacheDirectiveStats(bytesNeeded, bytesCached, filesAffected,
+ hasExpired);
}
/**
@@ -52,7 +54,7 @@ public class CacheDirectiveStats {
* @param bytesNeeded The bytes needed.
* @return This builder, for call chaining.
*/
- public Builder setBytesNeeded(Long bytesNeeded) {
+ public Builder setBytesNeeded(long bytesNeeded) {
this.bytesNeeded = bytesNeeded;
return this;
}
@@ -63,7 +65,7 @@ public class CacheDirectiveStats {
* @param bytesCached The bytes cached.
* @return This builder, for call chaining.
*/
- public Builder setBytesCached(Long bytesCached) {
+ public Builder setBytesCached(long bytesCached) {
this.bytesCached = bytesCached;
return this;
}
@@ -74,21 +76,34 @@ public class CacheDirectiveStats {
* @param filesAffected The files affected.
* @return This builder, for call chaining.
*/
- public Builder setFilesAffected(Long filesAffected) {
+ public Builder setFilesAffected(long filesAffected) {
this.filesAffected = filesAffected;
return this;
}
+
+ /**
+ * Sets whether this directive has expired.
+ *
+ * @param hasExpired if this directive has expired
+ * @return This builder, for call chaining.
+ */
+ public Builder setHasExpired(boolean hasExpired) {
+ this.hasExpired = hasExpired;
+ return this;
+ }
}
private final long bytesNeeded;
private final long bytesCached;
private final long filesAffected;
+ private final boolean hasExpired;
private CacheDirectiveStats(long bytesNeeded, long bytesCached,
- long filesAffected) {
+ long filesAffected, boolean hasExpired) {
this.bytesNeeded = bytesNeeded;
this.bytesCached = bytesCached;
this.filesAffected = filesAffected;
+ this.hasExpired = hasExpired;
}
/**
@@ -112,6 +127,13 @@ public class CacheDirectiveStats {
return filesAffected;
}
+ /**
+ * @return Whether this directive has expired.
+ */
+ public boolean hasExpired() {
+ return hasExpired;
+ }
+
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
@@ -119,6 +141,7 @@ public class CacheDirectiveStats {
builder.append("bytesNeeded: ").append(bytesNeeded);
builder.append(", ").append("bytesCached: ").append(bytesCached);
builder.append(", ").append("filesAffected: ").append(filesAffected);
+ builder.append(", ").append("hasExpired: ").append(hasExpired);
builder.append("}");
return builder.toString();
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java?rev=1546301&r1=1546300&r2=1546301&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java Thu Nov 28 07:20:21 2013
@@ -61,6 +61,7 @@ import org.apache.hadoop.hdfs.protocol.L
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveEntryProto;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveInfoExpirationProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CacheDirectiveStatsProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolEntryProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CachePoolInfoProto;
@@ -1591,6 +1592,9 @@ public class PBHelper {
if (info.getPool() != null) {
builder.setPool(info.getPool());
}
+ if (info.getExpiration() != null) {
+ builder.setExpiration(convert(info.getExpiration()));
+ }
return builder.build();
}
@@ -1611,15 +1615,35 @@ public class PBHelper {
if (proto.hasPool()) {
builder.setPool(proto.getPool());
}
+ if (proto.hasExpiration()) {
+ builder.setExpiration(convert(proto.getExpiration()));
+ }
return builder.build();
}
-
+
+  /**
+   * Converts a {@link CacheDirectiveInfo.Expiration} to its protobuf form,
+   * copying the relative flag and the raw millisecond value.
+   */
+  public static CacheDirectiveInfoExpirationProto convert(
+      CacheDirectiveInfo.Expiration expiration) {
+    CacheDirectiveInfoExpirationProto.Builder builder =
+        CacheDirectiveInfoExpirationProto.newBuilder();
+    builder.setIsRelative(expiration.isRelative());
+    builder.setMillis(expiration.getMillis());
+    return builder.build();
+  }
+
+  /**
+   * Converts a protobuf expiration back into a
+   * {@link CacheDirectiveInfo.Expiration}, preserving whether it is relative
+   * or absolute.
+   */
+  public static CacheDirectiveInfo.Expiration convert(
+      CacheDirectiveInfoExpirationProto proto) {
+    final long millis = proto.getMillis();
+    return proto.getIsRelative()
+        ? CacheDirectiveInfo.Expiration.newRelative(millis)
+        : CacheDirectiveInfo.Expiration.newAbsolute(millis);
+  }
+
public static CacheDirectiveStatsProto convert(CacheDirectiveStats stats) {
CacheDirectiveStatsProto.Builder builder =
CacheDirectiveStatsProto.newBuilder();
builder.setBytesNeeded(stats.getBytesNeeded());
builder.setBytesCached(stats.getBytesCached());
builder.setFilesAffected(stats.getFilesAffected());
+ builder.setHasExpired(stats.hasExpired());
return builder.build();
}
@@ -1628,6 +1652,7 @@ public class PBHelper {
builder.setBytesNeeded(proto.getBytesNeeded());
builder.setBytesCached(proto.getBytesCached());
builder.setFilesAffected(proto.getFilesAffected());
+ builder.setHasExpired(proto.getHasExpired());
return builder.build();
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java?rev=1546301&r1=1546300&r2=1546301&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/CacheReplicationMonitor.java Thu Nov 28 07:20:21 2013
@@ -22,6 +22,7 @@ import static org.apache.hadoop.util.Exi
import java.io.Closeable;
import java.io.IOException;
import java.util.Collection;
+import java.util.Date;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
@@ -211,12 +212,24 @@ public class CacheReplicationMonitor ext
*/
private void rescanCacheDirectives() {
FSDirectory fsDir = namesystem.getFSDirectory();
- for (CacheDirective pce : cacheManager.getEntriesById().values()) {
+ final long now = new Date().getTime();
+ for (CacheDirective directive : cacheManager.getEntriesById().values()) {
+ // Reset the directive
+ directive.clearBytesNeeded();
+ directive.clearBytesCached();
+ directive.clearFilesAffected();
+ // Skip processing this entry if it has expired
+ LOG.info("Directive expiry is at " + directive.getExpiryTime());
+ if (directive.getExpiryTime() > 0 && directive.getExpiryTime() <= now) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Skipping directive id " + directive.getId()
+ + " because it has expired (" + directive.getExpiryTime() + ">="
+ + now);
+ }
+ continue;
+ }
scannedDirectives++;
- pce.clearBytesNeeded();
- pce.clearBytesCached();
- pce.clearFilesAffected();
- String path = pce.getPath();
+ String path = directive.getPath();
INode node;
try {
node = fsDir.getINode(path);
@@ -233,11 +246,11 @@ public class CacheReplicationMonitor ext
ReadOnlyList<INode> children = dir.getChildrenList(null);
for (INode child : children) {
if (child.isFile()) {
- rescanFile(pce, child.asFile());
+ rescanFile(directive, child.asFile());
}
}
} else if (node.isFile()) {
- rescanFile(pce, node.asFile());
+ rescanFile(directive, node.asFile());
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Ignoring non-directory, non-file inode " + node +
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java?rev=1546301&r1=1546300&r2=1546301&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/CacheManager.java Thu Nov 28 07:20:21 2013
@@ -17,12 +17,12 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES_DEFAULT;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS_DEFAULT;
@@ -43,18 +43,18 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.InvalidRequestException;
import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
+import org.apache.hadoop.fs.InvalidRequestException;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.CacheDirective;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
-import org.apache.hadoop.hdfs.protocol.CacheDirective;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor;
@@ -249,7 +249,7 @@ public final class CacheManager {
return cachedBlocks;
}
- private long getNextEntryId() throws IOException {
+ private long getNextDirectiveId() throws IOException {
assert namesystem.hasWriteLock();
if (nextDirectiveId >= Long.MAX_VALUE - 1) {
throw new IOException("No more available IDs.");
@@ -302,6 +302,34 @@ public final class CacheManager {
}
/**
+ * Calculates the absolute expiry time of the directive from the
+ * {@link CacheDirectiveInfo.Expiration}. This converts a relative Expiration
+ * into an absolute time based on the local clock.
+ *
+ * @param directive from which to get the expiry time
+ * @param defaultValue to use if Expiration is not set
+ * @return Absolute expiry time in milliseconds since Unix epoch
+ * @throws InvalidRequestException if the Expiration is invalid
+ */
+  private static long validateExpiryTime(CacheDirectiveInfo directive,
+      long defaultValue) throws InvalidRequestException {
+    final CacheDirectiveInfo.Expiration requested = directive.getExpiration();
+    if (requested == null) {
+      // Nothing was specified: fall back to the caller-supplied value.
+      return defaultValue;
+    }
+    final long rawMillis = requested.getMillis();
+    if (rawMillis < 0) {
+      throw new InvalidRequestException("Cannot set a negative expiration: "
+          + rawMillis);
+    }
+    // A relative duration is turned into an absolute time using the local
+    // clock; an absolute time is returned as-is.
+    return requested.getAbsoluteMillis();
+  }
+
+ /**
* Get a CacheDirective by ID, validating the ID and that the directive
* exists.
*/
@@ -346,6 +374,26 @@ public final class CacheManager {
directives.add(directive);
}
+  /**
+   * Re-creates a directive from an edit log record, reusing the logged ID
+   * instead of allocating a new one, and advances nextDirectiveId past it.
+   * To be called only from the edit log loading code.
+   *
+   * @param directive the directive as recorded in the edit log
+   * @return info describing the directive that was added
+   * @throws InvalidRequestException declared by the signature
+   *           (NOTE(review): confirm which callee actually raises it)
+   */
+  CacheDirectiveInfo addDirectiveFromEditLog(CacheDirectiveInfo directive)
+      throws InvalidRequestException {
+    long id = directive.getId();
+    // If the logged directive carries no expiration, treat it as "never
+    // expires" (the same default addDirective applies) instead of failing
+    // with a NullPointerException during replay.
+    CacheDirectiveInfo.Expiration expiration = directive.getExpiration();
+    long expiryTime = (expiration != null)
+        ? expiration.getAbsoluteMillis()
+        : CacheDirectiveInfo.Expiration.EXPIRY_NEVER;
+    CacheDirective entry =
+        new CacheDirective(
+            id,
+            directive.getPath().toUri().getPath(),
+            directive.getReplication(),
+            expiryTime);
+    CachePool pool = cachePools.get(directive.getPool());
+    addInternal(entry, pool);
+    if (nextDirectiveId <= id) {
+      nextDirectiveId = id + 1;
+    }
+    return entry.toInfo();
+  }
+
+
public CacheDirectiveInfo addDirective(
CacheDirectiveInfo info, FSPermissionChecker pc)
throws IOException {
@@ -356,27 +404,12 @@ public final class CacheManager {
checkWritePermission(pc, pool);
String path = validatePath(info);
short replication = validateReplication(info, (short)1);
- long id;
- if (info.getId() != null) {
- // We are loading a directive from the edit log.
- // Use the ID from the edit log.
- id = info.getId();
- if (id <= 0) {
- throw new InvalidRequestException("can't add an ID " +
- "of " + id + ": it is not positive.");
- }
- if (id >= Long.MAX_VALUE) {
- throw new InvalidRequestException("can't add an ID " +
- "of " + id + ": it is too big.");
- }
- if (nextDirectiveId <= id) {
- nextDirectiveId = id + 1;
- }
- } else {
- // Add a new directive with the next available ID.
- id = getNextEntryId();
- }
- directive = new CacheDirective(id, path, replication);
+ long expiryTime = validateExpiryTime(info,
+ CacheDirectiveInfo.Expiration.EXPIRY_NEVER);
+ // All validation passed
+ // Add a new entry with the next available ID.
+ long id = getNextDirectiveId();
+ directive = new CacheDirective(id, path, replication, expiryTime);
addInternal(directive, pool);
} catch (IOException e) {
LOG.warn("addDirective of " + info + " failed: ", e);
@@ -407,10 +440,13 @@ public final class CacheManager {
if (info.getPath() != null) {
path = validatePath(info);
}
+
short replication = prevEntry.getReplication();
- if (info.getReplication() != null) {
- replication = validateReplication(info, replication);
- }
+ replication = validateReplication(info, replication);
+
+ long expiryTime = prevEntry.getExpiryTime();
+ expiryTime = validateExpiryTime(info, expiryTime);
+
CachePool pool = prevEntry.getPool();
if (info.getPool() != null) {
pool = getCachePool(validatePoolName(info));
@@ -418,7 +454,7 @@ public final class CacheManager {
}
removeInternal(prevEntry);
CacheDirective newEntry =
- new CacheDirective(id, path, replication);
+ new CacheDirective(id, path, replication, expiryTime);
addInternal(newEntry, pool);
} catch (IOException e) {
LOG.warn("modifyDirective of " + idString + " failed: ", e);
@@ -788,6 +824,7 @@ public final class CacheManager {
Text.writeString(out, directive.getPath());
out.writeShort(directive.getReplication());
Text.writeString(out, directive.getPool().getPoolName());
+ out.writeLong(directive.getExpiryTime());
counter.increment();
}
prog.endStep(Phase.SAVING_CHECKPOINT, step);
@@ -826,6 +863,7 @@ public final class CacheManager {
String path = Text.readString(in);
short replication = in.readShort();
String poolName = Text.readString(in);
+ long expiryTime = in.readLong();
// Get pool reference by looking it up in the map
CachePool pool = cachePools.get(poolName);
if (pool == null) {
@@ -833,7 +871,7 @@ public final class CacheManager {
", which does not exist.");
}
CacheDirective directive =
- new CacheDirective(directiveId, path, replication);
+ new CacheDirective(directiveId, path, replication, expiryTime);
boolean addedDirective = pool.getDirectiveList().add(directive);
assert addedDirective;
if (directivesById.put(directive.getId(), directive) != null) {
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1546301&r1=1546300&r2=1546301&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Thu Nov 28 07:20:21 2013
@@ -953,7 +953,11 @@ public class FSEditLog implements LogsPu
.setSnapshotRoot(path);
logEdit(op);
}
-
+
+ /**
+ * Log a CacheDirectiveInfo returned from
+ * {@link CacheManager#addDirective(CacheDirectiveInfo, FSPermissionChecker)}
+ */
void logAddCacheDirectiveInfo(CacheDirectiveInfo directive,
boolean toLogRpcIds) {
AddCacheDirectiveInfoOp op =
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1546301&r1=1546300&r2=1546301&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Thu Nov 28 07:20:21 2013
@@ -636,17 +636,17 @@ public class FSEditLogLoader {
fsNamesys.setLastAllocatedBlockId(allocateBlockIdOp.blockId);
break;
}
- case OP_ADD_PATH_BASED_CACHE_DIRECTIVE: {
+ case OP_ADD_CACHE_DIRECTIVE: {
AddCacheDirectiveInfoOp addOp = (AddCacheDirectiveInfoOp) op;
CacheDirectiveInfo result = fsNamesys.
- getCacheManager().addDirective(addOp.directive, null);
+ getCacheManager().addDirectiveFromEditLog(addOp.directive);
if (toAddRetryCache) {
Long id = result.getId();
fsNamesys.addCacheEntryWithPayload(op.rpcClientId, op.rpcCallId, id);
}
break;
}
- case OP_MODIFY_PATH_BASED_CACHE_DIRECTIVE: {
+ case OP_MODIFY_CACHE_DIRECTIVE: {
ModifyCacheDirectiveInfoOp modifyOp =
(ModifyCacheDirectiveInfoOp) op;
fsNamesys.getCacheManager().modifyDirective(
@@ -656,7 +656,7 @@ public class FSEditLogLoader {
}
break;
}
- case OP_REMOVE_PATH_BASED_CACHE_DIRECTIVE: {
+ case OP_REMOVE_CACHE_DIRECTIVE: {
RemoveCacheDirectiveInfoOp removeOp =
(RemoveCacheDirectiveInfoOp) op;
fsNamesys.getCacheManager().removeDirective(removeOp.id, null);
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java?rev=1546301&r1=1546300&r2=1546301&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java Thu Nov 28 07:20:21 2013
@@ -18,9 +18,8 @@
package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD_CACHE_DIRECTIVE;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD_CACHE_POOL;
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD_PATH_BASED_CACHE_DIRECTIVE;
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_MODIFY_PATH_BASED_CACHE_DIRECTIVE;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ALLOCATE_BLOCK_ID;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ALLOW_SNAPSHOT;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_CANCEL_DELEGATION_TOKEN;
@@ -35,10 +34,11 @@ import static org.apache.hadoop.hdfs.ser
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_GET_DELEGATION_TOKEN;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_INVALID;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_MKDIR;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_MODIFY_CACHE_DIRECTIVE;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_MODIFY_CACHE_POOL;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REASSIGN_LEASE;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REMOVE_CACHE_DIRECTIVE;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REMOVE_CACHE_POOL;
-import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REMOVE_PATH_BASED_CACHE_DIRECTIVE;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME_OLD;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME_SNAPSHOT;
@@ -64,6 +64,7 @@ import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Date;
import java.util.EnumMap;
import java.util.List;
import java.util.zip.CheckedInputStream;
@@ -81,12 +82,12 @@ import org.apache.hadoop.fs.permission.P
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DeprecatedUTF8;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
-import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.util.XMLUtils;
import org.apache.hadoop.hdfs.util.XMLUtils.InvalidXmlException;
@@ -109,7 +110,6 @@ import org.xml.sax.helpers.AttributesImp
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
-import com.google.common.base.Strings;
/**
* Helper classes for reading the ops from an InputStream.
@@ -165,11 +165,11 @@ public abstract class FSEditLogOp {
inst.put(OP_RENAME_SNAPSHOT, new RenameSnapshotOp());
inst.put(OP_SET_GENSTAMP_V2, new SetGenstampV2Op());
inst.put(OP_ALLOCATE_BLOCK_ID, new AllocateBlockIdOp());
- inst.put(OP_ADD_PATH_BASED_CACHE_DIRECTIVE,
+ inst.put(OP_ADD_CACHE_DIRECTIVE,
new AddCacheDirectiveInfoOp());
- inst.put(OP_MODIFY_PATH_BASED_CACHE_DIRECTIVE,
+ inst.put(OP_MODIFY_CACHE_DIRECTIVE,
new ModifyCacheDirectiveInfoOp());
- inst.put(OP_REMOVE_PATH_BASED_CACHE_DIRECTIVE,
+ inst.put(OP_REMOVE_CACHE_DIRECTIVE,
new RemoveCacheDirectiveInfoOp());
inst.put(OP_ADD_CACHE_POOL, new AddCachePoolOp());
inst.put(OP_MODIFY_CACHE_POOL, new ModifyCachePoolOp());
@@ -2874,12 +2874,12 @@ public abstract class FSEditLogOp {
CacheDirectiveInfo directive;
public AddCacheDirectiveInfoOp() {
- super(OP_ADD_PATH_BASED_CACHE_DIRECTIVE);
+ super(OP_ADD_CACHE_DIRECTIVE);
}
static AddCacheDirectiveInfoOp getInstance(OpInstanceCache cache) {
return (AddCacheDirectiveInfoOp) cache
- .get(OP_ADD_PATH_BASED_CACHE_DIRECTIVE);
+ .get(OP_ADD_CACHE_DIRECTIVE);
}
public AddCacheDirectiveInfoOp setDirective(
@@ -2889,6 +2889,7 @@ public abstract class FSEditLogOp {
assert(directive.getPath() != null);
assert(directive.getReplication() != null);
assert(directive.getPool() != null);
+ assert(directive.getExpiration() != null);
return this;
}
@@ -2898,11 +2899,13 @@ public abstract class FSEditLogOp {
String path = FSImageSerialization.readString(in);
short replication = FSImageSerialization.readShort(in);
String pool = FSImageSerialization.readString(in);
+ long expiryTime = FSImageSerialization.readLong(in);
directive = new CacheDirectiveInfo.Builder().
setId(id).
setPath(new Path(path)).
setReplication(replication).
setPool(pool).
+ setExpiration(CacheDirectiveInfo.Expiration.newAbsolute(expiryTime)).
build();
readRpcIds(in, logVersion);
}
@@ -2913,6 +2916,8 @@ public abstract class FSEditLogOp {
FSImageSerialization.writeString(directive.getPath().toUri().getPath(), out);
FSImageSerialization.writeShort(directive.getReplication(), out);
FSImageSerialization.writeString(directive.getPool(), out);
+ FSImageSerialization.writeLong(
+ directive.getExpiration().getMillis(), out);
writeRpcIds(rpcClientId, rpcCallId, out);
}
@@ -2925,6 +2930,8 @@ public abstract class FSEditLogOp {
XMLUtils.addSaxString(contentHandler, "REPLICATION",
Short.toString(directive.getReplication()));
XMLUtils.addSaxString(contentHandler, "POOL", directive.getPool());
+ XMLUtils.addSaxString(contentHandler, "EXPIRATION",
+ "" + directive.getExpiration().getMillis());
appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
}
@@ -2935,6 +2942,8 @@ public abstract class FSEditLogOp {
setPath(new Path(st.getValue("PATH"))).
setReplication(Short.parseShort(st.getValue("REPLICATION"))).
setPool(st.getValue("POOL")).
+ setExpiration(CacheDirectiveInfo.Expiration.newAbsolute(
+ Long.parseLong(st.getValue("EXPIRATION")))).
build();
readRpcIdsFromXml(st);
}
@@ -2946,7 +2955,8 @@ public abstract class FSEditLogOp {
builder.append("id=" + directive.getId() + ",");
builder.append("path=" + directive.getPath().toUri().getPath() + ",");
builder.append("replication=" + directive.getReplication() + ",");
- builder.append("pool=" + directive.getPool());
+ builder.append("pool=" + directive.getPool() + ",");
+ builder.append("expiration=" + directive.getExpiration().getMillis());
appendRpcIdsToString(builder, rpcClientId, rpcCallId);
builder.append("]");
return builder.toString();
@@ -2961,12 +2971,12 @@ public abstract class FSEditLogOp {
CacheDirectiveInfo directive;
public ModifyCacheDirectiveInfoOp() {
- super(OP_MODIFY_PATH_BASED_CACHE_DIRECTIVE);
+ super(OP_MODIFY_CACHE_DIRECTIVE);
}
static ModifyCacheDirectiveInfoOp getInstance(OpInstanceCache cache) {
return (ModifyCacheDirectiveInfoOp) cache
- .get(OP_MODIFY_PATH_BASED_CACHE_DIRECTIVE);
+ .get(OP_MODIFY_CACHE_DIRECTIVE);
}
public ModifyCacheDirectiveInfoOp setDirective(
@@ -2991,7 +3001,12 @@ public abstract class FSEditLogOp {
if ((flags & 0x4) != 0) {
builder.setPool(FSImageSerialization.readString(in));
}
- if ((flags & ~0x7) != 0) {
+ if ((flags & 0x8) != 0) {
+ builder.setExpiration(
+ CacheDirectiveInfo.Expiration.newAbsolute(
+ FSImageSerialization.readLong(in)));
+ }
+ if ((flags & ~0xF) != 0) {
throw new IOException("unknown flags set in " +
"ModifyCacheDirectiveInfoOp: " + flags);
}
@@ -3005,7 +3020,8 @@ public abstract class FSEditLogOp {
byte flags = (byte)(
((directive.getPath() != null) ? 0x1 : 0) |
((directive.getReplication() != null) ? 0x2 : 0) |
- ((directive.getPool() != null) ? 0x4 : 0)
+ ((directive.getPool() != null) ? 0x4 : 0) |
+ ((directive.getExpiration() != null) ? 0x8 : 0)
);
out.writeByte(flags);
if (directive.getPath() != null) {
@@ -3018,6 +3034,10 @@ public abstract class FSEditLogOp {
if (directive.getPool() != null) {
FSImageSerialization.writeString(directive.getPool(), out);
}
+ if (directive.getExpiration() != null) {
+ FSImageSerialization.writeLong(directive.getExpiration().getMillis(),
+ out);
+ }
writeRpcIds(rpcClientId, rpcCallId, out);
}
@@ -3036,6 +3056,10 @@ public abstract class FSEditLogOp {
if (directive.getPool() != null) {
XMLUtils.addSaxString(contentHandler, "POOL", directive.getPool());
}
+ if (directive.getExpiration() != null) {
+ XMLUtils.addSaxString(contentHandler, "EXPIRATION",
+ "" + directive.getExpiration().getMillis());
+ }
appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
}
@@ -3056,6 +3080,11 @@ public abstract class FSEditLogOp {
if (pool != null) {
builder.setPool(pool);
}
+ String expiryTime = st.getValueOrNull("EXPIRATION");
+ if (expiryTime != null) {
+ builder.setExpiration(CacheDirectiveInfo.Expiration.newAbsolute(
+ Long.parseLong(expiryTime)));
+ }
this.directive = builder.build();
readRpcIdsFromXml(st);
}
@@ -3075,6 +3104,10 @@ public abstract class FSEditLogOp {
if (directive.getPool() != null) {
builder.append(",").append("pool=").append(directive.getPool());
}
+ if (directive.getExpiration() != null) {
+ builder.append(",").append("expiration=").
+ append(directive.getExpiration().getMillis());
+ }
appendRpcIdsToString(builder, rpcClientId, rpcCallId);
builder.append("]");
return builder.toString();
@@ -3089,12 +3122,12 @@ public abstract class FSEditLogOp {
long id;
public RemoveCacheDirectiveInfoOp() {
- super(OP_REMOVE_PATH_BASED_CACHE_DIRECTIVE);
+ super(OP_REMOVE_CACHE_DIRECTIVE);
}
static RemoveCacheDirectiveInfoOp getInstance(OpInstanceCache cache) {
return (RemoveCacheDirectiveInfoOp) cache
- .get(OP_REMOVE_PATH_BASED_CACHE_DIRECTIVE);
+ .get(OP_REMOVE_CACHE_DIRECTIVE);
}
public RemoveCacheDirectiveInfoOp setId(long id) {
@@ -3162,7 +3195,7 @@ public abstract class FSEditLogOp {
@Override
public void writeFields(DataOutputStream out) throws IOException {
- info .writeTo(out);
+ info.writeTo(out);
writeRpcIds(rpcClientId, rpcCallId, out);
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java?rev=1546301&r1=1546300&r2=1546301&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java Thu Nov 28 07:20:21 2013
@@ -64,12 +64,12 @@ public enum FSEditLogOpCodes {
OP_DISALLOW_SNAPSHOT ((byte) 30),
OP_SET_GENSTAMP_V2 ((byte) 31),
OP_ALLOCATE_BLOCK_ID ((byte) 32),
- OP_ADD_PATH_BASED_CACHE_DIRECTIVE ((byte) 33),
- OP_REMOVE_PATH_BASED_CACHE_DIRECTIVE ((byte) 34),
+ OP_ADD_CACHE_DIRECTIVE ((byte) 33),
+ OP_REMOVE_CACHE_DIRECTIVE ((byte) 34),
OP_ADD_CACHE_POOL ((byte) 35),
OP_MODIFY_CACHE_POOL ((byte) 36),
OP_REMOVE_CACHE_POOL ((byte) 37),
- OP_MODIFY_PATH_BASED_CACHE_DIRECTIVE ((byte) 38);
+ OP_MODIFY_CACHE_DIRECTIVE ((byte) 38);
private byte opCode;
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/CacheAdmin.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/CacheAdmin.java?rev=1546301&r1=1546300&r2=1546301&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/CacheAdmin.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/CacheAdmin.java Thu Nov 28 07:20:21 2013
@@ -29,12 +29,13 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveStats;
import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
-import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.server.namenode.CachePool;
import org.apache.hadoop.hdfs.tools.TableListing.Justification;
import org.apache.hadoop.ipc.RemoteException;
@@ -132,7 +133,8 @@ public class CacheAdmin extends Configur
@Override
public String getShortUsage() {
return "[" + getName() +
- " -path <path> -replication <replication> -pool <pool-name>]\n";
+ " -path <path> -pool <pool-name> " +
+ "[-replication <replication>] [-ttl <time-to-live>]]\n";
}
@Override
@@ -140,11 +142,15 @@ public class CacheAdmin extends Configur
TableListing listing = getOptionDescriptionListing();
listing.addRow("<path>", "A path to cache. The path can be " +
"a directory or a file.");
- listing.addRow("<replication>", "The cache replication factor to use. " +
- "Defaults to 1.");
listing.addRow("<pool-name>", "The pool to which the directive will be " +
"added. You must have write permission on the cache pool "
+ "in order to add new directives.");
+ listing.addRow("<replication>", "The cache replication factor to use. " +
+ "Defaults to 1.");
+ listing.addRow("<time-to-live>", "How long the directive is " +
+ "valid. Can be specified in minutes, hours, and days via e.g. " +
+ "30m, 4h, 2d. Valid units are [smhd]." +
+ " If unspecified, the directive never expires.");
return getShortUsage() + "\n" +
"Add a new cache directive.\n\n" +
listing.toString();
@@ -152,33 +158,48 @@ public class CacheAdmin extends Configur
@Override
public int run(Configuration conf, List<String> args) throws IOException {
+ CacheDirectiveInfo.Builder builder = new CacheDirectiveInfo.Builder();
+
String path = StringUtils.popOptionWithArgument("-path", args);
if (path == null) {
System.err.println("You must specify a path with -path.");
return 1;
}
- short replication = 1;
- String replicationString =
- StringUtils.popOptionWithArgument("-replication", args);
- if (replicationString != null) {
- replication = Short.parseShort(replicationString);
- }
+ builder.setPath(new Path(path));
+
String poolName = StringUtils.popOptionWithArgument("-pool", args);
if (poolName == null) {
System.err.println("You must specify a pool name with -pool.");
return 1;
}
+ builder.setPool(poolName);
+
+ String replicationString =
+ StringUtils.popOptionWithArgument("-replication", args);
+ if (replicationString != null) {
+ Short replication = Short.parseShort(replicationString);
+ builder.setReplication(replication);
+ }
+
+ String ttlString = StringUtils.popOptionWithArgument("-ttl", args);
+ if (ttlString != null) {
+ try {
+ long ttl = DFSUtil.parseRelativeTime(ttlString);
+ builder.setExpiration(CacheDirectiveInfo.Expiration.newRelative(ttl));
+ } catch (IOException e) {
+ System.err.println(
+ "Error while parsing ttl value: " + e.getMessage());
+ return 1;
+ }
+ }
+
if (!args.isEmpty()) {
System.err.println("Can't understand argument: " + args.get(0));
return 1;
}
DistributedFileSystem dfs = getDFS(conf);
- CacheDirectiveInfo directive = new CacheDirectiveInfo.Builder().
- setPath(new Path(path)).
- setReplication(replication).
- setPool(poolName).
- build();
+ CacheDirectiveInfo directive = builder.build();
try {
long id = dfs.addCacheDirective(directive);
System.out.println("Added cache directive " + id);
@@ -261,7 +282,7 @@ public class CacheAdmin extends Configur
public String getShortUsage() {
return "[" + getName() +
" -id <id> [-path <path>] [-replication <replication>] " +
- "[-pool <pool-name>] ]\n";
+ "[-pool <pool-name>] [-ttl <time-to-live>]]\n";
}
@Override
@@ -275,6 +296,10 @@ public class CacheAdmin extends Configur
listing.addRow("<pool-name>", "The pool to which the directive will be " +
"added. You must have write permission on the cache pool "
+ "in order to move a directive into it. (optional)");
+ listing.addRow("<time-to-live>", "How long the directive is " +
+ "valid. Can be specified in minutes, hours, and days via e.g. " +
+ "30m, 4h, 2d. Valid units are [smhd]." +
+ " If unspecified, the directive never expires.");
return getShortUsage() + "\n" +
"Modify a cache directive.\n\n" +
listing.toString();
@@ -308,6 +333,19 @@ public class CacheAdmin extends Configur
builder.setPool(poolName);
modified = true;
}
+ String ttlString = StringUtils.popOptionWithArgument("-ttl", args);
+ if (ttlString != null) {
+ long ttl;
+ try {
+ ttl = DFSUtil.parseRelativeTime(ttlString);
+ } catch (IOException e) {
+ System.err.println(
+ "Error while parsing ttl value: " + e.getMessage());
+ return 1;
+ }
+ builder.setExpiration(CacheDirectiveInfo.Expiration.newRelative(ttl));
+ modified = true;
+ }
if (!args.isEmpty()) {
System.err.println("Can't understand argument: " + args.get(0));
System.err.println("Usage is " + getShortUsage());
@@ -435,7 +473,8 @@ public class CacheAdmin extends Configur
TableListing.Builder tableBuilder = new TableListing.Builder().
addField("ID", Justification.RIGHT).
addField("POOL", Justification.LEFT).
- addField("REPLICATION", Justification.RIGHT).
+ addField("REPL", Justification.RIGHT).
+ addField("EXPIRY", Justification.LEFT).
addField("PATH", Justification.LEFT);
if (printStats) {
tableBuilder.addField("NEEDED", Justification.RIGHT).
@@ -456,6 +495,14 @@ public class CacheAdmin extends Configur
row.add("" + directive.getId());
row.add(directive.getPool());
row.add("" + directive.getReplication());
+ String expiry;
+ if (directive.getExpiration().getMillis() ==
+ CacheDirectiveInfo.Expiration.EXPIRY_NEVER) {
+ expiry = "never";
+ } else {
+ expiry = directive.getExpiration().toString();
+ }
+ row.add(expiry);
row.add(directive.getPath().toUri().getPath());
if (printStats) {
row.add("" + stats.getBytesNeeded());
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto?rev=1546301&r1=1546300&r2=1546301&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto Thu Nov 28 07:20:21 2013
@@ -368,12 +368,19 @@ message CacheDirectiveInfoProto {
optional string path = 2;
optional uint32 replication = 3;
optional string pool = 4;
+ optional CacheDirectiveInfoExpirationProto expiration = 5;
+}
+
+message CacheDirectiveInfoExpirationProto {
+ required int64 millis = 1;
+ required bool isRelative = 2;
}
message CacheDirectiveStatsProto {
required int64 bytesNeeded = 1;
required int64 bytesCached = 2;
required int64 filesAffected = 3;
+ required bool hasExpired = 4;
}
message AddCacheDirectiveRequestProto {
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java?rev=1546301&r1=1546300&r2=1546301&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java Thu Nov 28 07:20:21 2013
@@ -31,6 +31,7 @@ import static org.apache.hadoop.hdfs.DFS
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICES;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID;
+import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
import static org.hamcrest.CoreMatchers.not;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@@ -62,6 +63,7 @@ import org.apache.hadoop.hdfs.server.nam
import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Shell;
import org.junit.Assume;
import org.junit.Before;
@@ -724,4 +726,43 @@ public class TestDFSUtil {
DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_KEYTAB_KEY,
DFSUtil.getSpnegoKeytabKey(conf, defaultKey));
}
+
+ /**
+ * Checks the string rendering of millisecond durations by
+ * DFSUtil.durationToString, and that a negative duration is rejected with
+ * an IllegalArgumentException.
+ */
+ @Test(timeout=1000)
+ public void testDurationToString() throws Exception {
+   assertEquals("000:00:00:00", DFSUtil.durationToString(0));
+   try {
+     DFSUtil.durationToString(-199);
+     // Without this the test passes silently when nothing is thrown
+     org.junit.Assert.fail(
+         "Expected IllegalArgumentException for a negative duration");
+   } catch (IllegalArgumentException e) {
+     GenericTestUtils.assertExceptionContains("Invalid negative duration", e);
+   }
+   // One day, one hour, one minute and one second
+   assertEquals("001:01:01:01",
+       DFSUtil.durationToString(((24*60*60)+(60*60)+(60)+1)*1000));
+   // One second short of a full day
+   assertEquals("000:23:59:59",
+       DFSUtil.durationToString(((23*60*60)+(59*60)+(59))*1000));
+ }
+
+ /**
+ * Exercises DFSUtil.parseRelativeTime with malformed inputs, each of which
+ * must be rejected with a descriptive IOException, and with each unit
+ * suffix (s, m, h, d), whose result is expected in milliseconds.
+ */
+ @Test(timeout=5000)
+ public void testRelativeTimeConversion() throws Exception {
+   // Each malformed input must throw; fail() keeps the test from passing
+   // silently when the call returns normally.
+   try {
+     DFSUtil.parseRelativeTime("1");
+     org.junit.Assert.fail("Expected IOException for a value without a unit");
+   } catch (IOException e) {
+     assertExceptionContains("too short", e);
+   }
+   try {
+     DFSUtil.parseRelativeTime("1z");
+     org.junit.Assert.fail("Expected IOException for an unknown time unit");
+   } catch (IOException e) {
+     assertExceptionContains("unknown time unit", e);
+   }
+   try {
+     DFSUtil.parseRelativeTime("yyz");
+     org.junit.Assert.fail("Expected IOException for a non-numeric value");
+   } catch (IOException e) {
+     assertExceptionContains("is not a number", e);
+   }
+   assertEquals(61*1000, DFSUtil.parseRelativeTime("61s"));
+   assertEquals(61*60*1000, DFSUtil.parseRelativeTime("61m"));
+   assertEquals(0, DFSUtil.parseRelativeTime("0s"));
+   assertEquals(25*60*60*1000, DFSUtil.parseRelativeTime("25h"));
+   assertEquals(4*24*60*60*1000, DFSUtil.parseRelativeTime("4d"));
+   // NOTE(review): 999*24*60*60*1000 is evaluated in int arithmetic and
+   // exceeds Integer.MAX_VALUE, so this expected value wraps around. The
+   // assertion is left as-is; confirm whether parseRelativeTime should
+   // return the true value (then use a long literal such as 1000L here).
+   assertEquals(999*24*60*60*1000, DFSUtil.parseRelativeTime("999d"));
+ }
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java?rev=1546301&r1=1546300&r2=1546301&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java Thu Nov 28 07:20:21 2013
@@ -33,10 +33,12 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
+import java.util.Date;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
+import org.apache.commons.lang.time.DateUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -54,13 +56,13 @@ import org.apache.hadoop.hdfs.Distribute
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CacheDirectiveStats;
import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
-import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo.Expiration;
import org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type;
-import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheManipulator;
@@ -521,10 +523,14 @@ public class TestCacheDirectives {
int numEntries = 10;
String entryPrefix = "/party-";
long prevId = -1;
+ final Date expiry = new Date();
for (int i=0; i<numEntries; i++) {
prevId = dfs.addCacheDirective(
new CacheDirectiveInfo.Builder().
- setPath(new Path(entryPrefix + i)).setPool(pool).build());
+ setPath(new Path(entryPrefix + i)).setPool(pool).
+ setExpiration(
+ CacheDirectiveInfo.Expiration.newAbsolute(expiry.getTime())).
+ build());
}
RemoteIterator<CacheDirectiveEntry> dit
= dfs.listCacheDirectives(null);
@@ -558,6 +564,7 @@ public class TestCacheDirectives {
assertEquals(i+1, cd.getId().longValue());
assertEquals(entryPrefix + i, cd.getPath().toUri().getPath());
assertEquals(pool, cd.getPool());
+ assertEquals(expiry.getTime(), cd.getExpiration().getMillis());
}
assertFalse("Unexpected # of cache directives found", dit.hasNext());
@@ -1001,4 +1008,72 @@ public class TestCacheDirectives {
info.getMode().toShort());
assertEquals("Mismatched weight", 99, (int)info.getWeight());
}
+
+  /**
+   * Exercises the expiration (TTL) setting of a cache directive: the
+   * directive is created with an expiry 120 seconds out (beyond the 60 second
+   * test timeout), modified to a relative expiration of 0, modified back to
+   * expire later, and finally a negative relative expiration must be rejected.
+   */
+  @Test(timeout=60000)
+  public void testExpiry() throws Exception {
+    HdfsConfiguration conf = createCachingConf();
+    MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
+    try {
+      DistributedFileSystem dfs = cluster.getFileSystem();
+      String pool = "pool1";
+      dfs.addCachePool(new CachePoolInfo(pool));
+      Path p = new Path("/mypath");
+      DFSTestUtil.createFile(dfs, p, BLOCK_SIZE*2, (short)2, 0x999);
+      // Expire after test timeout
+      Date start = new Date();
+      Date expiry = DateUtils.addSeconds(start, 120);
+      final long id = dfs.addCacheDirective(new CacheDirectiveInfo.Builder()
+          .setPath(p)
+          .setPool(pool)
+          .setExpiration(Expiration.newAbsolute(expiry))
+          .setReplication((short)2)
+          .build());
+      // NOTE(review): 2 and 4 are presumably the expected cached block and
+      // cached replica counts - confirm against waitForCachedBlocks.
+      waitForCachedBlocks(cluster.getNameNode(), 2, 4, "testExpiry:1");
+      // Change it to expire sooner
+      dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder().setId(id)
+          .setExpiration(Expiration.newRelative(0)).build());
+      waitForCachedBlocks(cluster.getNameNode(), 0, 0, "testExpiry:2");
+      RemoteIterator<CacheDirectiveEntry> it = dfs.listCacheDirectives(null);
+      CacheDirectiveEntry ent = it.next();
+      assertFalse(it.hasNext());
+      Date entryExpiry = new Date(ent.getInfo().getExpiration().getMillis());
+      assertTrue("Directive should have expired",
+          entryExpiry.before(new Date()));
+      // Change it back to expire later
+      dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder().setId(id)
+          .setExpiration(Expiration.newRelative(120000)).build());
+      waitForCachedBlocks(cluster.getNameNode(), 2, 4, "testExpiry:3");
+      it = dfs.listCacheDirectives(null);
+      ent = it.next();
+      assertFalse(it.hasNext());
+      entryExpiry = new Date(ent.getInfo().getExpiration().getMillis());
+      assertTrue("Directive should not have expired",
+          entryExpiry.after(new Date()));
+      // Verify that setting a negative TTL throws an error. Track whether the
+      // exception was actually seen so that a silently accepted negative TTL
+      // fails the test instead of passing.
+      boolean negativeTtlRejected = false;
+      try {
+        dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder().setId(id)
+            .setExpiration(Expiration.newRelative(-1)).build());
+      } catch (InvalidRequestException e) {
+        GenericTestUtils
+            .assertExceptionContains("Cannot set a negative expiration", e);
+        negativeTtlRejected = true;
+      }
+      assertTrue("Setting a negative TTL should have been rejected",
+          negativeTtlRejected);
+    } finally {
+      cluster.shutdown();
+    }
+  }
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored?rev=1546301&r1=1546300&r2=1546301&view=diff
==============================================================================
Binary files - no diff available.