You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by ar...@apache.org on 2013/10/29 22:04:35 UTC
svn commit: r1536889 [3/4] - in
/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs: ./
dev-support/ src/main/bin/ src/main/java/
src/main/java/org/apache/hadoop/hdfs/
src/main/java/org/apache/hadoop/hdfs/client/ src/main/java/org/apache/...
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java?rev=1536889&r1=1536888&r2=1536889&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java Tue Oct 29 21:04:31 2013
@@ -18,6 +18,8 @@
package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD_CACHE_POOL;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ADD_PATH_BASED_CACHE_DIRECTIVE;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ALLOCATE_BLOCK_ID;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_ALLOW_SNAPSHOT;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_CANCEL_DELEGATION_TOKEN;
@@ -32,7 +34,10 @@ import static org.apache.hadoop.hdfs.ser
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_GET_DELEGATION_TOKEN;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_INVALID;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_MKDIR;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_MODIFY_CACHE_POOL;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REASSIGN_LEASE;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REMOVE_CACHE_POOL;
+import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_REMOVE_PATH_BASED_CACHE_DESCRIPTOR;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME_OLD;
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOpCodes.OP_RENAME_SNAPSHOT;
@@ -56,6 +61,7 @@ import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumMap;
import java.util.List;
@@ -73,6 +79,7 @@ import org.apache.hadoop.fs.permission.P
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DeprecatedUTF8;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
@@ -97,7 +104,9 @@ import org.xml.sax.ContentHandler;
import org.xml.sax.SAXException;
import org.xml.sax.helpers.AttributesImpl;
+import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
+import com.google.common.base.Strings;
/**
* Helper classes for reading the ops from an InputStream.
@@ -153,6 +162,13 @@ public abstract class FSEditLogOp {
inst.put(OP_RENAME_SNAPSHOT, new RenameSnapshotOp());
inst.put(OP_SET_GENSTAMP_V2, new SetGenstampV2Op());
inst.put(OP_ALLOCATE_BLOCK_ID, new AllocateBlockIdOp());
+ inst.put(OP_ADD_PATH_BASED_CACHE_DIRECTIVE,
+ new AddPathBasedCacheDirectiveOp());
+ inst.put(OP_REMOVE_PATH_BASED_CACHE_DESCRIPTOR,
+ new RemovePathBasedCacheDescriptorOp());
+ inst.put(OP_ADD_CACHE_POOL, new AddCachePoolOp());
+ inst.put(OP_MODIFY_CACHE_POOL, new ModifyCachePoolOp());
+ inst.put(OP_REMOVE_CACHE_POOL, new RemoveCachePoolOp());
}
public FSEditLogOp get(FSEditLogOpCodes opcode) {
@@ -528,8 +544,7 @@ public abstract class FSEditLogOp {
} else {
this.blocks = new Block[0];
}
- this.permissions =
- permissionStatusFromXml(st.getChildren("PERMISSION_STATUS").get(0));
+ this.permissions = permissionStatusFromXml(st);
readRpcIdsFromXml(st);
}
}
@@ -1208,8 +1223,7 @@ public abstract class FSEditLogOp {
this.inodeId = Long.valueOf(st.getValue("INODEID"));
this.path = st.getValue("PATH");
this.timestamp = Long.valueOf(st.getValue("TIMESTAMP"));
- this.permissions =
- permissionStatusFromXml(st.getChildren("PERMISSION_STATUS").get(0));
+ this.permissions = permissionStatusFromXml(st);
}
}
@@ -1940,8 +1954,7 @@ public abstract class FSEditLogOp {
this.value = st.getValue("VALUE");
this.mtime = Long.valueOf(st.getValue("MTIME"));
this.atime = Long.valueOf(st.getValue("ATIME"));
- this.permissionStatus =
- permissionStatusFromXml(st.getChildren("PERMISSION_STATUS").get(0));
+ this.permissionStatus = permissionStatusFromXml(st);
readRpcIdsFromXml(st);
}
@@ -2848,6 +2861,317 @@ public abstract class FSEditLogOp {
}
}
+  /**
+   * Logs a new cache directive; {@literal @AtMostOnce} for
+   * {@link ClientProtocol#addPathBasedCacheDirective}.
+   */
+  static class AddPathBasedCacheDirectiveOp extends FSEditLogOp {
+    String path;
+    short replication;
+    String pool;
+
+    public AddPathBasedCacheDirectiveOp() {
+      super(OP_ADD_PATH_BASED_CACHE_DIRECTIVE);
+    }
+
+    static AddPathBasedCacheDirectiveOp getInstance(OpInstanceCache cache) {
+      final FSEditLogOp cached = cache.get(OP_ADD_PATH_BASED_CACHE_DIRECTIVE);
+      return (AddPathBasedCacheDirectiveOp) cached;
+    }
+
+    public AddPathBasedCacheDirectiveOp setPath(String path) {
+      this.path = path;
+      return this;
+    }
+
+    public AddPathBasedCacheDirectiveOp setReplication(short replication) {
+      this.replication = replication;
+      return this;
+    }
+
+    public AddPathBasedCacheDirectiveOp setPool(String pool) {
+      this.pool = pool;
+      return this;
+    }
+
+    @Override
+    void readFields(DataInputStream in, int logVersion) throws IOException {
+      // Read in the order writeFields emits: path, replication, pool, RPC ids.
+      path = FSImageSerialization.readString(in);
+      replication = FSImageSerialization.readShort(in);
+      pool = FSImageSerialization.readString(in);
+      readRpcIds(in, logVersion);
+    }
+
+    @Override
+    public void writeFields(DataOutputStream out) throws IOException {
+      FSImageSerialization.writeString(path, out);
+      FSImageSerialization.writeShort(replication, out);
+      FSImageSerialization.writeString(pool, out);
+      writeRpcIds(rpcClientId, rpcCallId, out);
+    }
+
+    @Override
+    protected void toXml(ContentHandler contentHandler) throws SAXException {
+      final String replicationText = Short.toString(replication);
+      XMLUtils.addSaxString(contentHandler, "PATH", path);
+      XMLUtils.addSaxString(contentHandler, "REPLICATION", replicationText);
+      XMLUtils.addSaxString(contentHandler, "POOL", pool);
+      appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
+    }
+
+    @Override
+    void fromXml(Stanza st) throws InvalidXmlException {
+      this.path = st.getValue("PATH");
+      this.replication = Short.parseShort(st.getValue("REPLICATION"));
+      this.pool = st.getValue("POOL");
+      readRpcIdsFromXml(st);
+    }
+
+    @Override
+    public String toString() {
+      final StringBuilder sb =
+          new StringBuilder("AddPathBasedCacheDirective [");
+      sb.append("path=").append(path).append(",");
+      sb.append("replication=").append(replication).append(",");
+      sb.append("pool=").append(pool);
+      appendRpcIdsToString(sb, rpcClientId, rpcCallId);
+      return sb.append("]").toString();
+    }
+  }
+
+  /**
+   * Logs the removal of a cache descriptor by id; {@literal @AtMostOnce} for
+   * {@link ClientProtocol#removePathBasedCacheDescriptor}.
+   */
+  static class RemovePathBasedCacheDescriptorOp extends FSEditLogOp {
+    long id;
+
+    public RemovePathBasedCacheDescriptorOp() {
+      super(OP_REMOVE_PATH_BASED_CACHE_DESCRIPTOR);
+    }
+
+    static RemovePathBasedCacheDescriptorOp getInstance(OpInstanceCache cache) {
+      final FSEditLogOp op = cache.get(OP_REMOVE_PATH_BASED_CACHE_DESCRIPTOR);
+      return (RemovePathBasedCacheDescriptorOp) op;
+    }
+
+    public RemovePathBasedCacheDescriptorOp setId(long id) {
+      this.id = id;
+      return this;
+    }
+
+    @Override
+    void readFields(DataInputStream in, int logVersion) throws IOException {
+      id = FSImageSerialization.readLong(in);
+      readRpcIds(in, logVersion);
+    }
+
+    @Override
+    public void writeFields(DataOutputStream out) throws IOException {
+      FSImageSerialization.writeLong(id, out);
+      writeRpcIds(rpcClientId, rpcCallId, out);
+    }
+
+    @Override
+    protected void toXml(ContentHandler contentHandler) throws SAXException {
+      XMLUtils.addSaxString(contentHandler, "ID", String.valueOf(id));
+      appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
+    }
+
+    @Override
+    void fromXml(Stanza st) throws InvalidXmlException {
+      id = Long.parseLong(st.getValue("ID"));
+      readRpcIdsFromXml(st);
+    }
+
+    @Override
+    public String toString() {
+      final StringBuilder sb =
+          new StringBuilder("RemovePathBasedCacheDescriptor [");
+      sb.append("id=").append(id);
+      appendRpcIdsToString(sb, rpcClientId, rpcCallId);
+      sb.append("]");
+      return sb.toString();
+    }
+  }
+
+  /** {@literal @AtMostOnce} for {@link ClientProtocol#addCachePool} */
+  static class AddCachePoolOp extends FSEditLogOp {
+    CachePoolInfo info;
+
+    public AddCachePoolOp() {
+      super(OP_ADD_CACHE_POOL);
+    }
+
+    static AddCachePoolOp getInstance(OpInstanceCache cache) {
+      return (AddCachePoolOp) cache.get(OP_ADD_CACHE_POOL);
+    }
+
+    public AddCachePoolOp setPool(CachePoolInfo info) {
+      this.info = info;
+      return this;
+    }
+
+    @Override
+    void readFields(DataInputStream in, int logVersion) throws IOException {
+      info = CachePoolInfo.readFrom(in);
+      readRpcIds(in, logVersion);
+    }
+
+    @Override
+    public void writeFields(DataOutputStream out) throws IOException {
+      info.writeTo(out);
+      writeRpcIds(rpcClientId, rpcCallId, out);
+    }
+
+    @Override
+    protected void toXml(ContentHandler contentHandler) throws SAXException {
+      info.writeXmlTo(contentHandler);
+      appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
+    }
+
+    @Override
+    void fromXml(Stanza st) throws InvalidXmlException {
+      this.info = CachePoolInfo.readXmlFrom(st);
+      readRpcIdsFromXml(st);
+    }
+
+    @Override
+    public String toString() {
+      final FsPermission mode = info.getMode();
+      StringBuilder builder = new StringBuilder("AddCachePoolOp [");
+      builder.append("poolName=").append(info.getPoolName()).append(",");
+      builder.append("ownerName=").append(info.getOwnerName()).append(",");
+      builder.append("groupName=").append(info.getGroupName()).append(",");
+      // mode and weight may be unset (null); print "null" instead of an NPE.
+      builder.append("mode=").append(mode == null ? null : mode.toShort());
+      builder.append(",weight=").append(info.getWeight());
+      appendRpcIdsToString(builder, rpcClientId, rpcCallId);
+      return builder.append("]").toString();
+    }
+  }
+
+  /** {@literal @AtMostOnce} for {@link ClientProtocol#modifyCachePool} */
+  static class ModifyCachePoolOp extends FSEditLogOp {
+    CachePoolInfo info;
+
+    public ModifyCachePoolOp() {
+      super(OP_MODIFY_CACHE_POOL);
+    }
+
+    static ModifyCachePoolOp getInstance(OpInstanceCache cache) {
+      return (ModifyCachePoolOp) cache.get(OP_MODIFY_CACHE_POOL);
+    }
+
+    public ModifyCachePoolOp setInfo(CachePoolInfo info) {
+      this.info = info;
+      return this;
+    }
+
+    @Override
+    void readFields(DataInputStream in, int logVersion) throws IOException {
+      this.info = CachePoolInfo.readFrom(in);
+      readRpcIds(in, logVersion);
+    }
+
+    @Override
+    public void writeFields(DataOutputStream out) throws IOException {
+      this.info.writeTo(out);
+      writeRpcIds(rpcClientId, rpcCallId, out);
+    }
+
+    @Override
+    protected void toXml(ContentHandler contentHandler) throws SAXException {
+      cachePoolInfoToXml(contentHandler, this.info);
+      appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
+    }
+
+    @Override
+    void fromXml(Stanza st) throws InvalidXmlException {
+      info = cachePoolInfoFromXml(st);
+      readRpcIdsFromXml(st);
+    }
+
+    @Override
+    public String toString() {
+      // A modification may carry only some fields, so just the ones that are
+      // set (non-null) are listed.
+      final List<String> parts = new ArrayList<String>(5);
+      if (info.getPoolName() != null) {
+        parts.add("poolName=" + info.getPoolName());
+      }
+      if (info.getOwnerName() != null) {
+        parts.add("ownerName=" + info.getOwnerName());
+      }
+      if (info.getGroupName() != null) {
+        parts.add("groupName=" + info.getGroupName());
+      }
+      if (info.getMode() != null) {
+        parts.add("mode=" + info.getMode());
+      }
+      if (info.getWeight() != null) {
+        parts.add("weight=" + info.getWeight());
+      }
+      final StringBuilder sb = new StringBuilder("ModifyCachePoolOp [");
+      sb.append(Joiner.on(",").join(parts));
+      appendRpcIdsToString(sb, rpcClientId, rpcCallId);
+      return sb.append("]").toString();
+    }
+  }
+
+  /** {@literal @AtMostOnce} for {@link ClientProtocol#removeCachePool} */
+  static class RemoveCachePoolOp extends FSEditLogOp {
+    String poolName;
+
+    public RemoveCachePoolOp() {
+      super(OP_REMOVE_CACHE_POOL);
+    }
+
+    static RemoveCachePoolOp getInstance(OpInstanceCache cache) {
+      return (RemoveCachePoolOp) cache.get(OP_REMOVE_CACHE_POOL);
+    }
+
+    public RemoveCachePoolOp setPoolName(String poolName) {
+      this.poolName = poolName;
+      return this;
+    }
+
+    @Override
+    void readFields(DataInputStream in, int logVersion) throws IOException {
+      this.poolName = FSImageSerialization.readString(in);
+      readRpcIds(in, logVersion);
+    }
+
+    @Override
+    public void writeFields(DataOutputStream out) throws IOException {
+      FSImageSerialization.writeString(this.poolName, out);
+      writeRpcIds(rpcClientId, rpcCallId, out);
+    }
+
+    @Override
+    protected void toXml(ContentHandler contentHandler) throws SAXException {
+      XMLUtils.addSaxString(contentHandler, "POOLNAME", this.poolName);
+      appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId);
+    }
+
+    @Override
+    void fromXml(Stanza st) throws InvalidXmlException {
+      poolName = st.getValue("POOLNAME");
+      readRpcIdsFromXml(st);
+    }
+
+    @Override
+    public String toString() {
+      final StringBuilder sb = new StringBuilder("RemoveCachePoolOp [");
+      // The pool name is this op's only payload besides the RPC ids.
+      sb.append("poolName=").append(poolName);
+      appendRpcIdsToString(sb, rpcClientId, rpcCallId);
+      sb.append("]");
+      return sb.toString();
+    }
+  }
+
static private short readShort(DataInputStream in) throws IOException {
return Short.parseShort(FSImageSerialization.readString(in));
}
@@ -3235,16 +3559,65 @@ public abstract class FSEditLogOp {
contentHandler.startElement("", "", "PERMISSION_STATUS", new AttributesImpl());
XMLUtils.addSaxString(contentHandler, "USERNAME", perm.getUserName());
XMLUtils.addSaxString(contentHandler, "GROUPNAME", perm.getGroupName());
- XMLUtils.addSaxString(contentHandler, "MODE",
- Short.valueOf(perm.getPermission().toShort()).toString());
+ fsPermissionToXml(contentHandler, perm.getPermission());
contentHandler.endElement("", "", "PERMISSION_STATUS");
}
public static PermissionStatus permissionStatusFromXml(Stanza st)
throws InvalidXmlException {
- String username = st.getValue("USERNAME");
- String groupname = st.getValue("GROUPNAME");
+ Stanza status = st.getChildren("PERMISSION_STATUS").get(0);
+ String username = status.getValue("USERNAME");
+ String groupname = status.getValue("GROUPNAME");
+ FsPermission mode = fsPermissionFromXml(status);
+ return new PermissionStatus(username, groupname, mode);
+ }
+
+  public static void fsPermissionToXml(ContentHandler contentHandler,
+      FsPermission mode) throws SAXException {
+    final String modeText = Short.toString(mode.toShort()); // decimal short
+    XMLUtils.addSaxString(contentHandler, "MODE", modeText);
+  }
+
+ public static FsPermission fsPermissionFromXml(Stanza st)
+ throws InvalidXmlException {
short mode = Short.valueOf(st.getValue("MODE"));
- return new PermissionStatus(username, groupname, new FsPermission(mode));
+ return new FsPermission(mode);
+ }
+
+  public static void cachePoolInfoToXml(ContentHandler contentHandler,
+      CachePoolInfo info) throws SAXException {
+    XMLUtils.addSaxString(contentHandler, "POOLNAME", info.getPoolName());
+    if (info.getOwnerName() != null) { // the remaining elements are optional
+      XMLUtils.addSaxString(contentHandler, "OWNERNAME", info.getOwnerName());
+    }
+    if (info.getGroupName() != null) {
+      XMLUtils.addSaxString(contentHandler, "GROUPNAME", info.getGroupName());
+    }
+    if (info.getMode() != null) {
+      fsPermissionToXml(contentHandler, info.getMode());
+    }
+    final Integer weight = info.getWeight();
+    if (weight != null) {
+      XMLUtils.addSaxString(contentHandler, "WEIGHT", weight.toString());
+    }
+  }
+
+  public static CachePoolInfo cachePoolInfoFromXml(Stanza st)
+      throws InvalidXmlException {
+    // POOLNAME is read unconditionally; other elements only when present.
+    final CachePoolInfo parsed = new CachePoolInfo(st.getValue("POOLNAME"));
+    if (st.hasChildren("OWNERNAME")) {
+      parsed.setOwnerName(st.getValue("OWNERNAME"));
+    }
+    if (st.hasChildren("GROUPNAME")) {
+      parsed.setGroupName(st.getValue("GROUPNAME"));
+    }
+    if (st.hasChildren("MODE")) {
+      parsed.setMode(fsPermissionFromXml(st));
+    }
+    if (st.hasChildren("WEIGHT")) {
+      parsed.setWeight(Integer.parseInt(st.getValue("WEIGHT")));
+    }
+    return parsed;
}
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java?rev=1536889&r1=1536888&r2=1536889&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOpCodes.java Tue Oct 29 21:04:31 2013
@@ -63,7 +63,13 @@ public enum FSEditLogOpCodes {
OP_ALLOW_SNAPSHOT ((byte) 29),
OP_DISALLOW_SNAPSHOT ((byte) 30),
OP_SET_GENSTAMP_V2 ((byte) 31),
- OP_ALLOCATE_BLOCK_ID ((byte) 32);
+ OP_ALLOCATE_BLOCK_ID ((byte) 32),
+ OP_ADD_PATH_BASED_CACHE_DIRECTIVE ((byte) 33),
+ OP_REMOVE_PATH_BASED_CACHE_DESCRIPTOR ((byte) 34),
+ OP_ADD_CACHE_POOL ((byte) 35),
+ OP_MODIFY_CACHE_POOL ((byte) 36),
+ OP_REMOVE_CACHE_POOL ((byte) 37);
+
private byte opCode;
/**
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java?rev=1536889&r1=1536888&r2=1536889&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormat.java Tue Oct 29 21:04:31 2013
@@ -351,6 +351,8 @@ public class FSImageFormat {
loadSecretManagerState(in);
+ loadCacheManagerState(in);
+
// make sure to read to the end of file
boolean eof = (in.read() == -1);
assert eof : "Should have reached the end of image file " + curFile;
@@ -843,6 +845,14 @@ public class FSImageFormat {
namesystem.loadSecretManagerState(in);
}
+    private void loadCacheManagerState(DataInput in) throws IOException {
+      // Cache manager state is read only when the image's layout version
+      // supports the CACHING feature; otherwise the stream is left untouched.
+      if (LayoutVersion.supports(Feature.CACHING, getLayoutVersion())) {
+        namesystem.getCacheManager().loadState(in);
+      }
+    }
+
private int getLayoutVersion() {
return namesystem.getFSImage().getStorage().getLayoutVersion();
}
@@ -985,6 +995,8 @@ public class FSImageFormat {
context.checkCancelled();
sourceNamesystem.saveSecretManagerState(out, sdPath);
context.checkCancelled();
+ sourceNamesystem.getCacheManager().saveState(out, sdPath);
+ context.checkCancelled();
out.flush();
context.checkCancelled();
fout.getChannel().force(true);
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1536889&r1=1536888&r2=1536889&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Tue Oct 29 21:04:31 2013
@@ -121,6 +121,7 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BatchedRemoteIterator.BatchedListEntries;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.DirectoryListingStartAfterNotFoundException;
@@ -145,6 +146,8 @@ import org.apache.hadoop.hdfs.HdfsConfig
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
+import org.apache.hadoop.hdfs.protocol.PathBasedCacheDescriptor;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -156,6 +159,7 @@ import org.apache.hadoop.hdfs.protocol.H
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.QuotaExceededException;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
@@ -374,6 +378,7 @@ public class FSNamesystem implements Nam
FSDirectory dir;
private final BlockManager blockManager;
private final SnapshotManager snapshotManager;
+ private final CacheManager cacheManager;
private final DatanodeStatistics datanodeStatistics;
// Block pool ID used by this namenode
@@ -709,6 +714,12 @@ public class FSNamesystem implements Nam
this.dtSecretManager = createDelegationTokenSecretManager(conf);
this.dir = new FSDirectory(fsImage, this, conf);
this.snapshotManager = new SnapshotManager(dir);
+ writeLock();
+ try {
+ this.cacheManager = new CacheManager(this, conf, blockManager);
+ } finally {
+ writeUnlock();
+ }
this.safeMode = new SafeModeInfo(conf);
this.auditLoggers = initAuditLoggers(conf);
this.isDefaultAuditLogger = auditLoggers.size() == 1 &&
@@ -901,6 +912,7 @@ public class FSNamesystem implements Nam
writeLock();
try {
if (blockManager != null) blockManager.close();
+ cacheManager.deactivate();
} finally {
writeUnlock();
}
@@ -932,7 +944,7 @@ public class FSNamesystem implements Nam
blockManager.getDatanodeManager().markAllDatanodesStale();
blockManager.clearQueues();
blockManager.processAllPendingDNMessages();
-
+
if (!isInSafeMode() ||
(isInSafeMode() && safeMode.isPopulatingReplQueues())) {
LOG.info("Reprocessing replication and invalidation queues");
@@ -965,6 +977,8 @@ public class FSNamesystem implements Nam
//ResourceMonitor required only at ActiveNN. See HDFS-2914
this.nnrmthread = new Daemon(new NameNodeResourceMonitor());
nnrmthread.start();
+ cacheManager.activate();
+ blockManager.getDatanodeManager().setSendCachingCommands(true);
} finally {
writeUnlock();
startingActiveService = false;
@@ -1010,6 +1024,8 @@ public class FSNamesystem implements Nam
// so that the tailer starts from the right spot.
dir.fsImage.updateLastAppliedTxIdFromWritten();
}
+ cacheManager.deactivate();
+ blockManager.getDatanodeManager().setSendCachingCommands(false);
} finally {
writeUnlock();
}
@@ -1583,8 +1599,14 @@ public class FSNamesystem implements Nam
length = Math.min(length, fileSize - offset);
isUc = false;
}
- return blockManager.createLocatedBlocks(inode.getBlocks(), fileSize,
+ LocatedBlocks blocks =
+ blockManager.createLocatedBlocks(inode.getBlocks(), fileSize,
isUc, offset, length, needBlockToken, iip.isSnapshot());
+ // Set caching information for the located blocks.
+ for (LocatedBlock lb: blocks.getLocatedBlocks()) {
+ cacheManager.setCachedLocations(lb);
+ }
+ return blocks;
} finally {
if (isReadOp) {
readUnlock();
@@ -4057,15 +4079,16 @@ public class FSNamesystem implements Nam
* @throws IOException
*/
HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg,
- StorageReport[] reports, int xceiverCount, int xmitsInProgress,
- int failedVolumes)
+ StorageReport[] reports, long cacheCapacity, long cacheUsed,
+ int xceiverCount, int xmitsInProgress, int failedVolumes)
throws IOException {
readLock();
try {
final int maxTransfer = blockManager.getMaxReplicationStreams()
- xmitsInProgress;
DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(
- nodeReg, reports, blockPoolId, xceiverCount, maxTransfer, failedVolumes);
+ nodeReg, reports, blockPoolId, cacheCapacity, cacheUsed,
+ xceiverCount, maxTransfer, failedVolumes);
return new HeartbeatResponse(cmds, createHaStatusHeartbeat());
} finally {
readUnlock();
@@ -6517,6 +6540,10 @@ public class FSNamesystem implements Nam
public FSDirectory getFSDirectory() {
return dir;
}
+  /** @return the {@link CacheManager} owned by this namesystem. */
+  public CacheManager getCacheManager() {
+    return this.cacheManager;
+  }
@Override // NameNodeMXBean
public String getCorruptFiles() {
@@ -6893,6 +6920,215 @@ public class FSNamesystem implements Nam
}
}
+  PathBasedCacheDescriptor addPathBasedCacheDirective(
+      PathBasedCacheDirective directive) throws IOException {
+    checkOperation(OperationCategory.WRITE);
+    final FSPermissionChecker checker =
+        isPermissionEnabled ? getPermissionChecker() : null;
+    final CacheEntryWithPayload retryEntry =
+        RetryCache.waitForCompletion(retryCache, null);
+    if (retryEntry != null && retryEntry.isSuccess()) {
+      // Retried call that already succeeded: replay the stored descriptor.
+      return (PathBasedCacheDescriptor) retryEntry.getPayload();
+    }
+    PathBasedCacheDescriptor descriptor = null;
+    boolean ok = false;
+    writeLock();
+    try {
+      checkOperation(OperationCategory.WRITE);
+      if (isInSafeMode()) {
+        throw new SafeModeException(
+            "Cannot add PathBasedCache directive", safeMode);
+      }
+      descriptor = cacheManager.addDirective(directive, checker);
+      getEditLog().logAddPathBasedCacheDirective(directive, retryEntry != null);
+      ok = true;
+    } finally {
+      writeUnlock();
+      if (ok) {
+        getEditLog().logSync();
+      }
+      if (isAuditEnabled() && isExternalInvocation()) {
+        logAuditEvent(ok, "addPathBasedCacheDirective", null, null, null);
+      }
+      RetryCache.setState(retryEntry, ok, descriptor);
+    }
+    return descriptor;
+  }
+
+  void removePathBasedCacheDescriptor(Long id) throws IOException {
+    checkOperation(OperationCategory.WRITE);
+    final FSPermissionChecker checker =
+        isPermissionEnabled ? getPermissionChecker() : null;
+    final CacheEntry retryEntry = RetryCache.waitForCompletion(retryCache);
+    if (retryEntry != null && retryEntry.isSuccess()) {
+      return; // a retry of a call that already succeeded; nothing to redo
+    }
+    boolean ok = false;
+    writeLock();
+    try {
+      checkOperation(OperationCategory.WRITE);
+      if (isInSafeMode()) {
+        throw new SafeModeException(
+            "Cannot remove PathBasedCache directives", safeMode);
+      }
+      cacheManager.removeDescriptor(id, checker);
+      getEditLog().logRemovePathBasedCacheDescriptor(id, retryEntry != null);
+      ok = true;
+    } finally {
+      writeUnlock();
+      if (isAuditEnabled() && isExternalInvocation()) {
+        logAuditEvent(ok, "removePathBasedCacheDescriptor", null, null, null);
+      }
+      RetryCache.setState(retryEntry, ok);
+    }
+    // Reached only on success: an exception above propagates past this sync.
+    getEditLog().logSync();
+  }
+
+  BatchedListEntries<PathBasedCacheDescriptor> listPathBasedCacheDescriptors(
+      long startId, String pool, String path) throws IOException {
+    checkOperation(OperationCategory.READ);
+    final FSPermissionChecker checker =
+        isPermissionEnabled ? getPermissionChecker() : null;
+    boolean ok = false;
+    readLock();
+    try {
+      checkOperation(OperationCategory.READ);
+      final BatchedListEntries<PathBasedCacheDescriptor> batch = cacheManager
+          .listPathBasedCacheDescriptors(startId, pool, path, checker);
+      ok = true;
+      return batch;
+    } finally {
+      readUnlock();
+      // Audited on both outcomes; ok is still false here if the listing
+      // (or the operation check) threw.
+      if (isAuditEnabled() && isExternalInvocation()) {
+        logAuditEvent(ok, "listPathBasedCacheDescriptors", null, null, null);
+      }
+    }
+  }
+
+  public void addCachePool(CachePoolInfo req) throws IOException {
+    checkOperation(OperationCategory.WRITE);
+    final FSPermissionChecker checker =
+        isPermissionEnabled ? getPermissionChecker() : null;
+    final CacheEntry retryEntry = RetryCache.waitForCompletion(retryCache);
+    if (retryEntry != null && retryEntry.isSuccess()) {
+      return; // a retry of a call that already succeeded
+    }
+    boolean ok = false;
+    writeLock();
+    try {
+      checkOperation(OperationCategory.WRITE);
+      if (isInSafeMode()) {
+        throw new SafeModeException(
+            "Cannot add cache pool " + req.getPoolName(), safeMode);
+      }
+      if (checker != null) {
+        checker.checkSuperuserPrivilege();
+      }
+      final CachePoolInfo added = cacheManager.addCachePool(req);
+      getEditLog().logAddCachePool(added, retryEntry != null);
+      ok = true;
+    } finally {
+      writeUnlock();
+      if (isAuditEnabled() && isExternalInvocation()) {
+        logAuditEvent(ok, "addCachePool", req.getPoolName(), null, null);
+      }
+      RetryCache.setState(retryEntry, ok);
+    }
+
+    getEditLog().logSync();
+  }
+
+  public void modifyCachePool(CachePoolInfo req) throws IOException {
+    checkOperation(OperationCategory.WRITE);
+    final FSPermissionChecker checker =
+        isPermissionEnabled ? getPermissionChecker() : null;
+    final CacheEntry retryEntry = RetryCache.waitForCompletion(retryCache);
+    if (retryEntry != null && retryEntry.isSuccess()) {
+      return; // a retry of a call that already succeeded
+    }
+    boolean ok = false;
+    writeLock();
+    try {
+      checkOperation(OperationCategory.WRITE);
+      if (isInSafeMode()) {
+        throw new SafeModeException(
+            "Cannot modify cache pool " + req.getPoolName(), safeMode);
+      }
+      if (checker != null) {
+        checker.checkSuperuserPrivilege();
+      }
+      cacheManager.modifyCachePool(req);
+      getEditLog().logModifyCachePool(req, retryEntry != null);
+      ok = true;
+    } finally {
+      writeUnlock();
+      if (isAuditEnabled() && isExternalInvocation()) {
+        logAuditEvent(ok, "modifyCachePool", req.getPoolName(), null, null);
+      }
+      RetryCache.setState(retryEntry, ok);
+    }
+
+    getEditLog().logSync();
+  }
+
+  public void removeCachePool(String cachePoolName) throws IOException {
+    checkOperation(OperationCategory.WRITE);
+    final FSPermissionChecker checker =
+        isPermissionEnabled ? getPermissionChecker() : null;
+    final CacheEntry retryEntry = RetryCache.waitForCompletion(retryCache);
+    if (retryEntry != null && retryEntry.isSuccess()) {
+      return; // a retry of a call that already succeeded
+    }
+    boolean ok = false;
+    writeLock();
+    try {
+      checkOperation(OperationCategory.WRITE);
+      if (isInSafeMode()) {
+        throw new SafeModeException(
+            "Cannot remove cache pool " + cachePoolName, safeMode);
+      }
+      if (checker != null) {
+        checker.checkSuperuserPrivilege();
+      }
+      cacheManager.removeCachePool(cachePoolName);
+      getEditLog().logRemoveCachePool(cachePoolName, retryEntry != null);
+      ok = true;
+    } finally {
+      writeUnlock();
+      if (isAuditEnabled() && isExternalInvocation()) {
+        logAuditEvent(ok, "removeCachePool", cachePoolName, null, null);
+      }
+      RetryCache.setState(retryEntry, ok);
+    }
+
+    getEditLog().logSync();
+  }
+
+  public BatchedListEntries<CachePoolInfo> listCachePools(String prevKey)
+      throws IOException {
+    checkOperation(OperationCategory.READ); // first, as in the sibling RPCs
+    final FSPermissionChecker pc =
+        isPermissionEnabled ? getPermissionChecker() : null;
+    BatchedListEntries<CachePoolInfo> results;
+    boolean success = false;
+    readLock();
+    try {
+      checkOperation(OperationCategory.READ);
+      results = cacheManager.listCachePools(pc, prevKey);
+      success = true;
+    } finally {
+      readUnlock();
+      if (isAuditEnabled() && isExternalInvocation()) {
+        logAuditEvent(success, "listCachePools", null, null, null);
+      }
+    }
+    return results;
+  }
+
/**
* Default AuditLogger implementation; used when no access logger is
* defined in the config file. It can also be explicitly listed in the
@@ -6950,10 +7186,9 @@ public class FSNamesystem implements Nam
logAuditMessage(sb.toString());
}
}
-
public void logAuditMessage(String message) {
auditLog.info(message);
}
}
-
}
+
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java?rev=1536889&r1=1536888&r2=1536889&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSPermissionChecker.java Tue Oct 29 21:04:31 2013
@@ -255,4 +255,30 @@ class FSPermissionChecker {
throw new AccessControlException("Permission denied by sticky bit setting:" +
" user=" + user + ", inode=" + inode);
}
+
+ /**
+ * Whether a cache pool can be accessed by the current context
+ *
+ * @param pool CachePool being accessed
+ * @param access type of action being performed on the cache pool
+ * @return true if the pool can be accessed, false otherwise
+ */
+ public boolean checkPermission(CachePool pool, FsAction access) {
+ FsPermission mode = pool.getMode();
+ if (isSuperUser()) {
+ return true;
+ }
+ if (user.equals(pool.getOwnerName())
+ && mode.getUserAction().implies(access)) {
+ return true;
+ }
+ if (groups.contains(pool.getGroupName())
+ && mode.getGroupAction().implies(access)) {
+ return true;
+ }
+ if (mode.getOtherAction().implies(access)) {
+ return true;
+ }
+ return false;
+ }
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1536889&r1=1536888&r2=1536889&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Tue Oct 29 21:04:31 2013
@@ -686,8 +686,13 @@ public class NameNode implements NameNod
try {
initializeGenericKeys(conf, nsId, namenodeId);
initialize(conf);
- state.prepareToEnterState(haContext);
- state.enterState(haContext);
+ try {
+ haContext.writeLock();
+ state.prepareToEnterState(haContext);
+ state.enterState(haContext);
+ } finally {
+ haContext.writeUnlock();
+ }
} catch (IOException e) {
this.stop();
throw e;
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java?rev=1536889&r1=1536888&r2=1536889&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java Tue Oct 29 21:04:31 2013
@@ -36,6 +36,7 @@ import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BatchedRemoteIterator;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
@@ -45,6 +46,7 @@ import org.apache.hadoop.fs.InvalidPathE
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.ParentNotDirectoryException;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.UnresolvedLinkException;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus;
@@ -59,6 +61,9 @@ import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HDFSPolicyProvider;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
+import org.apache.hadoop.hdfs.protocol.PathBasedCacheDescriptor;
+import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
@@ -193,9 +198,9 @@ class NameNodeRpcServer implements Namen
NamenodeProtocolServerSideTranslatorPB namenodeProtocolXlator =
new NamenodeProtocolServerSideTranslatorPB(this);
- BlockingService NNPbService = NamenodeProtocolService
+ BlockingService NNPbService = NamenodeProtocolService
.newReflectiveBlockingService(namenodeProtocolXlator);
-
+
RefreshAuthorizationPolicyProtocolServerSideTranslatorPB refreshAuthPolicyXlator =
new RefreshAuthorizationPolicyProtocolServerSideTranslatorPB(this);
BlockingService refreshAuthService = RefreshAuthorizationPolicyProtocolService
@@ -215,7 +220,7 @@ class NameNodeRpcServer implements Namen
new HAServiceProtocolServerSideTranslatorPB(this);
BlockingService haPbService = HAServiceProtocolService
.newReflectiveBlockingService(haServiceProtocolXlator);
-
+
WritableRpcEngine.ensureInitialized();
InetSocketAddress serviceRpcAddr = nn.getServiceRpcServerAddress(conf);
@@ -957,11 +962,13 @@ class NameNodeRpcServer implements Namen
@Override // DatanodeProtocol
public HeartbeatResponse sendHeartbeat(DatanodeRegistration nodeReg,
- StorageReport[] report, int xmitsInProgress, int xceiverCount,
+ StorageReport[] report, long dnCacheCapacity, long dnCacheUsed,
+ int xmitsInProgress, int xceiverCount,
int failedVolumes) throws IOException {
verifyRequest(nodeReg);
- return namesystem.handleHeartbeat(nodeReg, report, xceiverCount,
- xmitsInProgress, failedVolumes);
+ return namesystem.handleHeartbeat(nodeReg, report,
+ dnCacheCapacity, dnCacheUsed, xceiverCount, xmitsInProgress,
+ failedVolumes);
}
@Override // DatanodeProtocol
@@ -983,6 +990,18 @@ class NameNodeRpcServer implements Namen
return null;
}
+ @Override
+ public DatanodeCommand cacheReport(DatanodeRegistration nodeReg,
+ String poolId, List<Long> blockIds) throws IOException {
+ verifyRequest(nodeReg);
+ if (blockStateChangeLog.isDebugEnabled()) {
+ blockStateChangeLog.debug("*BLOCK* NameNode.cacheReport: "
+ + "from " + nodeReg + " " + blockIds.size() + " blocks");
+ }
+ namesystem.getCacheManager().processCacheReport(nodeReg, blockIds);
+ return null;
+ }
+
@Override // DatanodeProtocol
public void blockReceivedAndDeleted(DatanodeRegistration nodeReg, String poolId,
StorageReceivedDeletedBlocks[] receivedAndDeletedBlocks) throws IOException {
@@ -1216,4 +1235,88 @@ class NameNodeRpcServer implements Namen
metrics.incrSnapshotDiffReportOps();
return report;
}
+
+ @Override
+ public PathBasedCacheDescriptor addPathBasedCacheDirective(
+ PathBasedCacheDirective path) throws IOException {
+ return namesystem.addPathBasedCacheDirective(path);
+ }
+
+ @Override
+ public void removePathBasedCacheDescriptor(Long id) throws IOException {
+ namesystem.removePathBasedCacheDescriptor(id);
+ }
+
+ private class ServerSidePathBasedCacheEntriesIterator
+ extends BatchedRemoteIterator<Long, PathBasedCacheDescriptor> {
+
+ private final String pool;
+
+ private final String path;
+
+ public ServerSidePathBasedCacheEntriesIterator(Long firstKey, String pool,
+ String path) {
+ super(firstKey);
+ this.pool = pool;
+ this.path = path;
+ }
+
+ @Override
+ public BatchedEntries<PathBasedCacheDescriptor> makeRequest(
+ Long nextKey) throws IOException {
+ return namesystem.listPathBasedCacheDescriptors(nextKey, pool, path);
+ }
+
+ @Override
+ public Long elementToPrevKey(PathBasedCacheDescriptor entry) {
+ return entry.getEntryId();
+ }
+ }
+
+ @Override
+ public RemoteIterator<PathBasedCacheDescriptor> listPathBasedCacheDescriptors(long prevId,
+ String pool, String path) throws IOException {
+ return new ServerSidePathBasedCacheEntriesIterator(prevId, pool, path);
+ }
+
+ @Override
+ public void addCachePool(CachePoolInfo info) throws IOException {
+ namesystem.addCachePool(info);
+ }
+
+ @Override
+ public void modifyCachePool(CachePoolInfo info) throws IOException {
+ namesystem.modifyCachePool(info);
+ }
+
+ @Override
+ public void removeCachePool(String cachePoolName) throws IOException {
+ namesystem.removeCachePool(cachePoolName);
+ }
+
+ private class ServerSideCachePoolIterator
+ extends BatchedRemoteIterator<String, CachePoolInfo> {
+
+ public ServerSideCachePoolIterator(String prevKey) {
+ super(prevKey);
+ }
+
+ @Override
+ public BatchedEntries<CachePoolInfo> makeRequest(String prevKey)
+ throws IOException {
+ return namesystem.listCachePools(prevKey);
+ }
+
+ @Override
+ public String elementToPrevKey(CachePoolInfo element) {
+ return element.getPoolName();
+ }
+ }
+
+ @Override
+ public RemoteIterator<CachePoolInfo> listCachePools(String prevKey)
+ throws IOException {
+ return new ServerSideCachePoolIterator(prevKey);
+ }
}
+
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java?rev=1536889&r1=1536888&r2=1536889&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/NameNodeMetrics.java Tue Oct 29 21:04:31 2013
@@ -79,6 +79,8 @@ public class NameNodeMetrics {
MutableCounterLong transactionsBatchedInSync;
@Metric("Block report") MutableRate blockReport;
MutableQuantiles[] blockReportQuantiles;
+ @Metric("Cache report") MutableRate cacheReport;
+ MutableQuantiles[] cacheReportQuantiles;
@Metric("Duration in SafeMode at startup in msec")
MutableGaugeInt safeModeTime;
@@ -91,6 +93,7 @@ public class NameNodeMetrics {
final int len = intervals.length;
syncsQuantiles = new MutableQuantiles[len];
blockReportQuantiles = new MutableQuantiles[len];
+ cacheReportQuantiles = new MutableQuantiles[len];
for (int i = 0; i < len; i++) {
int interval = intervals[i];
@@ -100,6 +103,9 @@ public class NameNodeMetrics {
blockReportQuantiles[i] = registry.newQuantiles(
"blockReport" + interval + "s",
"Block report", "ops", "latency", interval);
+ cacheReportQuantiles[i] = registry.newQuantiles(
+ "cacheReport" + interval + "s",
+ "Cache report", "ops", "latency", interval);
}
}
@@ -229,6 +235,13 @@ public class NameNodeMetrics {
}
}
+ public void addCacheBlockReport(long latency) {
+ cacheReport.add(latency);
+ for (MutableQuantiles q : cacheReportQuantiles) {
+ q.add(latency);
+ }
+ }
+
public void setSafeModeTime(long elapsed) {
safeModeTime.set((int) elapsed);
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/startupprogress/StepType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/startupprogress/StepType.java?rev=1536889&r1=1536888&r2=1536889&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/startupprogress/StepType.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/startupprogress/StepType.java Tue Oct 29 21:04:31 2013
@@ -42,7 +42,17 @@ public enum StepType {
/**
* The namenode is performing an operation related to inodes.
*/
- INODES("Inodes", "inodes");
+ INODES("Inodes", "inodes"),
+
+ /**
+ * The namenode is performing an operation related to cache pools.
+ */
+ CACHE_POOLS("CachePools", "cache pools"),
+
+ /**
+ * The namenode is performing an operation related to cache entries.
+ */
+ CACHE_ENTRIES("CacheEntries", "cache entries");
private final String name, description;
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java?rev=1536889&r1=1536888&r2=1536889&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java Tue Oct 29 21:04:31 2013
@@ -19,13 +19,14 @@
package org.apache.hadoop.hdfs.server.protocol;
import java.io.*;
+import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.io.retry.AtMostOnce;
import org.apache.hadoop.io.retry.Idempotent;
import org.apache.hadoop.security.KerberosInfo;
@@ -74,6 +75,8 @@ public interface DatanodeProtocol {
final static int DNA_RECOVERBLOCK = 6; // request a block recovery
final static int DNA_ACCESSKEYUPDATE = 7; // update access key
final static int DNA_BALANCERBANDWIDTHUPDATE = 8; // update balancer bandwidth
+ final static int DNA_CACHE = 9; // cache blocks
+ final static int DNA_UNCACHE = 10; // uncache blocks
/**
* Register Datanode.
@@ -104,6 +107,8 @@ public interface DatanodeProtocol {
@Idempotent
public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration,
StorageReport[] reports,
+ long dnCacheCapacity,
+ long dnCacheUsed,
int xmitsInProgress,
int xceiverCount,
int failedVolumes) throws IOException;
@@ -128,6 +133,24 @@ public interface DatanodeProtocol {
public DatanodeCommand blockReport(DatanodeRegistration registration,
String poolId, StorageBlockReport[] reports) throws IOException;
+
+ /**
+ * Communicates the complete list of locally cached blocks to the NameNode.
+ *
+ * This method is similar to
+ * {@link #blockReport(DatanodeRegistration, String, StorageBlockReport[])},
+ * which is used to communicate blocks stored on disk.
+ *
+ * @param registration The datanode registration.
+ * @param poolId The block pool ID for the blocks.
+ * @param blockIds A list of block IDs.
+ * @return The DatanodeCommand.
+ * @throws IOException
+ */
+ @Idempotent
+ public DatanodeCommand cacheReport(DatanodeRegistration registration,
+ String poolId, List<Long> blockIds) throws IOException;
+
/**
* blockReceivedAndDeleted() allows the DataNode to tell the NameNode about
* recently-received and -deleted block data.
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java?rev=1536889&r1=1536888&r2=1536889&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageLoaderCurrent.java Tue Oct 29 21:04:31 2013
@@ -126,7 +126,7 @@ class ImageLoaderCurrent implements Imag
new SimpleDateFormat("yyyy-MM-dd HH:mm");
private static int[] versions = { -16, -17, -18, -19, -20, -21, -22, -23,
-24, -25, -26, -27, -28, -30, -31, -32, -33, -34, -35, -36, -37, -38, -39,
- -40, -41, -42, -43, -44, -45, -46, -47 };
+ -40, -41, -42, -43, -44, -45, -46, -47, -48 };
private int imageVersion = 0;
private final Map<Long, String> subtreeMap = new HashMap<Long, String>();
@@ -216,6 +216,9 @@ class ImageLoaderCurrent implements Imag
processDelegationTokens(in, v);
}
+ if (LayoutVersion.supports(Feature.CACHING, imageVersion)) {
+ processCacheManagerState(in, v);
+ }
v.leaveEnclosingElement(); // FSImage
done = true;
} finally {
@@ -228,6 +231,25 @@ class ImageLoaderCurrent implements Imag
}
/**
+ * Process CacheManager state from the fsimage.
+ */
+ private void processCacheManagerState(DataInputStream in, ImageVisitor v)
+ throws IOException {
+ v.visit(ImageElement.CACHE_NEXT_ENTRY_ID, in.readLong());
+ final int numPools = in.readInt();
+ for (int i=0; i<numPools; i++) {
+ v.visit(ImageElement.CACHE_POOL_NAME, Text.readString(in));
+ processCachePoolPermission(in, v);
+ v.visit(ImageElement.CACHE_POOL_WEIGHT, in.readInt());
+ }
+ final int numEntries = in.readInt();
+ for (int i=0; i<numEntries; i++) {
+ v.visit(ImageElement.CACHE_ENTRY_PATH, Text.readString(in));
+ v.visit(ImageElement.CACHE_ENTRY_REPLICATION, in.readShort());
+ v.visit(ImageElement.CACHE_ENTRY_POOL_NAME, Text.readString(in));
+ }
+ }
+ /**
* Process the Delegation Token related section in fsimage.
*
* @param in DataInputStream to process
@@ -385,6 +407,22 @@ class ImageLoaderCurrent implements Imag
}
/**
+ * Extract CachePool permissions stored in the fsimage file.
+ *
+ * @param in Datastream to process
+ * @param v Visitor to receive the cache pool permission elements
+ */
+ private void processCachePoolPermission(DataInputStream in, ImageVisitor v)
+ throws IOException {
+ v.visitEnclosingElement(ImageElement.PERMISSIONS);
+ v.visit(ImageElement.CACHE_POOL_OWNER_NAME, Text.readString(in));
+ v.visit(ImageElement.CACHE_POOL_GROUP_NAME, Text.readString(in));
+ FsPermission fsp = new FsPermission(in.readShort());
+ v.visit(ImageElement.CACHE_POOL_PERMISSION_STRING, fsp.toString());
+ v.leaveEnclosingElement(); // Permissions
+ }
+
+ /**
* Process the INode records stored in the fsimage.
*
* @param in Datastream to process
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageVisitor.java?rev=1536889&r1=1536888&r2=1536889&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageVisitor.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/ImageVisitor.java Tue Oct 29 21:04:31 2013
@@ -117,7 +117,19 @@ abstract class ImageVisitor {
SNAPSHOT_DST_SNAPSHOT_ID,
SNAPSHOT_LAST_SNAPSHOT_ID,
SNAPSHOT_REF_INODE_ID,
- SNAPSHOT_REF_INODE
+ SNAPSHOT_REF_INODE,
+
+ CACHE_NEXT_ENTRY_ID,
+ CACHE_NUM_POOLS,
+ CACHE_POOL_NAME,
+ CACHE_POOL_OWNER_NAME,
+ CACHE_POOL_GROUP_NAME,
+ CACHE_POOL_PERMISSION_STRING,
+ CACHE_POOL_WEIGHT,
+ CACHE_NUM_ENTRIES,
+ CACHE_ENTRY_PATH,
+ CACHE_ENTRY_REPLICATION,
+ CACHE_ENTRY_POOL_NAME
}
/**
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java?rev=1536889&r1=1536888&r2=1536889&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java Tue Oct 29 21:04:31 2013
@@ -290,6 +290,8 @@ public class JsonUtil {
m.put("dfsUsed", datanodeinfo.getDfsUsed());
m.put("remaining", datanodeinfo.getRemaining());
m.put("blockPoolUsed", datanodeinfo.getBlockPoolUsed());
+ m.put("cacheCapacity", datanodeinfo.getCacheCapacity());
+ m.put("cacheUsed", datanodeinfo.getCacheUsed());
m.put("lastUpdate", datanodeinfo.getLastUpdate());
m.put("xceiverCount", datanodeinfo.getXceiverCount());
m.put("networkLocation", datanodeinfo.getNetworkLocation());
@@ -297,16 +299,36 @@ public class JsonUtil {
return m;
}
+ private static int getInt(Map<?, ?> m, String key, final int defaultValue) {
+ Object value = m.get(key);
+ if (value == null) {
+ return defaultValue;
+ }
+ return (int) (long) (Long) value;
+ }
+
+ private static long getLong(Map<?, ?> m, String key, final long defaultValue) {
+ Object value = m.get(key);
+ if (value == null) {
+ return defaultValue;
+ }
+ return (long) (Long) value;
+ }
+
+ private static String getString(Map<?, ?> m, String key,
+ final String defaultValue) {
+ Object value = m.get(key);
+ if (value == null) {
+ return defaultValue;
+ }
+ return (String) value;
+ }
+
/** Convert a Json map to an DatanodeInfo object. */
static DatanodeInfo toDatanodeInfo(final Map<?, ?> m) {
if (m == null) {
return null;
}
-
- Object infoSecurePort = m.get("infoSecurePort");
- if (infoSecurePort == null) {
- infoSecurePort = 0l; // same as the default value in hdfs.proto
- }
// TODO: Fix storageID
return new DatanodeInfo(
@@ -315,17 +337,19 @@ public class JsonUtil {
(String)m.get("storageID"),
(int)(long)(Long)m.get("xferPort"),
(int)(long)(Long)m.get("infoPort"),
- (int)(long)(Long)infoSecurePort,
+ getInt(m, "infoSecurePort", 0),
(int)(long)(Long)m.get("ipcPort"),
- (Long)m.get("capacity"),
- (Long)m.get("dfsUsed"),
- (Long)m.get("remaining"),
- (Long)m.get("blockPoolUsed"),
- (Long)m.get("lastUpdate"),
- (int)(long)(Long)m.get("xceiverCount"),
- (String)m.get("networkLocation"),
- AdminStates.valueOf((String)m.get("adminState")));
+ getLong(m, "capacity", 0l),
+ getLong(m, "dfsUsed", 0l),
+ getLong(m, "remaining", 0l),
+ getLong(m, "blockPoolUsed", 0l),
+ getLong(m, "cacheCapacity", 0l),
+ getLong(m, "cacheUsed", 0l),
+ getLong(m, "lastUpdate", 0l),
+ getInt(m, "xceiverCount", 0),
+ getString(m, "networkLocation", ""),
+ AdminStates.valueOf(getString(m, "adminState", "NORMAL")));
}
/** Convert a DatanodeInfo[] to a Json array. */
Propchange: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/native/
------------------------------------------------------------------------------
Merged /hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/native:r1509426-1536569
Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native:r1536572
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto?rev=1536889&r1=1536888&r2=1536889&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/ClientNamenodeProtocol.proto Tue Oct 29 21:04:31 2013
@@ -364,6 +364,91 @@ message IsFileClosedResponseProto {
required bool result = 1;
}
+message PathBasedCacheDirectiveProto {
+ required string path = 1;
+ required uint32 replication = 2;
+ required string pool = 3;
+}
+
+message AddPathBasedCacheDirectiveRequestProto {
+ required PathBasedCacheDirectiveProto directive = 1;
+}
+
+message AddPathBasedCacheDirectiveResponseProto {
+ required int64 descriptorId = 1;
+}
+
+message RemovePathBasedCacheDescriptorRequestProto {
+ required int64 descriptorId = 1;
+}
+
+message RemovePathBasedCacheDescriptorResponseProto {
+}
+
+message ListPathBasedCacheDescriptorsRequestProto {
+ required int64 prevId = 1;
+ optional string pool = 2;
+ optional string path = 3;
+}
+
+message ListPathBasedCacheDescriptorsElementProto {
+ required int64 id = 1;
+ required string pool = 2;
+ required uint32 replication = 3;
+ required string path = 4;
+}
+
+message ListPathBasedCacheDescriptorsResponseProto {
+ repeated ListPathBasedCacheDescriptorsElementProto elements = 1;
+ required bool hasMore = 2;
+}
+
+message AddCachePoolRequestProto {
+ required string poolName = 1;
+ optional string ownerName = 2;
+ optional string groupName = 3;
+ optional int32 mode = 4;
+ optional int32 weight = 5;
+}
+
+message AddCachePoolResponseProto { // void response
+}
+
+message ModifyCachePoolRequestProto {
+ required string poolName = 1;
+ optional string ownerName = 2;
+ optional string groupName = 3;
+ optional int32 mode = 4;
+ optional int32 weight = 5;
+}
+
+message ModifyCachePoolResponseProto { // void response
+}
+
+message RemoveCachePoolRequestProto {
+ required string poolName = 1;
+}
+
+message RemoveCachePoolResponseProto { // void response
+}
+
+message ListCachePoolsRequestProto {
+ required string prevPoolName = 1;
+}
+
+message ListCachePoolsResponseProto {
+ repeated ListCachePoolsResponseElementProto elements = 1;
+ required bool hasMore = 2;
+}
+
+message ListCachePoolsResponseElementProto {
+ required string poolName = 1;
+ required string ownerName = 2;
+ required string groupName = 3;
+ required int32 mode = 4;
+ required int32 weight = 5;
+}
+
message GetFileLinkInfoRequestProto {
required string src = 1;
}
@@ -546,6 +631,20 @@ service ClientNamenodeProtocol {
returns(ListCorruptFileBlocksResponseProto);
rpc metaSave(MetaSaveRequestProto) returns(MetaSaveResponseProto);
rpc getFileInfo(GetFileInfoRequestProto) returns(GetFileInfoResponseProto);
+ rpc addPathBasedCacheDirective(AddPathBasedCacheDirectiveRequestProto)
+ returns (AddPathBasedCacheDirectiveResponseProto);
+ rpc removePathBasedCacheDescriptor(RemovePathBasedCacheDescriptorRequestProto)
+ returns (RemovePathBasedCacheDescriptorResponseProto);
+ rpc listPathBasedCacheDescriptors(ListPathBasedCacheDescriptorsRequestProto)
+ returns (ListPathBasedCacheDescriptorsResponseProto);
+ rpc addCachePool(AddCachePoolRequestProto)
+ returns(AddCachePoolResponseProto);
+ rpc modifyCachePool(ModifyCachePoolRequestProto)
+ returns(ModifyCachePoolResponseProto);
+ rpc removeCachePool(RemoveCachePoolRequestProto)
+ returns(RemoveCachePoolResponseProto);
+ rpc listCachePools(ListCachePoolsRequestProto)
+ returns(ListCachePoolsResponseProto);
rpc getFileLinkInfo(GetFileLinkInfoRequestProto)
returns(GetFileLinkInfoResponseProto);
rpc getContentSummary(GetContentSummaryRequestProto)
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto?rev=1536889&r1=1536888&r2=1536889&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto Tue Oct 29 21:04:31 2013
@@ -71,6 +71,7 @@ message DatanodeCommandProto {
RegisterCommand = 5;
UnusedUpgradeCommand = 6;
NullDatanodeCommand = 7;
+ BlockIdCommand = 8;
}
required Type cmdType = 1; // Type of the command
@@ -83,6 +84,7 @@ message DatanodeCommandProto {
optional FinalizeCommandProto finalizeCmd = 5;
optional KeyUpdateCommandProto keyUpdateCmd = 6;
optional RegisterCommandProto registerCmd = 7;
+ optional BlockIdCommandProto blkIdCmd = 8;
}
/**
@@ -103,7 +105,7 @@ message BlockCommandProto {
enum Action {
TRANSFER = 1; // Transfer blocks to another datanode
INVALIDATE = 2; // Invalidate blocks
- SHUTDOWN = 3; // Shutdown the datanode
+ SHUTDOWN = 3; // Shutdown the datanode
}
required Action action = 1;
@@ -114,6 +116,20 @@ message BlockCommandProto {
}
/**
+ * Command to instruct datanodes to perform a certain action
+ * on the given set of block IDs.
+ */
+message BlockIdCommandProto {
+ enum Action {
+ CACHE = 1;
+ UNCACHE = 2;
+ }
+ required Action action = 1;
+ required string blockPoolId = 2;
+ repeated uint64 blockIds = 3 [packed=true];
+}
+
+/**
* List of blocks to be recovered by the datanode
*/
message BlockRecoveryCommandProto {
@@ -166,6 +182,8 @@ message RegisterDatanodeResponseProto {
* xmitsInProgress - number of transfers from this datanode to others
* xceiverCount - number of active transceiver threads
* failedVolumes - number of failed volumes
+ * cacheCapacity - total cache capacity available at the datanode
+ * cacheUsed - amount of cache used
*/
message HeartbeatRequestProto {
required DatanodeRegistrationProto registration = 1; // Datanode info
@@ -173,6 +191,8 @@ message HeartbeatRequestProto {
optional uint32 xmitsInProgress = 3 [ default = 0 ];
optional uint32 xceiverCount = 4 [ default = 0 ];
optional uint32 failedVolumes = 5 [ default = 0 ];
+ optional uint64 dnCacheCapacity = 6 [ default = 0 ];
+ optional uint64 dnCacheUsed = 7 [default = 0 ];
}
message StorageReportProto {
@@ -205,9 +225,11 @@ message HeartbeatResponseProto {
/**
* registration - datanode registration information
* blockPoolID - block pool ID of the reported blocks
- * blocks - each block is represented as two longs in the array.
+ * blocks - each block is represented as multiple longs in the array.
* first long represents block ID
* second long represents length
+ * third long represents gen stamp
+ * fourth long (if under construction) represents replica state
*/
message BlockReportRequestProto {
required DatanodeRegistrationProto registration = 1;
@@ -231,6 +253,21 @@ message BlockReportResponseProto {
}
/**
+ * registration - datanode registration information
+ * blockPoolId - block pool ID of the reported blocks
+ * blocks - representation of blocks as longs for efficiency reasons
+ */
+message CacheReportRequestProto {
+ required DatanodeRegistrationProto registration = 1;
+ required string blockPoolId = 2;
+ repeated uint64 blocks = 3 [packed=true];
+}
+
+message CacheReportResponseProto {
+ optional DatanodeCommandProto cmd = 1;
+}
+
+/**
* Data structure to send received or deleted block information
* from datanode to namenode.
*/
@@ -348,6 +385,11 @@ service DatanodeProtocolService {
rpc blockReport(BlockReportRequestProto) returns(BlockReportResponseProto);
/**
+ * Report cached blocks at a datanode to the namenode
+ */
+ rpc cacheReport(CacheReportRequestProto) returns(CacheReportResponseProto);
+
+ /**
* Incremental block report from the DN. This contains info about recently
* received and deleted blocks, as well as when blocks start being
* received.
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto?rev=1536889&r1=1536888&r2=1536889&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/hdfs.proto Tue Oct 29 21:04:31 2013
@@ -86,6 +86,8 @@ message DatanodeInfoProto {
}
optional AdminState adminState = 10 [default = NORMAL];
+ optional uint64 cacheCapacity = 11 [default = 0];
+ optional uint64 cacheUsed = 12 [default = 0];
}
/**
@@ -144,8 +146,9 @@ message LocatedBlockProto {
// their locations are not part of this object
required hadoop.common.TokenProto blockToken = 5;
- repeated StorageTypeProto storageTypes = 6;
- repeated string storageIDs = 7;
+ repeated bool isCached = 6 [packed=true]; // if a location in locs is cached
+ repeated StorageTypeProto storageTypes = 7;
+ repeated string storageIDs = 8;
}
message DataEncryptionKeyProto {
@@ -455,3 +458,4 @@ message SnapshotInfoProto {
// TODO: do we need access time?
}
+
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml?rev=1536889&r1=1536888&r2=1536889&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml Tue Oct 29 21:04:31 2013
@@ -1459,4 +1459,70 @@
</description>
</property>
+<property>
+ <name>dfs.namenode.caching.enabled</name>
+ <value>false</value>
+ <description>
+ Set to true to enable block caching. This flag enables the NameNode to
+ maintain a mapping of cached blocks to DataNodes via processing DataNode
+ cache reports. Based on these reports and addition and removal of caching
+ directives, the NameNode will schedule caching and uncaching work.
+ </description>
+</property>
+
+<property>
+ <name>dfs.datanode.max.locked.memory</name>
+ <value>0</value>
+ <description>
+ The amount of memory in bytes to use for caching of block replicas in
+ memory on the datanode. The datanode's maximum locked memory soft ulimit
+ (RLIMIT_MEMLOCK) must be set to at least this value, else the datanode
+ will abort on startup.
+
+ By default, this parameter is set to 0, which disables in-memory caching.
+
+ If the native libraries are not available to the DataNode, this
+ configuration has no effect.
+ </description>
+</property>
+
+<property>
+ <name>dfs.namenode.path.based.cache.refresh.interval.ms</name>
+ <value>300000</value>
+ <description>
+ The number of milliseconds between successive path cache rescans. A path
+ cache rescan recalculates which blocks should be cached, and on which
+ datanodes.
+
+ By default, this parameter is set to 300000, which is five minutes.
+ </description>
+</property>
+
+<property>
+ <name>dfs.datanode.fsdatasetcache.max.threads.per.volume</name>
+ <value>4</value>
+ <description>
+ The maximum number of threads per volume to use for caching new data
+ on the datanode. These threads consume both I/O and CPU. This can affect
+ normal datanode operations.
+ </description>
+</property>
+
+<property>
+ <name>dfs.cachereport.intervalMsec</name>
+ <value>10000</value>
+ <description>
+ Determines the cache reporting interval in milliseconds. After this amount of
+ time, the DataNode sends a full report of its cache state to the NameNode.
+ The NameNode uses the cache report to update its map of cached blocks to
+ DataNode locations.
+
+ This configuration has no effect if in-memory caching has been disabled by
+ setting dfs.datanode.max.locked.memory to 0 (which is the default).
+
+ If the native libraries are not available to the DataNode, this
+ configuration has no effect.
+ </description>
+</property>
+
</configuration>
Propchange: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode:r1536572
Merged /hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/datanode:r1509426-1536569
Propchange: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs:r1536572
Merged /hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/hdfs:r1509426-1536569
Propchange: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/secondary/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/secondary:r1536572
Merged /hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/main/webapps/secondary:r1509426-1536569
Propchange: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/hdfs/
------------------------------------------------------------------------------
Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/hdfs:r1536572
Merged /hadoop/common/branches/HDFS-4949/hadoop-hdfs-project/hadoop-hdfs/src/test/hdfs:r1509426-1536569
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java?rev=1536889&r1=1536888&r2=1536889&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java Tue Oct 29 21:04:31 2013
@@ -823,7 +823,7 @@ public class DFSTestUtil {
DFSConfigKeys.DFS_DATANODE_HTTP_DEFAULT_PORT,
DFSConfigKeys.DFS_DATANODE_HTTPS_DEFAULT_PORT,
DFSConfigKeys.DFS_DATANODE_IPC_DEFAULT_PORT,
- 1, 2, 3, 4, 5, 6, "local", adminState);
+ 1l, 2l, 3l, 4l, 0l, 0l, 5, 6, "local", adminState);
}
public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr,
@@ -1033,6 +1033,20 @@ public class DFSTestUtil {
locatedBlocks = DFSClientAdapter.callGetBlockLocations(
cluster.getNameNodeRpc(nnIndex), filePath, 0L, bytes.length);
} while (locatedBlocks.isUnderConstruction());
+ // OP_ADD_CACHE_POOL 35
+ filesystem.addCachePool(new CachePoolInfo("pool1"));
+ // OP_MODIFY_CACHE_POOL 36
+ filesystem.modifyCachePool(new CachePoolInfo("pool1").setWeight(99));
+ // OP_ADD_PATH_BASED_CACHE_DIRECTIVE 33
+ PathBasedCacheDescriptor pbcd = filesystem.addPathBasedCacheDirective(
+ new PathBasedCacheDirective.Builder().
+ setPath(new Path("/path")).
+ setPool("pool1").
+ build());
+ // OP_REMOVE_PATH_BASED_CACHE_DESCRIPTOR 34
+ filesystem.removePathBasedCacheDescriptor(pbcd);
+ // OP_REMOVE_CACHE_POOL 37
+ filesystem.removeCachePool("pool1");
}
public static void abortStream(DFSOutputStream out) throws IOException {
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/LogVerificationAppender.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/LogVerificationAppender.java?rev=1536889&r1=1536888&r2=1536889&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/LogVerificationAppender.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/LogVerificationAppender.java Tue Oct 29 21:04:31 2013
@@ -61,4 +61,15 @@ public class LogVerificationAppender ext
}
return count;
}
+
+ public int countLinesWithMessage(final String text) {
+ int count = 0;
+ for (LoggingEvent e: getLog()) {
+ String msg = e.getRenderedMessage();
+ if (msg != null && msg.contains(text)) {
+ count++;
+ }
+ }
+ return count;
+ }
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeConfig.java?rev=1536889&r1=1536888&r2=1536889&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeConfig.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDatanodeConfig.java Tue Oct 29 21:04:31 2013
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs;
import static org.apache.hadoop.hdfs.server.common.Util.fileAsURI;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeTrue;
import java.io.File;
import java.io.IOException;
@@ -30,6 +31,8 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.test.GenericTestUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -106,4 +109,38 @@ public class TestDatanodeConfig {
throw new IOException("Bad URI", e);
}
}
+
+ @Test(timeout=60000)
+ public void testMemlockLimit() throws Exception {
+ assumeTrue(NativeIO.isAvailable());
+ final long memlockLimit = NativeIO.getMemlockLimit();
+
+ // Can't increase the memlock limit past the maximum.
+ assumeTrue(memlockLimit != Long.MAX_VALUE);
+
+ Configuration conf = cluster.getConfiguration(0);
+ long prevLimit = conf.
+ getLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
+ DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_DEFAULT);
+ try {
+ // Try starting the DN with limit configured to the ulimit
+ conf.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
+ memlockLimit);
+ DataNode dn = null;
+ dn = DataNode.createDataNode(new String[]{}, conf);
+ dn.shutdown();
+ // Try starting the DN with a limit > ulimit
+ conf.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
+ memlockLimit+1);
+ try {
+ dn = DataNode.createDataNode(new String[]{}, conf);
+ } catch (RuntimeException e) {
+ GenericTestUtils.assertExceptionContains(
+ "more than the datanode's available RLIMIT_MEMLOCK", e);
+ }
+ } finally {
+ conf.setLong(DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY,
+ prevLimit);
+ }
+ }
}
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java?rev=1536889&r1=1536888&r2=1536889&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java Tue Oct 29 21:04:31 2013
@@ -107,7 +107,7 @@ public class TestBlockManager {
2 * HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
2 * HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L);
dn.updateHeartbeat(
- BlockManagerTestUtil.getStorageReportsForDatanode(dn), 0, 0);
+ BlockManagerTestUtil.getStorageReportsForDatanode(dn), 0L, 0L, 0, 0);
bm.getDatanodeManager().checkIfClusterIsNowMultiRack(dn);
}
}
@@ -600,3 +600,4 @@ public class TestBlockManager {
assertEquals(1, ds.getBlockReportCount());
}
}
+
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java?rev=1536889&r1=1536888&r2=1536889&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestOverReplicatedBlocks.java Tue Oct 29 21:04:31 2013
@@ -106,7 +106,7 @@ public class TestOverReplicatedBlocks {
datanode.getStorageInfos()[0].setUtilization(100L, 100L, 0, 100L);
datanode.updateHeartbeat(
BlockManagerTestUtil.getStorageReportsForDatanode(datanode),
- 0, 0);
+ 0L, 0L, 0, 0);
}
}
@@ -223,3 +223,4 @@ public class TestOverReplicatedBlocks {
}
}
}
+
Modified: hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java?rev=1536889&r1=1536888&r2=1536889&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java (original)
+++ hadoop/common/branches/HDFS-2832/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java Tue Oct 29 21:04:31 2013
@@ -92,12 +92,12 @@ public class TestReplicationPolicy {
private static void updateHeartbeatWithUsage(DatanodeDescriptor dn,
long capacity, long dfsUsed, long remaining, long blockPoolUsed,
- int xceiverCount, int volFailures) {
+ long dnCacheCapacity, long dnCacheUsed, int xceiverCount, int volFailures) {
dn.getStorageInfos()[0].setUtilization(
capacity, dfsUsed, remaining, blockPoolUsed);
dn.updateHeartbeat(
BlockManagerTestUtil.getStorageReportsForDatanode(dn),
- xceiverCount, volFailures);
+ dnCacheCapacity, dnCacheUsed, xceiverCount, volFailures);
}
@BeforeClass
@@ -138,7 +138,7 @@ public class TestReplicationPolicy {
for (int i=0; i < NUM_OF_DATANODES; i++) {
updateHeartbeatWithUsage(dataNodes[i],
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
- 2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
+ 2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
}
}
@@ -162,7 +162,8 @@ public class TestReplicationPolicy {
public void testChooseTarget1() throws Exception {
updateHeartbeatWithUsage(dataNodes[0],
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
- HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 4, 0); // overloaded
+ HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
+ 0L, 0L, 4, 0); // overloaded
DatanodeStorageInfo[] targets;
targets = chooseTarget(0);
@@ -192,7 +193,7 @@ public class TestReplicationPolicy {
updateHeartbeatWithUsage(dataNodes[0],
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
- HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
+ HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
}
private static DatanodeStorageInfo[] chooseTarget(int numOfReplicas) {
@@ -315,7 +316,8 @@ public class TestReplicationPolicy {
// make data node 0 to be not qualified to choose
updateHeartbeatWithUsage(dataNodes[0],
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
- (HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0, 0); // no space
+ (HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L,
+ 0L, 0L, 0, 0); // no space
DatanodeStorageInfo[] targets;
targets = chooseTarget(0);
@@ -348,7 +350,7 @@ public class TestReplicationPolicy {
updateHeartbeatWithUsage(dataNodes[0],
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
- HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
+ HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
}
/**
@@ -365,7 +367,7 @@ public class TestReplicationPolicy {
for(int i=0; i<2; i++) {
updateHeartbeatWithUsage(dataNodes[i],
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
- (HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0, 0);
+ (HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
}
DatanodeStorageInfo[] targets;
@@ -393,7 +395,7 @@ public class TestReplicationPolicy {
for(int i=0; i<2; i++) {
updateHeartbeatWithUsage(dataNodes[i],
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
- HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
+ HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
}
}
@@ -455,7 +457,7 @@ public class TestReplicationPolicy {
for(int i=0; i<2; i++) {
updateHeartbeatWithUsage(dataNodes[i],
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
- (HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0, 0);
+ (HdfsConstants.MIN_BLOCKS_FOR_WRITE-1)*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
}
final LogVerificationAppender appender = new LogVerificationAppender();
@@ -480,7 +482,7 @@ public class TestReplicationPolicy {
for(int i=0; i<2; i++) {
updateHeartbeatWithUsage(dataNodes[i],
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
- HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0, 0);
+ HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L, 0L, 0L, 0, 0);
}
}