Posted to hdfs-commits@hadoop.apache.org by cm...@apache.org on 2013/11/21 18:12:59 UTC
svn commit: r1544252 [2/3] - in
/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/org/apache/hadoop/hdfs/
src/main/java/org/apache/hadoop/hdfs/client/
src/main/java/org/apache/hadoop/hdfs/protocol/
src/main/java/org/apache/hadoop/h...
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java?rev=1544252&r1=1544251&r2=1544252&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java Thu Nov 21 17:12:58 2013
@@ -38,15 +38,15 @@ import org.apache.hadoop.fs.permission.F
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
-import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.Storage.FormatConfirmable;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCachePoolOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddOp;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddPathBasedCacheDirectiveOp;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ModifyPathBasedCacheDirectiveOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCacheDirectiveInfoOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ModifyCacheDirectiveInfoOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AllocateBlockIdOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AllowSnapshotOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.CancelDelegationTokenOp;
@@ -63,7 +63,7 @@ import org.apache.hadoop.hdfs.server.nam
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.OpInstanceCache;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ReassignLeaseOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemoveCachePoolOp;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemovePathBasedCacheDirectiveOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemoveCacheDirectiveInfoOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOldOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameSnapshotOp;
@@ -954,27 +954,27 @@ public class FSEditLog implements LogsPu
logEdit(op);
}
- void logAddPathBasedCacheDirective(PathBasedCacheDirective directive,
+ void logAddCacheDirectiveInfo(CacheDirectiveInfo directive,
boolean toLogRpcIds) {
- AddPathBasedCacheDirectiveOp op =
- AddPathBasedCacheDirectiveOp.getInstance(cache.get())
+ AddCacheDirectiveInfoOp op =
+ AddCacheDirectiveInfoOp.getInstance(cache.get())
.setDirective(directive);
logRpcIds(op, toLogRpcIds);
logEdit(op);
}
- void logModifyPathBasedCacheDirective(
- PathBasedCacheDirective directive, boolean toLogRpcIds) {
- ModifyPathBasedCacheDirectiveOp op =
- ModifyPathBasedCacheDirectiveOp.getInstance(
+ void logModifyCacheDirectiveInfo(
+ CacheDirectiveInfo directive, boolean toLogRpcIds) {
+ ModifyCacheDirectiveInfoOp op =
+ ModifyCacheDirectiveInfoOp.getInstance(
cache.get()).setDirective(directive);
logRpcIds(op, toLogRpcIds);
logEdit(op);
}
- void logRemovePathBasedCacheDirective(Long id, boolean toLogRpcIds) {
- RemovePathBasedCacheDirectiveOp op =
- RemovePathBasedCacheDirectiveOp.getInstance(cache.get()).setId(id);
+ void logRemoveCacheDirectiveInfo(Long id, boolean toLogRpcIds) {
+ RemoveCacheDirectiveInfoOp op =
+ RemoveCacheDirectiveInfoOp.getInstance(cache.get()).setId(id);
logRpcIds(op, toLogRpcIds);
logEdit(op);
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java?rev=1544252&r1=1544251&r2=1544252&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java Thu Nov 21 17:12:58 2013
@@ -36,13 +36,13 @@ import org.apache.hadoop.hdfs.protocol.H
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCachePoolOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCloseOp;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddPathBasedCacheDirectiveOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AddCacheDirectiveInfoOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AllocateBlockIdOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.AllowSnapshotOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.BlockListUpdatingOp;
@@ -56,10 +56,10 @@ import org.apache.hadoop.hdfs.server.nam
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.GetDelegationTokenOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.MkdirOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ModifyCachePoolOp;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ModifyPathBasedCacheDirectiveOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ModifyCacheDirectiveInfoOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.ReassignLeaseOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemoveCachePoolOp;
-import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemovePathBasedCacheDirectiveOp;
+import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RemoveCacheDirectiveInfoOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOldOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameOp;
import org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.RenameSnapshotOp;
@@ -639,8 +639,8 @@ public class FSEditLogLoader {
break;
}
case OP_ADD_PATH_BASED_CACHE_DIRECTIVE: {
- AddPathBasedCacheDirectiveOp addOp = (AddPathBasedCacheDirectiveOp) op;
- PathBasedCacheDirective result = fsNamesys.
+ AddCacheDirectiveInfoOp addOp = (AddCacheDirectiveInfoOp) op;
+ CacheDirectiveInfo result = fsNamesys.
getCacheManager().addDirective(addOp.directive, null);
if (toAddRetryCache) {
Long id = result.getId();
@@ -649,8 +649,8 @@ public class FSEditLogLoader {
break;
}
case OP_MODIFY_PATH_BASED_CACHE_DIRECTIVE: {
- ModifyPathBasedCacheDirectiveOp modifyOp =
- (ModifyPathBasedCacheDirectiveOp) op;
+ ModifyCacheDirectiveInfoOp modifyOp =
+ (ModifyCacheDirectiveInfoOp) op;
fsNamesys.getCacheManager().modifyDirective(
modifyOp.directive, null);
if (toAddRetryCache) {
@@ -659,8 +659,8 @@ public class FSEditLogLoader {
break;
}
case OP_REMOVE_PATH_BASED_CACHE_DIRECTIVE: {
- RemovePathBasedCacheDirectiveOp removeOp =
- (RemovePathBasedCacheDirectiveOp) op;
+ RemoveCacheDirectiveInfoOp removeOp =
+ (RemoveCacheDirectiveInfoOp) op;
fsNamesys.getCacheManager().removeDirective(removeOp.id, null);
if (toAddRetryCache) {
fsNamesys.addCacheEntry(op.rpcClientId, op.rpcCallId);
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java?rev=1544252&r1=1544251&r2=1544252&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java Thu Nov 21 17:12:58 2013
@@ -86,7 +86,7 @@ import org.apache.hadoop.hdfs.protocol.C
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
-import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.util.XMLUtils;
import org.apache.hadoop.hdfs.util.XMLUtils.InvalidXmlException;
@@ -166,11 +166,11 @@ public abstract class FSEditLogOp {
inst.put(OP_SET_GENSTAMP_V2, new SetGenstampV2Op());
inst.put(OP_ALLOCATE_BLOCK_ID, new AllocateBlockIdOp());
inst.put(OP_ADD_PATH_BASED_CACHE_DIRECTIVE,
- new AddPathBasedCacheDirectiveOp());
+ new AddCacheDirectiveInfoOp());
inst.put(OP_MODIFY_PATH_BASED_CACHE_DIRECTIVE,
- new ModifyPathBasedCacheDirectiveOp());
+ new ModifyCacheDirectiveInfoOp());
inst.put(OP_REMOVE_PATH_BASED_CACHE_DIRECTIVE,
- new RemovePathBasedCacheDirectiveOp());
+ new RemoveCacheDirectiveInfoOp());
inst.put(OP_ADD_CACHE_POOL, new AddCachePoolOp());
inst.put(OP_MODIFY_CACHE_POOL, new ModifyCachePoolOp());
inst.put(OP_REMOVE_CACHE_POOL, new RemoveCachePoolOp());
@@ -2868,22 +2868,22 @@ public abstract class FSEditLogOp {
/**
* {@literal @AtMostOnce} for
- * {@link ClientProtocol#addPathBasedCacheDirective}
+ * {@link ClientProtocol#addCacheDirective}
*/
- static class AddPathBasedCacheDirectiveOp extends FSEditLogOp {
- PathBasedCacheDirective directive;
+ static class AddCacheDirectiveInfoOp extends FSEditLogOp {
+ CacheDirectiveInfo directive;
- public AddPathBasedCacheDirectiveOp() {
+ public AddCacheDirectiveInfoOp() {
super(OP_ADD_PATH_BASED_CACHE_DIRECTIVE);
}
- static AddPathBasedCacheDirectiveOp getInstance(OpInstanceCache cache) {
- return (AddPathBasedCacheDirectiveOp) cache
+ static AddCacheDirectiveInfoOp getInstance(OpInstanceCache cache) {
+ return (AddCacheDirectiveInfoOp) cache
.get(OP_ADD_PATH_BASED_CACHE_DIRECTIVE);
}
- public AddPathBasedCacheDirectiveOp setDirective(
- PathBasedCacheDirective directive) {
+ public AddCacheDirectiveInfoOp setDirective(
+ CacheDirectiveInfo directive) {
this.directive = directive;
assert(directive.getId() != null);
assert(directive.getPath() != null);
@@ -2898,7 +2898,7 @@ public abstract class FSEditLogOp {
String path = FSImageSerialization.readString(in);
short replication = FSImageSerialization.readShort(in);
String pool = FSImageSerialization.readString(in);
- directive = new PathBasedCacheDirective.Builder().
+ directive = new CacheDirectiveInfo.Builder().
setId(id).
setPath(new Path(path)).
setReplication(replication).
@@ -2930,7 +2930,7 @@ public abstract class FSEditLogOp {
@Override
void fromXml(Stanza st) throws InvalidXmlException {
- directive = new PathBasedCacheDirective.Builder().
+ directive = new CacheDirectiveInfo.Builder().
setId(Long.parseLong(st.getValue("ID"))).
setPath(new Path(st.getValue("PATH"))).
setReplication(Short.parseShort(st.getValue("REPLICATION"))).
@@ -2942,7 +2942,7 @@ public abstract class FSEditLogOp {
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
- builder.append("AddPathBasedCacheDirective [");
+ builder.append("AddCacheDirectiveInfo [");
builder.append("id=" + directive.getId() + ",");
builder.append("path=" + directive.getPath().toUri().getPath() + ",");
builder.append("replication=" + directive.getReplication() + ",");
@@ -2955,22 +2955,22 @@ public abstract class FSEditLogOp {
/**
* {@literal @AtMostOnce} for
- * {@link ClientProtocol#modifyPathBasedCacheDirective}
+ * {@link ClientProtocol#modifyCacheDirective}
*/
- static class ModifyPathBasedCacheDirectiveOp extends FSEditLogOp {
- PathBasedCacheDirective directive;
+ static class ModifyCacheDirectiveInfoOp extends FSEditLogOp {
+ CacheDirectiveInfo directive;
- public ModifyPathBasedCacheDirectiveOp() {
+ public ModifyCacheDirectiveInfoOp() {
super(OP_MODIFY_PATH_BASED_CACHE_DIRECTIVE);
}
- static ModifyPathBasedCacheDirectiveOp getInstance(OpInstanceCache cache) {
- return (ModifyPathBasedCacheDirectiveOp) cache
+ static ModifyCacheDirectiveInfoOp getInstance(OpInstanceCache cache) {
+ return (ModifyCacheDirectiveInfoOp) cache
.get(OP_MODIFY_PATH_BASED_CACHE_DIRECTIVE);
}
- public ModifyPathBasedCacheDirectiveOp setDirective(
- PathBasedCacheDirective directive) {
+ public ModifyCacheDirectiveInfoOp setDirective(
+ CacheDirectiveInfo directive) {
this.directive = directive;
assert(directive.getId() != null);
return this;
@@ -2978,8 +2978,8 @@ public abstract class FSEditLogOp {
@Override
void readFields(DataInputStream in, int logVersion) throws IOException {
- PathBasedCacheDirective.Builder builder =
- new PathBasedCacheDirective.Builder();
+ CacheDirectiveInfo.Builder builder =
+ new CacheDirectiveInfo.Builder();
builder.setId(FSImageSerialization.readLong(in));
byte flags = in.readByte();
if ((flags & 0x1) != 0) {
@@ -2993,7 +2993,7 @@ public abstract class FSEditLogOp {
}
if ((flags & ~0x7) != 0) {
throw new IOException("unknown flags set in " +
- "ModifyPathBasedCacheDirectiveOp: " + flags);
+ "ModifyCacheDirectiveInfoOp: " + flags);
}
this.directive = builder.build();
readRpcIds(in, logVersion);
@@ -3041,8 +3041,8 @@ public abstract class FSEditLogOp {
@Override
void fromXml(Stanza st) throws InvalidXmlException {
- PathBasedCacheDirective.Builder builder =
- new PathBasedCacheDirective.Builder();
+ CacheDirectiveInfo.Builder builder =
+ new CacheDirectiveInfo.Builder();
builder.setId(Long.parseLong(st.getValue("ID")));
String path = st.getValueOrNull("PATH");
if (path != null) {
@@ -3063,7 +3063,7 @@ public abstract class FSEditLogOp {
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
- builder.append("ModifyPathBasedCacheDirectiveOp[");
+ builder.append("ModifyCacheDirectiveInfoOp[");
builder.append("id=").append(directive.getId());
if (directive.getPath() != null) {
builder.append(",").append("path=").append(directive.getPath());
@@ -3083,21 +3083,21 @@ public abstract class FSEditLogOp {
/**
* {@literal @AtMostOnce} for
- * {@link ClientProtocol#removePathBasedCacheDirective}
+ * {@link ClientProtocol#removeCacheDirective}
*/
- static class RemovePathBasedCacheDirectiveOp extends FSEditLogOp {
+ static class RemoveCacheDirectiveInfoOp extends FSEditLogOp {
long id;
- public RemovePathBasedCacheDirectiveOp() {
+ public RemoveCacheDirectiveInfoOp() {
super(OP_REMOVE_PATH_BASED_CACHE_DIRECTIVE);
}
- static RemovePathBasedCacheDirectiveOp getInstance(OpInstanceCache cache) {
- return (RemovePathBasedCacheDirectiveOp) cache
+ static RemoveCacheDirectiveInfoOp getInstance(OpInstanceCache cache) {
+ return (RemoveCacheDirectiveInfoOp) cache
.get(OP_REMOVE_PATH_BASED_CACHE_DIRECTIVE);
}
- public RemovePathBasedCacheDirectiveOp setId(long id) {
+ public RemoveCacheDirectiveInfoOp setId(long id) {
this.id = id;
return this;
}
@@ -3129,7 +3129,7 @@ public abstract class FSEditLogOp {
@Override
public String toString() {
StringBuilder builder = new StringBuilder();
- builder.append("RemovePathBasedCacheDirective [");
+ builder.append("RemoveCacheDirectiveInfo [");
builder.append("id=" + Long.toString(id));
appendRpcIdsToString(builder, rpcClientId, rpcCallId);
builder.append("]");
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1544252&r1=1544251&r2=1544252&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Thu Nov 21 17:12:58 2013
@@ -151,7 +151,8 @@ import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.Block;
-import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -7064,8 +7065,8 @@ public class FSNamesystem implements Nam
}
}
- long addPathBasedCacheDirective(
- PathBasedCacheDirective directive) throws IOException {
+ long addCacheDirective(
+ CacheDirectiveInfo directive) throws IOException {
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = isPermissionEnabled ?
getPermissionChecker() : null;
@@ -7081,15 +7082,15 @@ public class FSNamesystem implements Nam
checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) {
throw new SafeModeException(
- "Cannot add PathBasedCache directive", safeMode);
+ "Cannot add cache directive", safeMode);
}
if (directive.getId() != null) {
throw new IOException("addDirective: you cannot specify an ID " +
"for this operation.");
}
- PathBasedCacheDirective effectiveDirective =
+ CacheDirectiveInfo effectiveDirective =
cacheManager.addDirective(directive, pc);
- getEditLog().logAddPathBasedCacheDirective(effectiveDirective,
+ getEditLog().logAddCacheDirectiveInfo(effectiveDirective,
cacheEntry != null);
result = effectiveDirective.getId();
success = true;
@@ -7099,15 +7100,15 @@ public class FSNamesystem implements Nam
getEditLog().logSync();
}
if (isAuditEnabled() && isExternalInvocation()) {
- logAuditEvent(success, "addPathBasedCacheDirective", null, null, null);
+ logAuditEvent(success, "addCacheDirective", null, null, null);
}
RetryCache.setState(cacheEntry, success, result);
}
return result;
}
- void modifyPathBasedCacheDirective(
- PathBasedCacheDirective directive) throws IOException {
+ void modifyCacheDirective(
+ CacheDirectiveInfo directive) throws IOException {
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = isPermissionEnabled ?
getPermissionChecker() : null;
@@ -7121,10 +7122,10 @@ public class FSNamesystem implements Nam
checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) {
throw new SafeModeException(
- "Cannot add PathBasedCache directive", safeMode);
+ "Cannot add cache directive", safeMode);
}
cacheManager.modifyDirective(directive, pc);
- getEditLog().logModifyPathBasedCacheDirective(directive,
+ getEditLog().logModifyCacheDirectiveInfo(directive,
cacheEntry != null);
success = true;
} finally {
@@ -7133,13 +7134,13 @@ public class FSNamesystem implements Nam
getEditLog().logSync();
}
if (isAuditEnabled() && isExternalInvocation()) {
- logAuditEvent(success, "addPathBasedCacheDirective", null, null, null);
+ logAuditEvent(success, "addCacheDirective", null, null, null);
}
RetryCache.setState(cacheEntry, success);
}
}
- void removePathBasedCacheDirective(Long id) throws IOException {
+ void removeCacheDirective(Long id) throws IOException {
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = isPermissionEnabled ?
getPermissionChecker() : null;
@@ -7153,15 +7154,15 @@ public class FSNamesystem implements Nam
checkOperation(OperationCategory.WRITE);
if (isInSafeMode()) {
throw new SafeModeException(
- "Cannot remove PathBasedCache directives", safeMode);
+ "Cannot remove cache directives", safeMode);
}
cacheManager.removeDirective(id, pc);
- getEditLog().logRemovePathBasedCacheDirective(id, cacheEntry != null);
+ getEditLog().logRemoveCacheDirectiveInfo(id, cacheEntry != null);
success = true;
} finally {
writeUnlock();
if (isAuditEnabled() && isExternalInvocation()) {
- logAuditEvent(success, "removePathBasedCacheDirective", null, null,
+ logAuditEvent(success, "removeCacheDirective", null, null,
null);
}
RetryCache.setState(cacheEntry, success);
@@ -7169,23 +7170,23 @@ public class FSNamesystem implements Nam
getEditLog().logSync();
}
- BatchedListEntries<PathBasedCacheDirective> listPathBasedCacheDirectives(
- long startId, PathBasedCacheDirective filter) throws IOException {
+ BatchedListEntries<CacheDirectiveEntry> listCacheDirectives(
+ long startId, CacheDirectiveInfo filter) throws IOException {
checkOperation(OperationCategory.READ);
final FSPermissionChecker pc = isPermissionEnabled ?
getPermissionChecker() : null;
- BatchedListEntries<PathBasedCacheDirective> results;
+ BatchedListEntries<CacheDirectiveEntry> results;
readLock();
boolean success = false;
try {
checkOperation(OperationCategory.READ);
results =
- cacheManager.listPathBasedCacheDirectives(startId, filter, pc);
+ cacheManager.listCacheDirectives(startId, filter, pc);
success = true;
} finally {
readUnlock();
if (isAuditEnabled() && isExternalInvocation()) {
- logAuditEvent(success, "listPathBasedCacheDirectives", null, null,
+ logAuditEvent(success, "listCacheDirectives", null, null,
null);
}
}
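Each mutating method renamed above keeps the same discipline: consult the retry cache so a retransmitted RPC replays its earlier result, apply the change under the write lock while refusing in safe mode, journal it, sync the journal after releasing the lock, and record the outcome for future retries. The following is a schematic, compilable reduction of the addCacheDirective-style flow; every type here is an illustrative stub, not the real Hadoop class.

import java.io.IOException;
import java.util.concurrent.locks.ReentrantLock;

// Shape of the FSNamesystem write path around a cache-directive mutation.
class WritePathSketch {
  interface Journal {
    void logAddCacheDirectiveInfo(long id, boolean toLogRpcIds);
    void logSync();
  }
  static class RetryEntry { boolean success; Long payload; }

  private final ReentrantLock writeLock = new ReentrantLock(); // stub lock
  private boolean inSafeMode = false;
  private Journal editLog; // left null: this is a shape sketch, not a runner

  long addDirectiveSketch(RetryEntry cacheEntry, long newId) throws IOException {
    if (cacheEntry != null && cacheEntry.success) {
      return cacheEntry.payload;            // retried RPC: replay cached result
    }
    Long result = null;
    boolean success = false;
    writeLock.lock();
    try {
      if (inSafeMode) {
        throw new IOException("Cannot add cache directive");
      }
      // ... apply the change through the cache manager here ...
      editLog.logAddCacheDirectiveInfo(newId, cacheEntry != null);
      result = newId;
      success = true;
    } finally {
      writeLock.unlock();
      if (success) {
        editLog.logSync();                  // sync the journal outside the lock
      }
      if (cacheEntry != null) {             // record outcome for future retries
        cacheEntry.success = success;
        cacheEntry.payload = result;
      }
    }
    return result;
  }
}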
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java?rev=1544252&r1=1544251&r2=1544252&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java Thu Nov 21 17:12:58 2013
@@ -61,7 +61,8 @@ import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HDFSPolicyProvider;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
-import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
@@ -1233,52 +1234,52 @@ class NameNodeRpcServer implements Namen
}
@Override
- public long addPathBasedCacheDirective(
- PathBasedCacheDirective path) throws IOException {
- return namesystem.addPathBasedCacheDirective(path);
+ public long addCacheDirective(
+ CacheDirectiveInfo path) throws IOException {
+ return namesystem.addCacheDirective(path);
}
@Override
- public void modifyPathBasedCacheDirective(
- PathBasedCacheDirective directive) throws IOException {
- namesystem.modifyPathBasedCacheDirective(directive);
+ public void modifyCacheDirective(
+ CacheDirectiveInfo directive) throws IOException {
+ namesystem.modifyCacheDirective(directive);
}
@Override
- public void removePathBasedCacheDirective(long id) throws IOException {
- namesystem.removePathBasedCacheDirective(id);
+ public void removeCacheDirective(long id) throws IOException {
+ namesystem.removeCacheDirective(id);
}
- private class ServerSidePathBasedCacheEntriesIterator
- extends BatchedRemoteIterator<Long, PathBasedCacheDirective> {
+ private class ServerSideCacheEntriesIterator
+ extends BatchedRemoteIterator<Long, CacheDirectiveEntry> {
- private final PathBasedCacheDirective filter;
+ private final CacheDirectiveInfo filter;
- public ServerSidePathBasedCacheEntriesIterator(Long firstKey,
- PathBasedCacheDirective filter) {
+ public ServerSideCacheEntriesIterator(Long firstKey,
+ CacheDirectiveInfo filter) {
super(firstKey);
this.filter = filter;
}
@Override
- public BatchedEntries<PathBasedCacheDirective> makeRequest(
+ public BatchedEntries<CacheDirectiveEntry> makeRequest(
Long nextKey) throws IOException {
- return namesystem.listPathBasedCacheDirectives(nextKey, filter);
+ return namesystem.listCacheDirectives(nextKey, filter);
}
@Override
- public Long elementToPrevKey(PathBasedCacheDirective entry) {
- return entry.getId();
+ public Long elementToPrevKey(CacheDirectiveEntry entry) {
+ return entry.getInfo().getId();
}
}
@Override
- public RemoteIterator<PathBasedCacheDirective> listPathBasedCacheDirectives(long prevId,
- PathBasedCacheDirective filter) throws IOException {
+ public RemoteIterator<CacheDirectiveEntry> listCacheDirectives(long prevId,
+ CacheDirectiveInfo filter) throws IOException {
if (filter == null) {
- filter = new PathBasedCacheDirective.Builder().build();
+ filter = new CacheDirectiveInfo.Builder().build();
}
- return new ServerSidePathBasedCacheEntriesIterator(prevId, filter);
+ return new ServerSideCacheEntriesIterator(prevId, filter);
}
@Override
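ServerSideCacheEntriesIterator pages through directives rather than materializing them all at once: BatchedRemoteIterator re-requests with the id of the last entry already returned, which is exactly what elementToPrevKey supplies. A self-contained sketch of that pagination contract follows; Entry and fetchBatch are stand-ins for CacheDirectiveEntry and namesystem.listCacheDirectives(nextKey, filter).

import java.util.ArrayList;
import java.util.List;

// Demonstrates resume-by-last-id pagination, as in BatchedRemoteIterator.
class BatchedListingSketch {
  static class Entry {
    final long id;
    Entry(long id) { this.id = id; }
  }

  static final int BATCH_SIZE = 2;

  // Pretend server: returns up to BATCH_SIZE entries with id > prevId.
  static List<Entry> fetchBatch(long prevId) {
    List<Entry> batch = new ArrayList<Entry>();
    for (long id = prevId + 1; id <= 5 && batch.size() < BATCH_SIZE; id++) {
      batch.add(new Entry(id));
    }
    return batch;
  }

  public static void main(String[] args) {
    long prevId = 0;                        // 0 == "list from the beginning"
    while (true) {
      List<Entry> batch = fetchBatch(prevId);
      if (batch.isEmpty()) {
        break;                              // hasMore == false
      }
      for (Entry e : batch) {
        System.out.println("directive id=" + e.id);
        prevId = e.id;                      // elementToPrevKey(entry)
      }
    }
  }
}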
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/CacheAdmin.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/CacheAdmin.java?rev=1544252&r1=1544251&r2=1544252&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/CacheAdmin.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/CacheAdmin.java Thu Nov 21 17:12:58 2013
@@ -18,7 +18,6 @@
package org.apache.hadoop.hdfs.tools;
import java.io.IOException;
-import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
@@ -31,8 +30,10 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveStats;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
-import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.server.namenode.CachePool;
import org.apache.hadoop.hdfs.tools.TableListing.Justification;
import org.apache.hadoop.ipc.RemoteException;
@@ -121,7 +122,7 @@ public class CacheAdmin extends Configur
int run(Configuration conf, List<String> args) throws IOException;
}
- private static class AddPathBasedCacheDirectiveCommand implements Command {
+ private static class AddCacheDirectiveInfoCommand implements Command {
@Override
public String getName() {
return "-addDirective";
@@ -144,7 +145,7 @@ public class CacheAdmin extends Configur
"added. You must have write permission on the cache pool "
+ "in order to add new directives.");
return getShortUsage() + "\n" +
- "Add a new PathBasedCache directive.\n\n" +
+ "Add a new cache directive.\n\n" +
listing.toString();
}
@@ -172,14 +173,14 @@ public class CacheAdmin extends Configur
}
DistributedFileSystem dfs = getDFS(conf);
- PathBasedCacheDirective directive = new PathBasedCacheDirective.Builder().
+ CacheDirectiveInfo directive = new CacheDirectiveInfo.Builder().
setPath(new Path(path)).
setReplication(replication).
setPool(poolName).
build();
try {
- long id = dfs.addPathBasedCacheDirective(directive);
- System.out.println("Added PathBasedCache entry " + id);
+ long id = dfs.addCacheDirective(directive);
+ System.out.println("Added cache directive " + id);
} catch (IOException e) {
System.err.println(prettifyException(e));
return 2;
@@ -189,7 +190,7 @@ public class CacheAdmin extends Configur
}
}
- private static class RemovePathBasedCacheDirectiveCommand implements Command {
+ private static class RemoveCacheDirectiveInfoCommand implements Command {
@Override
public String getName() {
return "-removeDirective";
@@ -206,7 +207,7 @@ public class CacheAdmin extends Configur
listing.addRow("<id>", "The id of the cache directive to remove. " +
"You must have write permission on the pool of the " +
"directive in order to remove it. To see a list " +
- "of PathBasedCache directive IDs, use the -listDirectives command.");
+ "of cache directive IDs, use the -listDirectives command.");
return getShortUsage() + "\n" +
"Remove a cache directive.\n\n" +
listing.toString();
@@ -239,8 +240,8 @@ public class CacheAdmin extends Configur
}
DistributedFileSystem dfs = getDFS(conf);
try {
- dfs.getClient().removePathBasedCacheDirective(id);
- System.out.println("Removed PathBasedCache directive " + id);
+ dfs.getClient().removeCacheDirective(id);
+ System.out.println("Removed cached directive " + id);
} catch (IOException e) {
System.err.println(prettifyException(e));
return 2;
@@ -249,7 +250,7 @@ public class CacheAdmin extends Configur
}
}
- private static class ModifyPathBasedCacheDirectiveCommand implements Command {
+ private static class ModifyCacheDirectiveInfoCommand implements Command {
@Override
public String getName() {
return "-modifyDirective";
@@ -274,14 +275,14 @@ public class CacheAdmin extends Configur
"added. You must have write permission on the cache pool "
+ "in order to move a directive into it. (optional)");
return getShortUsage() + "\n" +
- "Modify a PathBasedCache directive.\n\n" +
+ "Modify a cache directive.\n\n" +
listing.toString();
}
@Override
public int run(Configuration conf, List<String> args) throws IOException {
- PathBasedCacheDirective.Builder builder =
- new PathBasedCacheDirective.Builder();
+ CacheDirectiveInfo.Builder builder =
+ new CacheDirectiveInfo.Builder();
boolean modified = false;
String idString = StringUtils.popOptionWithArgument("-id", args);
if (idString == null) {
@@ -317,8 +318,8 @@ public class CacheAdmin extends Configur
}
DistributedFileSystem dfs = getDFS(conf);
try {
- dfs.modifyPathBasedCacheDirective(builder.build());
- System.out.println("Modified PathBasedCache entry " + idString);
+ dfs.modifyCacheDirective(builder.build());
+ System.out.println("Modified cache directive " + idString);
} catch (IOException e) {
System.err.println(prettifyException(e));
return 2;
@@ -327,7 +328,7 @@ public class CacheAdmin extends Configur
}
}
- private static class RemovePathBasedCacheDirectivesCommand implements Command {
+ private static class RemoveCacheDirectiveInfosCommand implements Command {
@Override
public String getName() {
return "-removeDirectives";
@@ -363,31 +364,31 @@ public class CacheAdmin extends Configur
return 1;
}
DistributedFileSystem dfs = getDFS(conf);
- RemoteIterator<PathBasedCacheDirective> iter =
- dfs.listPathBasedCacheDirectives(
- new PathBasedCacheDirective.Builder().
+ RemoteIterator<CacheDirectiveEntry> iter =
+ dfs.listCacheDirectives(
+ new CacheDirectiveInfo.Builder().
setPath(new Path(path)).build());
int exitCode = 0;
while (iter.hasNext()) {
- PathBasedCacheDirective directive = iter.next();
+ CacheDirectiveEntry entry = iter.next();
try {
- dfs.removePathBasedCacheDirective(directive.getId());
- System.out.println("Removed PathBasedCache directive " +
- directive.getId());
+ dfs.removeCacheDirective(entry.getInfo().getId());
+ System.out.println("Removed cache directive " +
+ entry.getInfo().getId());
} catch (IOException e) {
System.err.println(prettifyException(e));
exitCode = 2;
}
}
if (exitCode == 0) {
- System.out.println("Removed every PathBasedCache directive with path " +
+ System.out.println("Removed every cache directive with path " +
path);
}
return exitCode;
}
}
- private static class ListPathBasedCacheDirectiveCommand implements Command {
+ private static class ListCacheDirectiveInfoCommand implements Command {
@Override
public String getName() {
return "-listDirectives";
@@ -402,21 +403,21 @@ public class CacheAdmin extends Configur
public String getLongUsage() {
TableListing listing = getOptionDescriptionListing();
listing.addRow("<path>", "List only " +
- "PathBasedCache directives with this path. " +
- "Note that if there is a PathBasedCache directive for <path> " +
+ "cache directives with this path. " +
+ "Note that if there is a cache directive for <path> " +
"in a cache pool that we don't have read access for, it " +
"will not be listed.");
listing.addRow("<pool>", "List only path cache directives in that pool.");
listing.addRow("-stats", "List path-based cache directive statistics.");
return getShortUsage() + "\n" +
- "List PathBasedCache directives.\n\n" +
+ "List cache directives.\n\n" +
listing.toString();
}
@Override
public int run(Configuration conf, List<String> args) throws IOException {
- PathBasedCacheDirective.Builder builder =
- new PathBasedCacheDirective.Builder();
+ CacheDirectiveInfo.Builder builder =
+ new CacheDirectiveInfo.Builder();
String pathFilter = StringUtils.popOptionWithArgument("-path", args);
if (pathFilter != null) {
builder.setPath(new Path(pathFilter));
@@ -443,20 +444,22 @@ public class CacheAdmin extends Configur
TableListing tableListing = tableBuilder.build();
DistributedFileSystem dfs = getDFS(conf);
- RemoteIterator<PathBasedCacheDirective> iter =
- dfs.listPathBasedCacheDirectives(builder.build());
+ RemoteIterator<CacheDirectiveEntry> iter =
+ dfs.listCacheDirectives(builder.build());
int numEntries = 0;
while (iter.hasNext()) {
- PathBasedCacheDirective directive = iter.next();
+ CacheDirectiveEntry entry = iter.next();
+ CacheDirectiveInfo directive = entry.getInfo();
+ CacheDirectiveStats stats = entry.getStats();
List<String> row = new LinkedList<String>();
row.add("" + directive.getId());
row.add(directive.getPool());
row.add("" + directive.getReplication());
row.add(directive.getPath().toUri().getPath());
if (printStats) {
- row.add("" + directive.getBytesNeeded());
- row.add("" + directive.getBytesCached());
- row.add("" + directive.getFilesAffected());
+ row.add("" + stats.getBytesNeeded());
+ row.add("" + stats.getBytesCached());
+ row.add("" + stats.getFilesAffected());
}
tableListing.addRow(row.toArray(new String[0]));
numEntries++;
@@ -838,11 +841,11 @@ public class CacheAdmin extends Configur
}
private static Command[] COMMANDS = {
- new AddPathBasedCacheDirectiveCommand(),
- new ModifyPathBasedCacheDirectiveCommand(),
- new ListPathBasedCacheDirectiveCommand(),
- new RemovePathBasedCacheDirectiveCommand(),
- new RemovePathBasedCacheDirectivesCommand(),
+ new AddCacheDirectiveInfoCommand(),
+ new ModifyCacheDirectiveInfoCommand(),
+ new ListCacheDirectiveInfoCommand(),
+ new RemoveCacheDirectiveInfoCommand(),
+ new RemoveCacheDirectiveInfosCommand(),
new AddCachePoolCommand(),
new ModifyCachePoolCommand(),
new RemoveCachePoolCommand(),
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto?rev=1544252&r1=1544251&r2=1544252&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto Thu Nov 21 17:12:58 2013
@@ -363,49 +363,53 @@ message IsFileClosedResponseProto {
required bool result = 1;
}
-message PathBasedCacheDirectiveInfoProto {
+message CacheDirectiveInfoProto {
optional int64 id = 1;
optional string path = 2;
optional uint32 replication = 3;
optional string pool = 4;
- optional int64 bytesNeeded = 5;
- optional int64 bytesCached = 6;
- optional int64 filesAffected = 7;
}
-message AddPathBasedCacheDirectiveRequestProto {
- required PathBasedCacheDirectiveInfoProto info = 1;
+message CacheDirectiveStatsProto {
+ required int64 bytesNeeded = 1;
+ required int64 bytesCached = 2;
+ required int64 filesAffected = 3;
}
-message AddPathBasedCacheDirectiveResponseProto {
+message AddCacheDirectiveRequestProto {
+ required CacheDirectiveInfoProto info = 1;
+}
+
+message AddCacheDirectiveResponseProto {
required int64 id = 1;
}
-message ModifyPathBasedCacheDirectiveRequestProto {
- required PathBasedCacheDirectiveInfoProto info = 1;
+message ModifyCacheDirectiveRequestProto {
+ required CacheDirectiveInfoProto info = 1;
}
-message ModifyPathBasedCacheDirectiveResponseProto {
+message ModifyCacheDirectiveResponseProto {
}
-message RemovePathBasedCacheDirectiveRequestProto {
+message RemoveCacheDirectiveRequestProto {
required int64 id = 1;
}
-message RemovePathBasedCacheDirectiveResponseProto {
+message RemoveCacheDirectiveResponseProto {
}
-message ListPathBasedCacheDirectivesRequestProto {
+message ListCacheDirectivesRequestProto {
required int64 prevId = 1;
- required PathBasedCacheDirectiveInfoProto filter = 2;
+ required CacheDirectiveInfoProto filter = 2;
}
-message ListPathBasedCacheDirectivesElementProto {
- required PathBasedCacheDirectiveInfoProto info = 1;
+message CacheDirectiveEntryProto {
+ required CacheDirectiveInfoProto info = 1;
+ required CacheDirectiveStatsProto stats = 2;
}
-message ListPathBasedCacheDirectivesResponseProto {
- repeated ListPathBasedCacheDirectivesElementProto elements = 1;
+message ListCacheDirectivesResponseProto {
+ repeated CacheDirectiveEntryProto elements = 1;
required bool hasMore = 2;
}
@@ -632,14 +636,14 @@ service ClientNamenodeProtocol {
returns(ListCorruptFileBlocksResponseProto);
rpc metaSave(MetaSaveRequestProto) returns(MetaSaveResponseProto);
rpc getFileInfo(GetFileInfoRequestProto) returns(GetFileInfoResponseProto);
- rpc addPathBasedCacheDirective(AddPathBasedCacheDirectiveRequestProto)
- returns (AddPathBasedCacheDirectiveResponseProto);
- rpc modifyPathBasedCacheDirective(ModifyPathBasedCacheDirectiveRequestProto)
- returns (ModifyPathBasedCacheDirectiveResponseProto);
- rpc removePathBasedCacheDirective(RemovePathBasedCacheDirectiveRequestProto)
- returns (RemovePathBasedCacheDirectiveResponseProto);
- rpc listPathBasedCacheDirectives(ListPathBasedCacheDirectivesRequestProto)
- returns (ListPathBasedCacheDirectivesResponseProto);
+ rpc addCacheDirective(AddCacheDirectiveRequestProto)
+ returns (AddCacheDirectiveResponseProto);
+ rpc modifyCacheDirective(ModifyCacheDirectiveRequestProto)
+ returns (ModifyCacheDirectiveResponseProto);
+ rpc removeCacheDirective(RemoveCacheDirectiveRequestProto)
+ returns (RemoveCacheDirectiveResponseProto);
+ rpc listCacheDirectives(ListCacheDirectivesRequestProto)
+ returns (ListCacheDirectivesResponseProto);
rpc addCachePool(AddCachePoolRequestProto)
returns(AddCachePoolResponseProto);
rpc modifyCachePool(ModifyCachePoolRequestProto)
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/CentralizedCacheManagement.apt.vm
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/CentralizedCacheManagement.apt.vm?rev=1544252&r1=1544251&r2=1544252&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/CentralizedCacheManagement.apt.vm (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/site/apt/CentralizedCacheManagement.apt.vm Thu Nov 21 17:12:58 2013
@@ -118,7 +118,7 @@ Centralized Cache Management in HDFS
Usage: <<<hdfs cacheadmin -addDirective -path <path> -replication <replication> -pool <pool-name> >>>
- Add a new PathBasedCache directive.
+ Add a new cache directive.
*--+--+
\<path\> | A path to cache. The path can be a directory or a file.
@@ -135,7 +135,7 @@ Centralized Cache Management in HDFS
Remove a cache directive.
*--+--+
-\<id\> | The id of the cache directive to remove. You must have write permission on the pool of the directive in order to remove it. To see a list of PathBasedCache directive IDs, use the -listDirectives command.
+\<id\> | The id of the cache directive to remove. You must have write permission on the pool of the directive in order to remove it. To see a list of cache directive IDs, use the -listDirectives command.
*--+--+
*** {removeDirectives}
@@ -152,10 +152,10 @@ Centralized Cache Management in HDFS
Usage: <<<hdfs cacheadmin -listDirectives [-path <path>] [-pool <pool>] >>>
- List PathBasedCache directives.
+ List cache directives.
*--+--+
-\<path\> | List only PathBasedCache directives with this path. Note that if there is a PathBasedCache directive for <path> in a cache pool that we don't have read access for, it will not be listed.
+\<path\> | List only cache directives with this path. Note that if there is a cache directive for <path> in a cache pool that we don't have read access for, it will not be listed.
*--+--+
\<pool\> | List only path cache directives in that pool.
*--+--+
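For example, the renamed commands documented above compose as follows, using hypothetical path and pool names:

  <<<hdfs cacheadmin -addDirective -path /datasets/logs -replication 2 -pool research>>>

  <<<hdfs cacheadmin -listDirectives -path /datasets/logs>>>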
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java?rev=1544252&r1=1544251&r2=1544252&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java Thu Nov 21 17:12:58 2013
@@ -998,20 +998,20 @@ public class DFSTestUtil {
// OP_MODIFY_CACHE_POOL
filesystem.modifyCachePool(new CachePoolInfo("pool1").setWeight(99));
// OP_ADD_PATH_BASED_CACHE_DIRECTIVE
- long id = filesystem.addPathBasedCacheDirective(
- new PathBasedCacheDirective.Builder().
+ long id = filesystem.addCacheDirective(
+ new CacheDirectiveInfo.Builder().
setPath(new Path("/path")).
setReplication((short)1).
setPool("pool1").
build());
// OP_MODIFY_PATH_BASED_CACHE_DIRECTIVE
- filesystem.modifyPathBasedCacheDirective(
- new PathBasedCacheDirective.Builder().
+ filesystem.modifyCacheDirective(
+ new CacheDirectiveInfo.Builder().
setId(id).
setReplication((short)2).
build());
// OP_REMOVE_PATH_BASED_CACHE_DIRECTIVE
- filesystem.removePathBasedCacheDirective(id);
+ filesystem.removeCacheDirective(id);
// OP_REMOVE_CACHE_POOL
filesystem.removeCachePool("pool1");
}
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/OfflineEditsViewerHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/OfflineEditsViewerHelper.java?rev=1544252&r1=1544251&r2=1544252&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/OfflineEditsViewerHelper.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/OfflineEditsViewerHelper.java Thu Nov 21 17:12:58 2013
@@ -42,7 +42,7 @@ import org.apache.hadoop.hdfs.MiniDFSClu
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
@@ -241,20 +241,20 @@ public class OfflineEditsViewerHelper {
.setMode(new FsPermission((short)0700))
.setWeight(1989));
// OP_ADD_PATH_BASED_CACHE_DIRECTIVE 33
- long id = dfs.addPathBasedCacheDirective(
- new PathBasedCacheDirective.Builder().
+ long id = dfs.addCacheDirective(
+ new CacheDirectiveInfo.Builder().
setPath(new Path("/bar")).
setReplication((short)1).
setPool(pool).
build());
// OP_MODIFY_PATH_BASED_CACHE_DIRECTIVE 38
- dfs.modifyPathBasedCacheDirective(
- new PathBasedCacheDirective.Builder().
+ dfs.modifyCacheDirective(
+ new CacheDirectiveInfo.Builder().
setId(id).
setPath(new Path("/bar2")).
build());
// OP_REMOVE_PATH_BASED_CACHE_DIRECTIVE 34
- dfs.removePathBasedCacheDirective(id);
+ dfs.removeCacheDirective(id);
// OP_REMOVE_CACHE_POOL 37
dfs.removeCachePool(pool);
// sync to disk, otherwise we parse partial edits
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java?rev=1544252&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCacheDirectives.java Thu Nov 21 17:12:58 2013
@@ -0,0 +1,978 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_CACHING_ENABLED_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileSystemTestHelper;
+import org.apache.hadoop.fs.InvalidRequestException;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveStats;
+import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
+import org.apache.hadoop.hdfs.server.blockmanagement.CacheReplicationMonitor;
+import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.CachedBlocksList.Type;
+import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
+import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.io.nativeio.NativeIO.POSIX.CacheManipulator;
+import org.apache.hadoop.io.nativeio.NativeIO.POSIX.NoMlockCacheManipulator;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.GSet;
+import org.apache.log4j.Level;
+import org.apache.log4j.LogManager;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.base.Supplier;
+
+public class TestCacheDirectives {
+ static final Log LOG = LogFactory.getLog(TestCacheDirectives.class);
+
+ private static final UserGroupInformation unprivilegedUser =
+ UserGroupInformation.createRemoteUser("unprivilegedUser");
+
+ static private Configuration conf;
+ static private MiniDFSCluster cluster;
+ static private DistributedFileSystem dfs;
+ static private NamenodeProtocols proto;
+ static private CacheManipulator prevCacheManipulator;
+
+ static {
+ EditLogFileOutputStream.setShouldSkipFsyncForTesting(false);
+ }
+
+ @Before
+ public void setup() throws Exception {
+ conf = new HdfsConfiguration();
+ // set low limits here for testing purposes
+ conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_POOLS_NUM_RESPONSES, 2);
+ conf.setInt(DFSConfigKeys.DFS_NAMENODE_LIST_CACHE_DIRECTIVES_NUM_RESPONSES, 2);
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
+ cluster.waitActive();
+ dfs = cluster.getFileSystem();
+ proto = cluster.getNameNodeRpc();
+ prevCacheManipulator = NativeIO.POSIX.getCacheManipulator();
+ NativeIO.POSIX.setCacheManipulator(new NoMlockCacheManipulator());
+ LogManager.getLogger(CacheReplicationMonitor.class).setLevel(Level.TRACE);
+ }
+
+ @After
+ public void teardown() throws Exception {
+ if (cluster != null) {
+ cluster.shutdown();
+ }
+ // Restore the original CacheManipulator
+ NativeIO.POSIX.setCacheManipulator(prevCacheManipulator);
+ }
+
+ @Test(timeout=60000)
+ public void testBasicPoolOperations() throws Exception {
+ final String poolName = "pool1";
+ CachePoolInfo info = new CachePoolInfo(poolName).
+ setOwnerName("bob").setGroupName("bobgroup").
+ setMode(new FsPermission((short)0755)).setWeight(150);
+
+ // Add a pool
+ dfs.addCachePool(info);
+
+ // Do some bad addCachePools
+ try {
+ dfs.addCachePool(info);
+ fail("added the pool with the same name twice");
+ } catch (IOException ioe) {
+ GenericTestUtils.assertExceptionContains("pool1 already exists", ioe);
+ }
+ try {
+ dfs.addCachePool(new CachePoolInfo(""));
+ fail("added empty pool");
+ } catch (IOException ioe) {
+ GenericTestUtils.assertExceptionContains("invalid empty cache pool name",
+ ioe);
+ }
+ try {
+ dfs.addCachePool(null);
+ fail("added null pool");
+ } catch (IOException ioe) {
+ GenericTestUtils.assertExceptionContains("CachePoolInfo is null", ioe);
+ }
+ try {
+ proto.addCachePool(new CachePoolInfo(""));
+ fail("added empty pool");
+ } catch (IOException ioe) {
+ GenericTestUtils.assertExceptionContains("invalid empty cache pool name",
+ ioe);
+ }
+ try {
+ proto.addCachePool(null);
+ fail("added null pool");
+ } catch (IOException ioe) {
+ GenericTestUtils.assertExceptionContains("CachePoolInfo is null", ioe);
+ }
+
+ // Modify the pool
+ info.setOwnerName("jane").setGroupName("janegroup")
+ .setMode(new FsPermission((short)0700)).setWeight(314);
+ dfs.modifyCachePool(info);
+
+ // Do some invalid modify pools
+ try {
+ dfs.modifyCachePool(new CachePoolInfo("fool"));
+ fail("modified non-existent cache pool");
+ } catch (IOException ioe) {
+ GenericTestUtils.assertExceptionContains("fool does not exist", ioe);
+ }
+ try {
+ dfs.modifyCachePool(new CachePoolInfo(""));
+ fail("modified empty pool");
+ } catch (IOException ioe) {
+ GenericTestUtils.assertExceptionContains("invalid empty cache pool name",
+ ioe);
+ }
+ try {
+ dfs.modifyCachePool(null);
+ fail("modified null pool");
+ } catch (IOException ioe) {
+ GenericTestUtils.assertExceptionContains("CachePoolInfo is null", ioe);
+ }
+ try {
+ proto.modifyCachePool(new CachePoolInfo(""));
+ fail("modified empty pool");
+ } catch (IOException ioe) {
+ GenericTestUtils.assertExceptionContains("invalid empty cache pool name",
+ ioe);
+ }
+ try {
+ proto.modifyCachePool(null);
+ fail("modified null pool");
+ } catch (IOException ioe) {
+ GenericTestUtils.assertExceptionContains("CachePoolInfo is null", ioe);
+ }
+
+ // Remove the pool
+ dfs.removeCachePool(poolName);
+ // Do some bad removePools
+ try {
+ dfs.removeCachePool("pool99");
+ fail("expected to get an exception when " +
+ "removing a non-existent pool.");
+ } catch (IOException ioe) {
+ GenericTestUtils.assertExceptionContains("Cannot remove " +
+ "non-existent cache pool", ioe);
+ }
+ try {
+ dfs.removeCachePool(poolName);
+ fail("expected to get an exception when " +
+ "removing a non-existent pool.");
+ } catch (IOException ioe) {
+ GenericTestUtils.assertExceptionContains("Cannot remove " +
+ "non-existent cache pool", ioe);
+ }
+ try {
+ dfs.removeCachePool("");
+ fail("removed empty pool");
+ } catch (IOException ioe) {
+ GenericTestUtils.assertExceptionContains("invalid empty cache pool name",
+ ioe);
+ }
+ try {
+ dfs.removeCachePool(null);
+ fail("removed null pool");
+ } catch (IOException ioe) {
+ GenericTestUtils.assertExceptionContains("invalid empty cache pool name",
+ ioe);
+ }
+ try {
+ proto.removeCachePool("");
+ fail("removed empty pool");
+ } catch (IOException ioe) {
+ GenericTestUtils.assertExceptionContains("invalid empty cache pool name",
+ ioe);
+ }
+ try {
+ proto.removeCachePool(null);
+ fail("removed null pool");
+ } catch (IOException ioe) {
+ GenericTestUtils.assertExceptionContains("invalid empty cache pool name",
+ ioe);
+ }
+
+ info = new CachePoolInfo("pool2");
+ dfs.addCachePool(info);
+ }
+
+ @Test(timeout=60000)
+ public void testCreateAndModifyPools() throws Exception {
+ String poolName = "pool1";
+ String ownerName = "abc";
+ String groupName = "123";
+ FsPermission mode = new FsPermission((short)0755);
+ int weight = 150;
+ dfs.addCachePool(new CachePoolInfo(poolName).
+ setOwnerName(ownerName).setGroupName(groupName).
+ setMode(mode).setWeight(weight));
+
+ RemoteIterator<CachePoolInfo> iter = dfs.listCachePools();
+ CachePoolInfo info = iter.next();
+ assertEquals(poolName, info.getPoolName());
+ assertEquals(ownerName, info.getOwnerName());
+ assertEquals(groupName, info.getGroupName());
+
+ ownerName = "def";
+ groupName = "456";
+ mode = new FsPermission((short)0700);
+ weight = 151;
+ dfs.modifyCachePool(new CachePoolInfo(poolName).
+ setOwnerName(ownerName).setGroupName(groupName).
+ setMode(mode).setWeight(weight));
+
+ iter = dfs.listCachePools();
+ info = iter.next();
+ assertEquals(poolName, info.getPoolName());
+ assertEquals(ownerName, info.getOwnerName());
+ assertEquals(groupName, info.getGroupName());
+ assertEquals(mode, info.getMode());
+ assertEquals(Integer.valueOf(weight), info.getWeight());
+
+ dfs.removeCachePool(poolName);
+ iter = dfs.listCachePools();
+ assertFalse("expected no cache pools after deleting pool", iter.hasNext());
+
+ proto.listCachePools(null);
+
+ try {
+ proto.removeCachePool("pool99");
+ fail("expected to get an exception when " +
+ "removing a non-existent pool.");
+ } catch (IOException ioe) {
+ GenericTestUtils.assertExceptionContains("Cannot remove non-existent",
+ ioe);
+ }
+ try {
+ proto.removeCachePool(poolName);
+ fail("expected to get an exception when " +
+ "removing a non-existent pool.");
+ } catch (IOException ioe) {
+ GenericTestUtils.assertExceptionContains("Cannot remove non-existent",
+ ioe);
+ }
+
+ iter = dfs.listCachePools();
+ assertFalse("expected no cache pools after deleting pool", iter.hasNext());
+ }
+
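+ /**
+ * Assert that the iterator yields exactly the given directive IDs, in order.
+ */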
+ private static void validateListAll(
+ RemoteIterator<CacheDirectiveEntry> iter,
+ Long... ids) throws Exception {
+ for (Long id: ids) {
+ assertTrue("Unexpectedly few elements", iter.hasNext());
+ assertEquals("Unexpected directive ID", id,
+ iter.next().getInfo().getId());
+ }
+ assertFalse("Unexpectedly many list elements", iter.hasNext());
+ }
+
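+ /**
+ * Add a cache directive as the unprivileged test user, returning the new
+ * directive's ID.
+ */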
+ private static long addAsUnprivileged(
+ final CacheDirectiveInfo directive) throws Exception {
+ return unprivilegedUser
+ .doAs(new PrivilegedExceptionAction<Long>() {
+ @Override
+ public Long run() throws IOException {
+ DistributedFileSystem myDfs =
+ (DistributedFileSystem) FileSystem.get(conf);
+ return myDfs.addCacheDirective(directive);
+ }
+ });
+ }
+
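+ /**
+ * Test adding, listing, modifying, and removing cache directives, including
+ * error cases: unknown pools, pools without permissions, malformed paths,
+ * empty pool names, and invalid directive IDs.
+ */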
+ @Test(timeout=60000)
+ public void testAddRemoveDirectives() throws Exception {
+ proto.addCachePool(new CachePoolInfo("pool1").
+ setMode(new FsPermission((short)0777)));
+ proto.addCachePool(new CachePoolInfo("pool2").
+ setMode(new FsPermission((short)0777)));
+ proto.addCachePool(new CachePoolInfo("pool3").
+ setMode(new FsPermission((short)0777)));
+ proto.addCachePool(new CachePoolInfo("pool4").
+ setMode(new FsPermission((short)0)));
+
+ CacheDirectiveInfo alpha = new CacheDirectiveInfo.Builder().
+ setPath(new Path("/alpha")).
+ setPool("pool1").
+ build();
+ CacheDirectiveInfo beta = new CacheDirectiveInfo.Builder().
+ setPath(new Path("/beta")).
+ setPool("pool2").
+ build();
+ CacheDirectiveInfo delta = new CacheDirectiveInfo.Builder().
+ setPath(new Path("/delta")).
+ setPool("pool1").
+ build();
+
+ long alphaId = addAsUnprivileged(alpha);
+ long alphaId2 = addAsUnprivileged(alpha);
+ assertFalse("Expected to get unique directives when re-adding an "
+ + "existing CacheDirectiveInfo",
+ alphaId == alphaId2);
+ long betaId = addAsUnprivileged(beta);
+
+ try {
+ addAsUnprivileged(new CacheDirectiveInfo.Builder().
+ setPath(new Path("/unicorn")).
+ setPool("no_such_pool").
+ build());
+ fail("expected an error when adding to a non-existent pool.");
+ } catch (InvalidRequestException ioe) {
+ GenericTestUtils.assertExceptionContains("Unknown pool", ioe);
+ }
+
+ try {
+ addAsUnprivileged(new CacheDirectiveInfo.Builder().
+ setPath(new Path("/blackhole")).
+ setPool("pool4").
+ build());
+ fail("expected an error when adding to a pool with " +
+ "mode 0 (no permissions for anyone).");
+ } catch (AccessControlException e) {
+ GenericTestUtils.
+ assertExceptionContains("Permission denied while accessing pool", e);
+ }
+
+ try {
+ addAsUnprivileged(new CacheDirectiveInfo.Builder().
+ setPath(new Path("/illegal:path/")).
+ setPool("pool1").
+ build());
+ fail("expected an error when adding a malformed path " +
+ "to the cache directives.");
+ } catch (IllegalArgumentException e) {
+ GenericTestUtils.assertExceptionContains("is not a valid DFS filename", e);
+ }
+
+ try {
+ addAsUnprivileged(new CacheDirectiveInfo.Builder().
+ setPath(new Path("/emptypoolname")).
+ setReplication((short)1).
+ setPool("").
+ build());
+ fail("expected an error when adding a cache " +
+ "directive with an empty pool name.");
+ } catch (InvalidRequestException e) {
+ GenericTestUtils.assertExceptionContains("Invalid empty pool name", e);
+ }
+
+ long deltaId = addAsUnprivileged(delta);
+
+ // We expect the following to succeed, because DistributedFileSystem
+ // qualifies the path.
+ long relativeId = addAsUnprivileged(
+ new CacheDirectiveInfo.Builder().
+ setPath(new Path("relative")).
+ setPool("pool1").
+ build());
+
+ RemoteIterator<CacheDirectiveEntry> iter;
+ iter = dfs.listCacheDirectives(null);
+ validateListAll(iter, alphaId, alphaId2, betaId, deltaId, relativeId);
+ iter = dfs.listCacheDirectives(
+ new CacheDirectiveInfo.Builder().setPool("pool3").build());
+ assertFalse(iter.hasNext());
+ iter = dfs.listCacheDirectives(
+ new CacheDirectiveInfo.Builder().setPool("pool1").build());
+ validateListAll(iter, alphaId, alphaId2, deltaId, relativeId);
+ iter = dfs.listCacheDirectives(
+ new CacheDirectiveInfo.Builder().setPool("pool2").build());
+ validateListAll(iter, betaId);
+
+ dfs.removeCacheDirective(betaId);
+ iter = dfs.listCacheDirectives(
+ new CacheDirectiveInfo.Builder().setPool("pool2").build());
+ assertFalse(iter.hasNext());
+
+ try {
+ dfs.removeCacheDirective(betaId);
+ fail("expected an error when removing a non-existent ID");
+ } catch (InvalidRequestException e) {
+ GenericTestUtils.assertExceptionContains("No directive with ID", e);
+ }
+
+ try {
+ proto.removeCacheDirective(-42L);
+ fail("expected an error when removing a negative ID");
+ } catch (InvalidRequestException e) {
+ GenericTestUtils.assertExceptionContains(
+ "Invalid negative ID", e);
+ }
+ try {
+ proto.removeCacheDirective(43L);
+ fail("expected an error when removing a non-existent ID");
+ } catch (InvalidRequestException e) {
+ GenericTestUtils.assertExceptionContains("No directive with ID", e);
+ }
+
+ dfs.removeCacheDirective(alphaId);
+ dfs.removeCacheDirective(alphaId2);
+ dfs.removeCacheDirective(deltaId);
+
+ dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder().
+ setId(relativeId).
+ setReplication((short)555).
+ build());
+ iter = dfs.listCacheDirectives(null);
+ assertTrue(iter.hasNext());
+ CacheDirectiveInfo modified = iter.next().getInfo();
+ assertEquals(relativeId, modified.getId().longValue());
+ assertEquals((short)555, modified.getReplication().shortValue());
+ dfs.removeCacheDirective(relativeId);
+ iter = dfs.listCacheDirectives(null);
+ assertFalse(iter.hasNext());
+
+ // Verify that cache directives with path "." work correctly
+ CacheDirectiveInfo directive =
+ new CacheDirectiveInfo.Builder().setPath(new Path("."))
+ .setPool("pool1").build();
+ long id = dfs.addCacheDirective(directive);
+ dfs.modifyCacheDirective(new CacheDirectiveInfo.Builder(
+ directive).setId(id).setReplication((short)2).build());
+ dfs.removeCacheDirective(id);
+ }
+
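+ /**
+ * Test that cache pools and directives survive a NameNode restart, and that
+ * directive IDs continue from the pre-restart sequence.
+ */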
+ @Test(timeout=60000)
+ public void testCacheManagerRestart() throws Exception {
+ cluster.shutdown();
+ cluster = null;
+ HdfsConfiguration conf = createCachingConf();
+ cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0).build();
+
+ cluster.waitActive();
+ DistributedFileSystem dfs = cluster.getFileSystem();
+
+ // Create and validate a pool
+ final String pool = "poolparty";
+ String groupName = "partygroup";
+ FsPermission mode = new FsPermission((short)0777);
+ int weight = 747;
+ dfs.addCachePool(new CachePoolInfo(pool)
+ .setGroupName(groupName)
+ .setMode(mode)
+ .setWeight(weight));
+ RemoteIterator<CachePoolInfo> pit = dfs.listCachePools();
+ assertTrue("No cache pools found", pit.hasNext());
+ CachePoolInfo info = pit.next();
+ assertEquals(pool, info.getPoolName());
+ assertEquals(groupName, info.getGroupName());
+ assertEquals(mode, info.getMode());
+ assertEquals(weight, (int)info.getWeight());
+ assertFalse("Unexpected # of cache pools found", pit.hasNext());
+
+ // Create some cache entries
+ int numEntries = 10;
+ String entryPrefix = "/party-";
+ long prevId = -1;
+ for (int i=0; i<numEntries; i++) {
+ prevId = dfs.addCacheDirective(
+ new CacheDirectiveInfo.Builder().
+ setPath(new Path(entryPrefix + i)).setPool(pool).build());
+ }
+ RemoteIterator<CacheDirectiveEntry> dit
+ = dfs.listCacheDirectives(null);
+ for (int i=0; i<numEntries; i++) {
+ assertTrue("Unexpected # of cache entries: " + i, dit.hasNext());
+ CacheDirectiveInfo cd = dit.next().getInfo();
+ assertEquals(i+1, cd.getId().longValue());
+ assertEquals(entryPrefix + i, cd.getPath().toUri().getPath());
+ assertEquals(pool, cd.getPool());
+ }
+ assertFalse("Unexpected # of cache directives found", dit.hasNext());
+
+ // Restart namenode
+ cluster.restartNameNode();
+
+ // Check that state came back up
+ pit = dfs.listCachePools();
+ assertTrue("No cache pools found", pit.hasNext());
+ info = pit.next();
+ assertEquals(pool, info.getPoolName());
+ assertEquals(groupName, info.getGroupName());
+ assertEquals(mode, info.getMode());
+ assertEquals(weight, (int)info.getWeight());
+ assertFalse("Unexpected # of cache pools found", pit.hasNext());
+
+ dit = dfs.listCacheDirectives(null);
+ for (int i=0; i<numEntries; i++) {
+ assertTrue("Unexpected # of cache entries: " + i, dit.hasNext());
+ CacheDirectiveInfo cd = dit.next().getInfo();
+ assertEquals(i+1, cd.getId().longValue());
+ assertEquals(entryPrefix + i, cd.getPath().toUri().getPath());
+ assertEquals(pool, cd.getPool());
+ }
+ assertFalse("Unexpected # of cache directives found", dit.hasNext());
+
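+ // New directive IDs should pick up where the pre-restart sequence left off.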
+ long nextId = dfs.addCacheDirective(
+ new CacheDirectiveInfo.Builder().
+ setPath(new Path("/foobar")).setPool(pool).build());
+ assertEquals(prevId + 1, nextId);
+ }
+
+ /**
+ * Wait for the NameNode to have an expected number of cached blocks
+ * and replicas.
+ * @param nn NameNode
+ * @param expectedCachedBlocks expected number of cached blocks
+ * @param expectedCachedReplicas expected number of cached replicas
+ * @param logString prefix used to identify this wait in log messages
+ * @throws Exception if the expected counts are not reached in time
+ */
+ private static void waitForCachedBlocks(NameNode nn,
+ final int expectedCachedBlocks, final int expectedCachedReplicas,
+ final String logString) throws Exception {
+ final FSNamesystem namesystem = nn.getNamesystem();
+ final CacheManager cacheManager = namesystem.getCacheManager();
+ LOG.info("Waiting for " + expectedCachedBlocks + " blocks with " +
+ expectedCachedReplicas + " replicas.");
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ int numCachedBlocks = 0, numCachedReplicas = 0;
+ namesystem.readLock();
+ try {
+ GSet<CachedBlock, CachedBlock> cachedBlocks =
+ cacheManager.getCachedBlocks();
+ if (cachedBlocks != null) {
+ for (Iterator<CachedBlock> iter = cachedBlocks.iterator();
+ iter.hasNext(); ) {
+ CachedBlock cachedBlock = iter.next();
+ numCachedBlocks++;
+ numCachedReplicas += cachedBlock.getDatanodes(Type.CACHED).size();
+ }
+ }
+ } finally {
+ namesystem.readUnlock();
+ }
+ if ((numCachedBlocks == expectedCachedBlocks) &&
+ (numCachedReplicas == expectedCachedReplicas)) {
+ return true;
+ } else {
+ LOG.info(logString + " cached blocks: have " + numCachedBlocks +
+ " / " + expectedCachedBlocks + ". " +
+ "cached replicas: have " + numCachedReplicas +
+ " / " + expectedCachedReplicas);
+ return false;
+ }
+ }
+ }, 500, 60000);
+ }
+
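+ /**
+ * Check the number of cached blocks and cached replicas reported through
+ * getFileBlockLocations for the given paths.
+ */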
+ private static void checkNumCachedReplicas(final DistributedFileSystem dfs,
+ final List<Path> paths, final int expectedBlocks,
+ final int expectedReplicas)
+ throws Exception {
+ int numCachedBlocks = 0;
+ int numCachedReplicas = 0;
+ for (Path p: paths) {
+ final FileStatus f = dfs.getFileStatus(p);
+ final long len = f.getLen();
+ final long blockSize = f.getBlockSize();
+ // round it up to full blocks
+ final long numBlocks = (len + blockSize - 1) / blockSize;
+ BlockLocation[] locs = dfs.getFileBlockLocations(p, 0, len);
+ assertEquals("Unexpected number of block locations for path " + p,
+ numBlocks, locs.length);
+ for (BlockLocation l: locs) {
+ if (l.getCachedHosts().length > 0) {
+ numCachedBlocks++;
+ }
+ numCachedReplicas += l.getCachedHosts().length;
+ }
+ }
+ LOG.info("Found " + numCachedBlocks + " of " + expectedBlocks + " blocks");
+ LOG.info("Found " + numCachedReplicas + " of " + expectedReplicas
+ + " replicas");
+ assertEquals("Unexpected number of cached blocks", expectedBlocks,
+ numCachedBlocks);
+ assertEquals("Unexpected number of cached replicas", expectedReplicas,
+ numCachedReplicas);
+ }
+
+ private static final long BLOCK_SIZE = 512;
+ private static final int NUM_DATANODES = 4;
+
+ // Most Linux installs will allow non-root users to lock 64KB.
+ private static final long CACHE_CAPACITY = 64 * 1024 / NUM_DATANODES;
+
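+ /**
+ * Create a configuration with caching enabled, small blocks, and short
+ * heartbeat, cache report, and refresh intervals to keep tests fast.
+ */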
+ private static HdfsConfiguration createCachingConf() {
+ HdfsConfiguration conf = new HdfsConfiguration();
+ conf.setLong(DFS_BLOCK_SIZE_KEY, BLOCK_SIZE);
+ conf.setLong(DFS_DATANODE_MAX_LOCKED_MEMORY_KEY, CACHE_CAPACITY);
+ conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1);
+ conf.setBoolean(DFS_NAMENODE_CACHING_ENABLED_KEY, true);
+ conf.setLong(DFS_CACHEREPORT_INTERVAL_MSEC_KEY, 1000);
+ conf.setLong(DFS_NAMENODE_PATH_BASED_CACHE_REFRESH_INTERVAL_MS, 1000);
+ return conf;
+ }
+
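+ /**
+ * Test that cached block and replica counts rise and fall as directives are
+ * added and removed via the NameNode RPC interface.
+ */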
+ @Test(timeout=120000)
+ public void testWaitForCachedReplicas() throws Exception {
+ HdfsConfiguration conf = createCachingConf();
+ FileSystemTestHelper helper = new FileSystemTestHelper();
+ MiniDFSCluster cluster =
+ new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
+
+ try {
+ cluster.waitActive();
+ DistributedFileSystem dfs = cluster.getFileSystem();
+ NameNode namenode = cluster.getNameNode();
+ NamenodeProtocols nnRpc = namenode.getRpcServer();
+ Path rootDir = helper.getDefaultWorkingDirectory(dfs);
+ // Create the pool
+ final String pool = "friendlyPool";
+ nnRpc.addCachePool(new CachePoolInfo(pool));
+ // Create some test files
+ final int numFiles = 2;
+ final int numBlocksPerFile = 2;
+ final List<String> paths = new ArrayList<String>(numFiles);
+ for (int i=0; i<numFiles; i++) {
+ Path p = new Path(rootDir, "testCachePaths-" + i);
+ FileSystemTestHelper.createFile(dfs, p, numBlocksPerFile,
+ (int)BLOCK_SIZE);
+ paths.add(p.toUri().getPath());
+ }
+ // Check the initial statistics at the namenode
+ waitForCachedBlocks(namenode, 0, 0, "testWaitForCachedReplicas:0");
+ // Cache and check each path in sequence
+ int expected = 0;
+ for (int i=0; i<numFiles; i++) {
+ CacheDirectiveInfo directive =
+ new CacheDirectiveInfo.Builder().
+ setPath(new Path(paths.get(i))).
+ setPool(pool).
+ build();
+ nnRpc.addCacheDirective(directive);
+ expected += numBlocksPerFile;
+ waitForCachedBlocks(namenode, expected, expected,
+ "testWaitForCachedReplicas:1");
+ }
+ // Uncache and check each path in sequence
+ RemoteIterator<CacheDirectiveEntry> entries =
+ nnRpc.listCacheDirectives(0, null);
+ for (int i=0; i<numFiles; i++) {
+ CacheDirectiveEntry entry = entries.next();
+ nnRpc.removeCacheDirective(entry.getInfo().getId());
+ expected -= numBlocksPerFile;
+ waitForCachedBlocks(namenode, expected, expected,
+ "testWaitForCachedReplicas:2");
+ }
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
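+ /**
+ * Test that cache directives can still be added while caching is disabled,
+ * but that no blocks actually get cached.
+ */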
+ @Test(timeout=120000)
+ public void testAddingCacheDirectiveInfosWhenCachingIsDisabled()
+ throws Exception {
+ HdfsConfiguration conf = createCachingConf();
+ conf.setBoolean(DFS_NAMENODE_CACHING_ENABLED_KEY, false);
+ MiniDFSCluster cluster =
+ new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
+
+ try {
+ cluster.waitActive();
+ DistributedFileSystem dfs = cluster.getFileSystem();
+ NameNode namenode = cluster.getNameNode();
+ // Create the pool
+ String pool = "pool1";
+ namenode.getRpcServer().addCachePool(new CachePoolInfo(pool));
+ // Create some test files
+ final int numFiles = 2;
+ final int numBlocksPerFile = 2;
+ final List<String> paths = new ArrayList<String>(numFiles);
+ for (int i=0; i<numFiles; i++) {
+ Path p = new Path("/testCachePaths-" + i);
+ FileSystemTestHelper.createFile(dfs, p, numBlocksPerFile,
+ (int)BLOCK_SIZE);
+ paths.add(p.toUri().getPath());
+ }
+ // Check the initial statistics at the namenode
+ waitForCachedBlocks(namenode, 0, 0,
+ "testAddingCacheDirectiveInfosWhenCachingIsDisabled:0");
+ // Cache and check each path in sequence
+ int expected = 0;
+ for (int i=0; i<numFiles; i++) {
+ CacheDirectiveInfo directive =
+ new CacheDirectiveInfo.Builder().
+ setPath(new Path(paths.get(i))).
+ setPool(pool).
+ build();
+ dfs.addCacheDirective(directive);
+ waitForCachedBlocks(namenode, expected, 0,
+ "testAddingCacheDirectiveInfosWhenCachingIsDisabled:1");
+ }
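+ // Wait a while longer to make sure nothing gets cached while caching
+ // is disabled.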
+ Thread.sleep(20000);
+ waitForCachedBlocks(namenode, expected, 0,
+ "testAddingCacheDirectiveInfosWhenCachingIsDisabled:2");
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
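+ /**
+ * Test caching a whole directory, checking the per-directive stats (files
+ * affected, bytes needed, bytes cached) and the effect of an overlapping
+ * single-file directive with a higher replication factor.
+ */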
+ @Test(timeout=120000)
+ public void testWaitForCachedReplicasInDirectory() throws Exception {
+ HdfsConfiguration conf = createCachingConf();
+ MiniDFSCluster cluster =
+ new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
+
+ try {
+ cluster.waitActive();
+ DistributedFileSystem dfs = cluster.getFileSystem();
+ NameNode namenode = cluster.getNameNode();
+ // Create the pool
+ final String pool = "friendlyPool";
+ dfs.addCachePool(new CachePoolInfo(pool));
+ // Create some test files
+ final List<Path> paths = new LinkedList<Path>();
+ paths.add(new Path("/foo/bar"));
+ paths.add(new Path("/foo/baz"));
+ paths.add(new Path("/foo2/bar2"));
+ paths.add(new Path("/foo2/baz2"));
+ dfs.mkdir(new Path("/foo"), FsPermission.getDirDefault());
+ dfs.mkdir(new Path("/foo2"), FsPermission.getDirDefault());
+ final int numBlocksPerFile = 2;
+ for (Path path : paths) {
+ FileSystemTestHelper.createFile(dfs, path, numBlocksPerFile,
+ (int)BLOCK_SIZE, (short)3, false);
+ }
+ waitForCachedBlocks(namenode, 0, 0,
+ "testWaitForCachedReplicasInDirectory:0");
+ // cache entire directory
+ long id = dfs.addCacheDirective(
+ new CacheDirectiveInfo.Builder().
+ setPath(new Path("/foo")).
+ setReplication((short)2).
+ setPool(pool).
+ build());
+ waitForCachedBlocks(namenode, 4, 8,
+ "testWaitForCachedReplicasInDirectory:1");
+ // Verify that listDirectives gives the stats we want.
+ RemoteIterator<CacheDirectiveEntry> iter =
+ dfs.listCacheDirectives(new CacheDirectiveInfo.Builder().
+ setPath(new Path("/foo")).
+ build());
+ CacheDirectiveEntry entry = iter.next();
+ CacheDirectiveStats stats = entry.getStats();
+ Assert.assertEquals(Long.valueOf(2),
+ stats.getFilesAffected());
+ Assert.assertEquals(Long.valueOf(
+ 2 * numBlocksPerFile * BLOCK_SIZE * 2),
+ stats.getBytesNeeded());
+ Assert.assertEquals(Long.valueOf(
+ 2 * numBlocksPerFile * BLOCK_SIZE * 2),
+ stats.getBytesCached());
+
+ long id2 = dfs.addCacheDirective(
+ new CacheDirectiveInfo.Builder().
+ setPath(new Path("/foo/bar")).
+ setReplication((short)4).
+ setPool(pool).
+ build());
+ // wait for an additional 2 cached replicas to come up
+ waitForCachedBlocks(namenode, 4, 10,
+ "testWaitForCachedReplicasInDirectory:2");
+ // the directory directive's stats are unchanged
+ iter = dfs.listCacheDirectives(
+ new CacheDirectiveInfo.Builder().
+ setPath(new Path("/foo")).
+ build());
+ entry = iter.next();
+ stats = entry.getStats();
+ Assert.assertEquals(Long.valueOf(2),
+ stats.getFilesAffected());
+ Assert.assertEquals(Long.valueOf(
+ 2 * numBlocksPerFile * BLOCK_SIZE * 2),
+ stats.getBytesNeeded());
+ Assert.assertEquals(Long.valueOf(
+ 2 * numBlocksPerFile * BLOCK_SIZE * 2),
+ stats.getBytesCached());
+ // verify /foo/bar's stats
+ iter = dfs.listCacheDirectives(
+ new CacheDirectiveInfo.Builder().
+ setPath(new Path("/foo/bar")).
+ build());
+ entry = iter.next();
+ stats = entry.getStats();
+ Assert.assertEquals(Long.valueOf(1),
+ stats.getFilesAffected());
+ Assert.assertEquals(Long.valueOf(
+ 4 * numBlocksPerFile * BLOCK_SIZE),
+ stats.getBytesNeeded());
+ // only 3 because the file only has 3 replicas, not 4 as requested.
+ Assert.assertEquals(Long.valueOf(
+ 3 * numBlocksPerFile * BLOCK_SIZE),
+ stats.getBytesCached());
+
+ // remove and watch numCached go to 0
+ dfs.removeCacheDirective(id);
+ dfs.removeCacheDirective(id2);
+ waitForCachedBlocks(namenode, 0, 0,
+ "testWaitForCachedReplicasInDirectory:3");
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
+ /**
+ * Tests stepping the cache replication factor up and down, checking the
+ * number of cached replicas and blocks as well as the advertised locations.
+ * @throws Exception
+ */
+ @Test(timeout=120000)
+ public void testReplicationFactor() throws Exception {
+ HdfsConfiguration conf = createCachingConf();
+ MiniDFSCluster cluster =
+ new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
+
+ try {
+ cluster.waitActive();
+ DistributedFileSystem dfs = cluster.getFileSystem();
+ NameNode namenode = cluster.getNameNode();
+ // Create the pool
+ final String pool = "friendlyPool";
+ dfs.addCachePool(new CachePoolInfo(pool));
+ // Create some test files
+ final List<Path> paths = new LinkedList<Path>();
+ paths.add(new Path("/foo/bar"));
+ paths.add(new Path("/foo/baz"));
+ paths.add(new Path("/foo2/bar2"));
+ paths.add(new Path("/foo2/baz2"));
+ dfs.mkdir(new Path("/foo"), FsPermission.getDirDefault());
+ dfs.mkdir(new Path("/foo2"), FsPermission.getDirDefault());
+ final int numBlocksPerFile = 2;
+ for (Path path : paths) {
+ FileSystemTestHelper.createFile(dfs, path, numBlocksPerFile,
+ (int)BLOCK_SIZE, (short)3, false);
+ }
+ waitForCachedBlocks(namenode, 0, 0, "testReplicationFactor:0");
+ checkNumCachedReplicas(dfs, paths, 0, 0);
+ // cache directory
+ long id = dfs.addCacheDirective(
+ new CacheDirectiveInfo.Builder().
+ setPath(new Path("/foo")).
+ setReplication((short)1).
+ setPool(pool).
+ build());
+ waitForCachedBlocks(namenode, 4, 4, "testReplicationFactor:1");
+ checkNumCachedReplicas(dfs, paths, 4, 4);
+ // step up the replication factor
+ for (int i=2; i<=3; i++) {
+ dfs.modifyCacheDirective(
+ new CacheDirectiveInfo.Builder().
+ setId(id).
+ setReplication((short)i).
+ build());
+ waitForCachedBlocks(namenode, 4, 4*i, "testReplicationFactor:2");
+ checkNumCachedReplicas(dfs, paths, 4, 4*i);
+ }
+ // step it down
+ for (int i=2; i>=1; i--) {
+ dfs.modifyCacheDirective(
+ new CacheDirectiveInfo.Builder().
+ setId(id).
+ setReplication((short)i).
+ build());
+ waitForCachedBlocks(namenode, 4, 4*i, "testReplicationFactor:3");
+ checkNumCachedReplicas(dfs, paths, 4, 4*i);
+ }
+ // remove and watch numCached go to 0
+ dfs.removeCacheDirective(id);
+ waitForCachedBlocks(namenode, 0, 0, "testReplicationFactor:4");
+ checkNumCachedReplicas(dfs, paths, 0, 0);
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
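+ /**
+ * Test that a non-owner listing a mode-0700 cache pool sees only the pool
+ * name, while the owner sees the full pool metadata.
+ */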
+ @Test(timeout=60000)
+ public void testListCachePoolPermissions() throws Exception {
+ final UserGroupInformation myUser = UserGroupInformation
+ .createRemoteUser("myuser");
+ final DistributedFileSystem myDfs =
+ (DistributedFileSystem)DFSTestUtil.getFileSystemAs(myUser, conf);
+ final String poolName = "poolparty";
+ dfs.addCachePool(new CachePoolInfo(poolName)
+ .setMode(new FsPermission((short)0700)));
+ // Should only see partial info
+ RemoteIterator<CachePoolInfo> it = myDfs.listCachePools();
+ CachePoolInfo info = it.next();
+ assertFalse(it.hasNext());
+ assertEquals("Expected pool name", poolName, info.getPoolName());
+ assertNull("Unexpected owner name", info.getOwnerName());
+ assertNull("Unexpected group name", info.getGroupName());
+ assertNull("Unexpected mode", info.getMode());
+ assertNull("Unexpected weight", info.getWeight());
+ // Modify the pool so myuser is now the owner
+ dfs.modifyCachePool(new CachePoolInfo(poolName)
+ .setOwnerName(myUser.getShortUserName())
+ .setWeight(99));
+ // Should see full info
+ it = myDfs.listCachePools();
+ info = it.next();
+ assertFalse(it.hasNext());
+ assertEquals("Expected pool name", poolName, info.getPoolName());
+ assertEquals("Mismatched owner name", myUser.getShortUserName(),
+ info.getOwnerName());
+ assertNotNull("Expected group name", info.getGroupName());
+ assertEquals("Mismatched mode", (short) 0700,
+ info.getMode().toShort());
+ assertEquals("Mismatched weight", 99, (int)info.getWeight());
+ }
+}