You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by cm...@apache.org on 2014/08/20 01:50:25 UTC
svn commit: r1619012 [21/35] - in
/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project: hadoop-hdfs-httpfs/
hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/client/
hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/fs/http/server/ hadoop...
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java Tue Aug 19 23:49:39 2014
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.namenode.snapshot;
import java.io.DataInput;
+import java.io.DataOutput;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
@@ -29,9 +30,11 @@ import java.util.concurrent.atomic.Atomi
import javax.management.ObjectName;
import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.hdfs.protocol.SnapshotException;
import org.apache.hadoop.hdfs.protocol.SnapshotInfo;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
+import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry;
import org.apache.hadoop.hdfs.server.namenode.FSDirectory;
import org.apache.hadoop.hdfs.server.namenode.FSImageFormat;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
@@ -39,9 +42,10 @@ import org.apache.hadoop.hdfs.server.nam
import org.apache.hadoop.hdfs.server.namenode.INode.BlocksMapUpdateInfo;
import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
import org.apache.hadoop.hdfs.server.namenode.INodesInPath;
-import org.apache.hadoop.hdfs.server.namenode.snapshot.INodeDirectorySnapshottable.SnapshotDiffInfo;
import org.apache.hadoop.metrics2.util.MBeans;
+import com.google.common.base.Preconditions;
+
/**
* Manage snapshottable directories and their snapshots.
*
@@ -64,8 +68,8 @@ public class SnapshotManager implements
private int snapshotCounter = 0;
/** All snapshottable directories in the namesystem. */
- private final Map<Long, INodeDirectorySnapshottable> snapshottables
- = new HashMap<Long, INodeDirectorySnapshottable>();
+ private final Map<Long, INodeDirectory> snapshottables =
+ new HashMap<Long, INodeDirectory>();
public SnapshotManager(final FSDirectory fsdir) {
this.fsdir = fsdir;
@@ -82,7 +86,7 @@ public class SnapshotManager implements
return;
}
- for(INodeDirectorySnapshottable s : snapshottables.values()) {
+ for(INodeDirectory s : snapshottables.values()) {
if (s.isAncestorDirectory(dir)) {
throw new SnapshotException(
"Nested snapshottable directories not allowed: path=" + path
@@ -110,33 +114,30 @@ public class SnapshotManager implements
checkNestedSnapshottable(d, path);
}
-
- final INodeDirectorySnapshottable s;
if (d.isSnapshottable()) {
//The directory is already a snapshottable directory.
- s = (INodeDirectorySnapshottable)d;
- s.setSnapshotQuota(INodeDirectorySnapshottable.SNAPSHOT_LIMIT);
+ d.setSnapshotQuota(DirectorySnapshottableFeature.SNAPSHOT_LIMIT);
} else {
- s = d.replaceSelf4INodeDirectorySnapshottable(iip.getLatestSnapshotId(),
- fsdir.getINodeMap());
+ d.addSnapshottableFeature();
}
- addSnapshottable(s);
+ addSnapshottable(d);
}
/** Add the given snapshottable directory to {@link #snapshottables}. */
- public void addSnapshottable(INodeDirectorySnapshottable dir) {
+ public void addSnapshottable(INodeDirectory dir) {
+ Preconditions.checkArgument(dir.isSnapshottable());
snapshottables.put(dir.getId(), dir);
}
/** Remove the given snapshottable directory from {@link #snapshottables}. */
- private void removeSnapshottable(INodeDirectorySnapshottable s) {
+ private void removeSnapshottable(INodeDirectory s) {
snapshottables.remove(s.getId());
}
/** Remove snapshottable directories from {@link #snapshottables} */
- public void removeSnapshottable(List<INodeDirectorySnapshottable> toRemove) {
+ public void removeSnapshottable(List<INodeDirectory> toRemove) {
if (toRemove != null) {
- for (INodeDirectorySnapshottable s : toRemove) {
+ for (INodeDirectory s : toRemove) {
removeSnapshottable(s);
}
}
@@ -150,22 +151,22 @@ public class SnapshotManager implements
public void resetSnapshottable(final String path) throws IOException {
final INodesInPath iip = fsdir.getINodesInPath4Write(path);
final INodeDirectory d = INodeDirectory.valueOf(iip.getLastINode(), path);
- if (!d.isSnapshottable()) {
+ DirectorySnapshottableFeature sf = d.getDirectorySnapshottableFeature();
+ if (sf == null) {
// the directory is already non-snapshottable
return;
}
- final INodeDirectorySnapshottable s = (INodeDirectorySnapshottable) d;
- if (s.getNumSnapshots() > 0) {
+ if (sf.getNumSnapshots() > 0) {
throw new SnapshotException("The directory " + path + " has snapshot(s). "
+ "Please redo the operation after removing all the snapshots.");
}
- if (s == fsdir.getRoot()) {
- s.setSnapshotQuota(0);
+ if (d == fsdir.getRoot()) {
+ d.setSnapshotQuota(0);
} else {
- s.replaceSelf(iip.getLatestSnapshotId(), fsdir.getINodeMap());
+ d.removeSnapshottableFeature();
}
- removeSnapshottable(s);
+ removeSnapshottable(d);
}
/**
@@ -178,10 +179,15 @@ public class SnapshotManager implements
* Throw IOException when the given path does not lead to an
* existing snapshottable directory.
*/
- public INodeDirectorySnapshottable getSnapshottableRoot(final String path
- ) throws IOException {
- final INodesInPath i = fsdir.getINodesInPath4Write(path);
- return INodeDirectorySnapshottable.valueOf(i.getLastINode(), path);
+ public INodeDirectory getSnapshottableRoot(final String path)
+ throws IOException {
+ final INodeDirectory dir = INodeDirectory.valueOf(fsdir
+ .getINodesInPath4Write(path).getLastINode(), path);
+ if (!dir.isSnapshottable()) {
+ throw new SnapshotException(
+ "Directory is not a snapshottable directory: " + path);
+ }
+ return dir;
}
/**
@@ -200,7 +206,7 @@ public class SnapshotManager implements
*/
public String createSnapshot(final String path, String snapshotName
) throws IOException {
- INodeDirectorySnapshottable srcRoot = getSnapshottableRoot(path);
+ INodeDirectory srcRoot = getSnapshottableRoot(path);
if (snapshotCounter == getMaxSnapshotID()) {
// We have reached the maximum allowable snapshot ID and since we don't
@@ -233,7 +239,7 @@ public class SnapshotManager implements
// parse the path, and check if the path is a snapshot path
// the INodeDirectorySnapshottable#valueOf method will throw Exception
// if the path is not for a snapshottable directory
- INodeDirectorySnapshottable srcRoot = getSnapshottableRoot(path);
+ INodeDirectory srcRoot = getSnapshottableRoot(path);
srcRoot.removeSnapshot(snapshotName, collectedBlocks, removedINodes);
numSnapshots.getAndDecrement();
}
@@ -256,8 +262,7 @@ public class SnapshotManager implements
final String newSnapshotName) throws IOException {
// Find the source root directory path where the snapshot was taken.
// All the check for path has been included in the valueOf method.
- final INodeDirectorySnapshottable srcRoot
- = INodeDirectorySnapshottable.valueOf(fsdir.getINode(path), path);
+ final INodeDirectory srcRoot = getSnapshottableRoot(path);
// Note that renameSnapshot and createSnapshot are synchronized externally
// through FSNamesystem's write lock
srcRoot.renameSnapshot(path, oldSnapshotName, newSnapshotName);
@@ -283,9 +288,26 @@ public class SnapshotManager implements
snapshotCounter = counter;
}
- INodeDirectorySnapshottable[] getSnapshottableDirs() {
+ INodeDirectory[] getSnapshottableDirs() {
return snapshottables.values().toArray(
- new INodeDirectorySnapshottable[snapshottables.size()]);
+ new INodeDirectory[snapshottables.size()]);
+ }
+
+ /**
+ * Write {@link #snapshotCounter}, {@link #numSnapshots},
+ * and all snapshots to the DataOutput.
+ */
+ public void write(DataOutput out) throws IOException {
+ out.writeInt(snapshotCounter);
+ out.writeInt(numSnapshots.get());
+
+ // write all snapshots.
+ for(INodeDirectory snapshottableDir : snapshottables.values()) {
+ for (Snapshot s : snapshottableDir.getDirectorySnapshottableFeature()
+ .getSnapshotList()) {
+ s.write(out);
+ }
+ }
}
/**
@@ -321,16 +343,16 @@ public class SnapshotManager implements
List<SnapshottableDirectoryStatus> statusList =
new ArrayList<SnapshottableDirectoryStatus>();
- for (INodeDirectorySnapshottable dir : snapshottables.values()) {
+ for (INodeDirectory dir : snapshottables.values()) {
if (userName == null || userName.equals(dir.getUserName())) {
SnapshottableDirectoryStatus status = new SnapshottableDirectoryStatus(
dir.getModificationTime(), dir.getAccessTime(),
dir.getFsPermission(), dir.getUserName(), dir.getGroupName(),
dir.getLocalNameBytes(), dir.getId(),
dir.getChildrenNum(Snapshot.CURRENT_STATE_ID),
- dir.getNumSnapshots(),
- dir.getSnapshotQuota(), dir.getParent() == null ?
- DFSUtil.EMPTY_BYTES :
+ dir.getDirectorySnapshottableFeature().getNumSnapshots(),
+ dir.getDirectorySnapshottableFeature().getSnapshotQuota(),
+ dir.getParent() == null ? DFSUtil.EMPTY_BYTES :
DFSUtil.string2Bytes(dir.getParent().getFullPathName()));
statusList.add(status);
}
@@ -344,21 +366,22 @@ public class SnapshotManager implements
* Compute the difference between two snapshots of a directory, or between a
* snapshot of the directory and its current tree.
*/
- public SnapshotDiffInfo diff(final String path, final String from,
+ public SnapshotDiffReport diff(final String path, final String from,
final String to) throws IOException {
+ // Find the source root directory path where the snapshots were taken.
+ // All the check for path has been included in the valueOf method.
+ final INodeDirectory snapshotRoot = getSnapshottableRoot(path);
+
if ((from == null || from.isEmpty())
&& (to == null || to.isEmpty())) {
// both fromSnapshot and toSnapshot indicate the current tree
- return null;
+ return new SnapshotDiffReport(path, from, to,
+ Collections.<DiffReportEntry> emptyList());
}
-
- // Find the source root directory path where the snapshots were taken.
- // All the check for path has been included in the valueOf method.
- INodesInPath inodesInPath = fsdir.getINodesInPath4Write(path.toString());
- final INodeDirectorySnapshottable snapshotRoot = INodeDirectorySnapshottable
- .valueOf(inodesInPath.getLastINode(), path);
-
- return snapshotRoot.computeDiff(from, to);
+ final SnapshotDiffInfo diffs = snapshotRoot
+ .getDirectorySnapshottableFeature().computeDiff(snapshotRoot, from, to);
+ return diffs != null ? diffs.generateReport() : new SnapshotDiffReport(
+ path, from, to, Collections.<DiffReportEntry> emptyList());
}
public void clearSnapshottableDirs() {
@@ -391,7 +414,7 @@ public class SnapshotManager implements
getSnapshottableDirectories() {
List<SnapshottableDirectoryStatus.Bean> beans =
new ArrayList<SnapshottableDirectoryStatus.Bean>();
- for (INodeDirectorySnapshottable d : getSnapshottableDirs()) {
+ for (INodeDirectory d : getSnapshottableDirs()) {
beans.add(toBean(d));
}
return beans.toArray(new SnapshottableDirectoryStatus.Bean[beans.size()]);
@@ -400,20 +423,19 @@ public class SnapshotManager implements
@Override // SnapshotStatsMXBean
public SnapshotInfo.Bean[] getSnapshots() {
List<SnapshotInfo.Bean> beans = new ArrayList<SnapshotInfo.Bean>();
- for (INodeDirectorySnapshottable d : getSnapshottableDirs()) {
- for (Snapshot s : d.getSnapshotList()) {
+ for (INodeDirectory d : getSnapshottableDirs()) {
+ for (Snapshot s : d.getDirectorySnapshottableFeature().getSnapshotList()) {
beans.add(toBean(s));
}
}
return beans.toArray(new SnapshotInfo.Bean[beans.size()]);
}
- public static SnapshottableDirectoryStatus.Bean toBean(
- INodeDirectorySnapshottable d) {
+ public static SnapshottableDirectoryStatus.Bean toBean(INodeDirectory d) {
return new SnapshottableDirectoryStatus.Bean(
d.getFullPathName(),
- d.getNumSnapshots(),
- d.getSnapshotQuota(),
+ d.getDirectorySnapshottableFeature().getNumSnapshots(),
+ d.getDirectorySnapshottableFeature().getSnapshotQuota(),
d.getModificationTime(),
Short.valueOf(Integer.toOctalString(
d.getFsPermissionShort())),
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/web/resources/NamenodeWebHdfsMethods.java Tue Aug 19 23:49:39 2014
@@ -28,6 +28,8 @@ import java.net.URISyntaxException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.EnumSet;
+import java.util.HashSet;
+import java.util.List;
import javax.servlet.ServletContext;
import javax.servlet.http.HttpServletRequest;
@@ -53,8 +55,11 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.XAttr;
import org.apache.hadoop.fs.permission.AclStatus;
+import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.XAttrHelper;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
@@ -81,6 +86,7 @@ import org.apache.hadoop.hdfs.web.resour
import org.apache.hadoop.hdfs.web.resources.DeleteOpParam;
import org.apache.hadoop.hdfs.web.resources.DestinationParam;
import org.apache.hadoop.hdfs.web.resources.DoAsParam;
+import org.apache.hadoop.hdfs.web.resources.ExcludeDatanodesParam;
import org.apache.hadoop.hdfs.web.resources.GetOpParam;
import org.apache.hadoop.hdfs.web.resources.GroupParam;
import org.apache.hadoop.hdfs.web.resources.HttpOpParam;
@@ -88,6 +94,7 @@ import org.apache.hadoop.hdfs.web.resour
import org.apache.hadoop.hdfs.web.resources.ModificationTimeParam;
import org.apache.hadoop.hdfs.web.resources.NamenodeAddressParam;
import org.apache.hadoop.hdfs.web.resources.OffsetParam;
+import org.apache.hadoop.hdfs.web.resources.OldSnapshotNameParam;
import org.apache.hadoop.hdfs.web.resources.OverwriteParam;
import org.apache.hadoop.hdfs.web.resources.OwnerParam;
import org.apache.hadoop.hdfs.web.resources.Param;
@@ -98,20 +105,30 @@ import org.apache.hadoop.hdfs.web.resour
import org.apache.hadoop.hdfs.web.resources.RenameOptionSetParam;
import org.apache.hadoop.hdfs.web.resources.RenewerParam;
import org.apache.hadoop.hdfs.web.resources.ReplicationParam;
+import org.apache.hadoop.hdfs.web.resources.SnapshotNameParam;
import org.apache.hadoop.hdfs.web.resources.TokenArgumentParam;
import org.apache.hadoop.hdfs.web.resources.UriFsPathParam;
import org.apache.hadoop.hdfs.web.resources.UserParam;
+import org.apache.hadoop.hdfs.web.resources.XAttrEncodingParam;
+import org.apache.hadoop.hdfs.web.resources.XAttrNameParam;
+import org.apache.hadoop.hdfs.web.resources.XAttrSetFlagParam;
+import org.apache.hadoop.hdfs.web.resources.XAttrValueParam;
+import org.apache.hadoop.hdfs.web.resources.FsActionParam;
import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException;
+import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.StringUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
+import com.google.common.collect.Lists;
import com.sun.jersey.spi.container.ResourceFilters;
/** Web-hdfs NameNode implementation. */
@@ -162,22 +179,44 @@ public class NamenodeWebHdfsMethods {
//clear content type
response.setContentType(null);
+
+ // set the remote address, if coming in via a trust proxy server then
+ // the address with be that of the proxied client
+ REMOTE_ADDRESS.set(JspHelper.getRemoteAddr(request));
}
+ private void reset() {
+ REMOTE_ADDRESS.set(null);
+ }
+
private static NamenodeProtocols getRPCServer(NameNode namenode)
throws IOException {
final NamenodeProtocols np = namenode.getRpcServer();
if (np == null) {
- throw new IOException("Namenode is in startup mode");
+ throw new RetriableException("Namenode is in startup mode");
}
return np;
}
-
+
@VisibleForTesting
static DatanodeInfo chooseDatanode(final NameNode namenode,
final String path, final HttpOpParam.Op op, final long openOffset,
- final long blocksize) throws IOException {
+ final long blocksize, final String excludeDatanodes) throws IOException {
final BlockManager bm = namenode.getNamesystem().getBlockManager();
+
+ HashSet<Node> excludes = new HashSet<Node>();
+ if (excludeDatanodes != null) {
+ for (String host : StringUtils
+ .getTrimmedStringCollection(excludeDatanodes)) {
+ int idx = host.indexOf(":");
+ if (idx != -1) {
+ excludes.add(bm.getDatanodeManager().getDatanodeByXferAddr(
+ host.substring(0, idx), Integer.parseInt(host.substring(idx + 1))));
+ } else {
+ excludes.add(bm.getDatanodeManager().getDatanodeByHost(host));
+ }
+ }
+ }
if (op == PutOpParam.Op.CREATE) {
//choose a datanode near to client
@@ -186,7 +225,7 @@ public class NamenodeWebHdfsMethods {
if (clientNode != null) {
final DatanodeStorageInfo[] storages = bm.getBlockPlacementPolicy()
.chooseTarget(path, 1, clientNode,
- new ArrayList<DatanodeStorageInfo>(), false, null, blocksize,
+ new ArrayList<DatanodeStorageInfo>(), false, excludes, blocksize,
// TODO: get storage type from the file
StorageType.DEFAULT);
if (storages.length > 0) {
@@ -215,7 +254,7 @@ public class NamenodeWebHdfsMethods {
final LocatedBlocks locations = np.getBlockLocations(path, offset, 1);
final int count = locations.locatedBlockCount();
if (count > 0) {
- return bestNode(locations.get(0).getLocations());
+ return bestNode(locations.get(0).getLocations(), excludes);
}
}
}
@@ -229,11 +268,14 @@ public class NamenodeWebHdfsMethods {
* sorted based on availability and network distances, thus it is sufficient
* to return the first element of the node here.
*/
- private static DatanodeInfo bestNode(DatanodeInfo[] nodes) throws IOException {
- if (nodes.length == 0 || nodes[0].isDecommissioned()) {
- throw new IOException("No active nodes contain this block");
+ private static DatanodeInfo bestNode(DatanodeInfo[] nodes,
+ HashSet<Node> excludes) throws IOException {
+ for (DatanodeInfo dn: nodes) {
+ if (false == dn.isDecommissioned() && false == excludes.contains(dn)) {
+ return dn;
+ }
}
- return nodes[0];
+ throw new IOException("No active nodes contain this block");
}
private Token<? extends TokenIdentifier> generateDelegationToken(
@@ -252,11 +294,12 @@ public class NamenodeWebHdfsMethods {
final UserGroupInformation ugi, final DelegationParam delegation,
final UserParam username, final DoAsParam doAsUser,
final String path, final HttpOpParam.Op op, final long openOffset,
- final long blocksize,
+ final long blocksize, final String excludeDatanodes,
final Param<?, ?>... parameters) throws URISyntaxException, IOException {
final DatanodeInfo dn;
try {
- dn = chooseDatanode(namenode, path, op, openOffset, blocksize);
+ dn = chooseDatanode(namenode, path, op, openOffset, blocksize,
+ excludeDatanodes);
} catch (InvalidTopologyException ite) {
throw new IOException("Failed to find datanode, suggest to check cluster health.", ite);
}
@@ -333,12 +376,25 @@ public class NamenodeWebHdfsMethods {
@QueryParam(TokenArgumentParam.NAME) @DefaultValue(TokenArgumentParam.DEFAULT)
final TokenArgumentParam delegationTokenArgument,
@QueryParam(AclPermissionParam.NAME) @DefaultValue(AclPermissionParam.DEFAULT)
- final AclPermissionParam aclPermission
- )throws IOException, InterruptedException {
+ final AclPermissionParam aclPermission,
+ @QueryParam(XAttrNameParam.NAME) @DefaultValue(XAttrNameParam.DEFAULT)
+ final XAttrNameParam xattrName,
+ @QueryParam(XAttrValueParam.NAME) @DefaultValue(XAttrValueParam.DEFAULT)
+ final XAttrValueParam xattrValue,
+ @QueryParam(XAttrSetFlagParam.NAME) @DefaultValue(XAttrSetFlagParam.DEFAULT)
+ final XAttrSetFlagParam xattrSetFlag,
+ @QueryParam(SnapshotNameParam.NAME) @DefaultValue(SnapshotNameParam.DEFAULT)
+ final SnapshotNameParam snapshotName,
+ @QueryParam(OldSnapshotNameParam.NAME) @DefaultValue(OldSnapshotNameParam.DEFAULT)
+ final OldSnapshotNameParam oldSnapshotName,
+ @QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT)
+ final ExcludeDatanodesParam excludeDatanodes
+ ) throws IOException, InterruptedException {
return put(ugi, delegation, username, doAsUser, ROOT, op, destination,
owner, group, permission, overwrite, bufferSize, replication,
blockSize, modificationTime, accessTime, renameOptions, createParent,
- delegationTokenArgument,aclPermission);
+ delegationTokenArgument, aclPermission, xattrName, xattrValue,
+ xattrSetFlag, snapshotName, oldSnapshotName, excludeDatanodes);
}
/** Handle HTTP PUT request. */
@@ -384,25 +440,39 @@ public class NamenodeWebHdfsMethods {
@QueryParam(TokenArgumentParam.NAME) @DefaultValue(TokenArgumentParam.DEFAULT)
final TokenArgumentParam delegationTokenArgument,
@QueryParam(AclPermissionParam.NAME) @DefaultValue(AclPermissionParam.DEFAULT)
- final AclPermissionParam aclPermission
+ final AclPermissionParam aclPermission,
+ @QueryParam(XAttrNameParam.NAME) @DefaultValue(XAttrNameParam.DEFAULT)
+ final XAttrNameParam xattrName,
+ @QueryParam(XAttrValueParam.NAME) @DefaultValue(XAttrValueParam.DEFAULT)
+ final XAttrValueParam xattrValue,
+ @QueryParam(XAttrSetFlagParam.NAME) @DefaultValue(XAttrSetFlagParam.DEFAULT)
+ final XAttrSetFlagParam xattrSetFlag,
+ @QueryParam(SnapshotNameParam.NAME) @DefaultValue(SnapshotNameParam.DEFAULT)
+ final SnapshotNameParam snapshotName,
+ @QueryParam(OldSnapshotNameParam.NAME) @DefaultValue(OldSnapshotNameParam.DEFAULT)
+ final OldSnapshotNameParam oldSnapshotName,
+ @QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT)
+ final ExcludeDatanodesParam excludeDatanodes
) throws IOException, InterruptedException {
init(ugi, delegation, username, doAsUser, path, op, destination, owner,
group, permission, overwrite, bufferSize, replication, blockSize,
- modificationTime, accessTime, renameOptions, delegationTokenArgument,aclPermission);
+ modificationTime, accessTime, renameOptions, delegationTokenArgument,
+ aclPermission, xattrName, xattrValue, xattrSetFlag, snapshotName,
+ oldSnapshotName, excludeDatanodes);
return ugi.doAs(new PrivilegedExceptionAction<Response>() {
@Override
public Response run() throws IOException, URISyntaxException {
- REMOTE_ADDRESS.set(request.getRemoteAddr());
try {
return put(ugi, delegation, username, doAsUser,
path.getAbsolutePath(), op, destination, owner, group,
permission, overwrite, bufferSize, replication, blockSize,
modificationTime, accessTime, renameOptions, createParent,
- delegationTokenArgument,aclPermission);
+ delegationTokenArgument, aclPermission, xattrName, xattrValue,
+ xattrSetFlag, snapshotName, oldSnapshotName, excludeDatanodes);
} finally {
- REMOTE_ADDRESS.set(null);
+ reset();
}
}
});
@@ -428,7 +498,13 @@ public class NamenodeWebHdfsMethods {
final RenameOptionSetParam renameOptions,
final CreateParentParam createParent,
final TokenArgumentParam delegationTokenArgument,
- final AclPermissionParam aclPermission
+ final AclPermissionParam aclPermission,
+ final XAttrNameParam xattrName,
+ final XAttrValueParam xattrValue,
+ final XAttrSetFlagParam xattrSetFlag,
+ final SnapshotNameParam snapshotName,
+ final OldSnapshotNameParam oldSnapshotName,
+ final ExcludeDatanodesParam exclDatanodes
) throws IOException, URISyntaxException {
final Configuration conf = (Configuration)context.getAttribute(JspHelper.CURRENT_CONF);
@@ -438,9 +514,10 @@ public class NamenodeWebHdfsMethods {
switch(op.getValue()) {
case CREATE:
{
- final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
- fullpath, op.getValue(), -1L, blockSize.getValue(conf),
- permission, overwrite, bufferSize, replication, blockSize);
+ final URI uri = redirectURI(namenode, ugi, delegation, username,
+ doAsUser, fullpath, op.getValue(), -1L, blockSize.getValue(conf),
+ exclDatanodes.getValue(), permission, overwrite, bufferSize,
+ replication, blockSize);
return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
}
case MKDIRS:
@@ -528,6 +605,28 @@ public class NamenodeWebHdfsMethods {
np.setAcl(fullpath, aclPermission.getAclPermission(true));
return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
}
+ case SETXATTR: {
+ np.setXAttr(
+ fullpath,
+ XAttrHelper.buildXAttr(xattrName.getXAttrName(),
+ xattrValue.getXAttrValue()), xattrSetFlag.getFlag());
+ return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
+ }
+ case REMOVEXATTR: {
+ np.removeXAttr(fullpath, XAttrHelper.buildXAttr(xattrName.getXAttrName()));
+ return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
+ }
+ case CREATESNAPSHOT: {
+ String snapshotPath = np.createSnapshot(fullpath, snapshotName.getValue());
+ final String js = JsonUtil.toJsonString(
+ org.apache.hadoop.fs.Path.class.getSimpleName(), snapshotPath);
+ return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
+ }
+ case RENAMESNAPSHOT: {
+ np.renameSnapshot(fullpath, oldSnapshotName.getValue(),
+ snapshotName.getValue());
+ return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
+ }
default:
throw new UnsupportedOperationException(op + " is not supported");
}
@@ -551,9 +650,12 @@ public class NamenodeWebHdfsMethods {
@QueryParam(ConcatSourcesParam.NAME) @DefaultValue(ConcatSourcesParam.DEFAULT)
final ConcatSourcesParam concatSrcs,
@QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
- final BufferSizeParam bufferSize
+ final BufferSizeParam bufferSize,
+ @QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT)
+ final ExcludeDatanodesParam excludeDatanodes
) throws IOException, InterruptedException {
- return post(ugi, delegation, username, doAsUser, ROOT, op, concatSrcs, bufferSize);
+ return post(ugi, delegation, username, doAsUser, ROOT, op, concatSrcs,
+ bufferSize, excludeDatanodes);
}
/** Handle HTTP POST request. */
@@ -575,20 +677,23 @@ public class NamenodeWebHdfsMethods {
@QueryParam(ConcatSourcesParam.NAME) @DefaultValue(ConcatSourcesParam.DEFAULT)
final ConcatSourcesParam concatSrcs,
@QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
- final BufferSizeParam bufferSize
+ final BufferSizeParam bufferSize,
+ @QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT)
+ final ExcludeDatanodesParam excludeDatanodes
) throws IOException, InterruptedException {
- init(ugi, delegation, username, doAsUser, path, op, concatSrcs, bufferSize);
+ init(ugi, delegation, username, doAsUser, path, op, concatSrcs, bufferSize,
+ excludeDatanodes);
return ugi.doAs(new PrivilegedExceptionAction<Response>() {
@Override
public Response run() throws IOException, URISyntaxException {
- REMOTE_ADDRESS.set(request.getRemoteAddr());
try {
return post(ugi, delegation, username, doAsUser,
- path.getAbsolutePath(), op, concatSrcs, bufferSize);
+ path.getAbsolutePath(), op, concatSrcs, bufferSize,
+ excludeDatanodes);
} finally {
- REMOTE_ADDRESS.set(null);
+ reset();
}
}
});
@@ -602,15 +707,17 @@ public class NamenodeWebHdfsMethods {
final String fullpath,
final PostOpParam op,
final ConcatSourcesParam concatSrcs,
- final BufferSizeParam bufferSize
+ final BufferSizeParam bufferSize,
+ final ExcludeDatanodesParam excludeDatanodes
) throws IOException, URISyntaxException {
final NameNode namenode = (NameNode)context.getAttribute("name.node");
switch(op.getValue()) {
case APPEND:
{
- final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
- fullpath, op.getValue(), -1L, -1L, bufferSize);
+ final URI uri = redirectURI(namenode, ugi, delegation, username,
+ doAsUser, fullpath, op.getValue(), -1L, -1L,
+ excludeDatanodes.getValue(), bufferSize);
return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
}
case CONCAT:
@@ -644,10 +751,18 @@ public class NamenodeWebHdfsMethods {
@QueryParam(RenewerParam.NAME) @DefaultValue(RenewerParam.DEFAULT)
final RenewerParam renewer,
@QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
- final BufferSizeParam bufferSize
+ final BufferSizeParam bufferSize,
+ @QueryParam(XAttrNameParam.NAME) @DefaultValue(XAttrNameParam.DEFAULT)
+ final List<XAttrNameParam> xattrNames,
+ @QueryParam(XAttrEncodingParam.NAME) @DefaultValue(XAttrEncodingParam.DEFAULT)
+ final XAttrEncodingParam xattrEncoding,
+ @QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT)
+ final ExcludeDatanodesParam excludeDatanodes,
+ @QueryParam(FsActionParam.NAME) @DefaultValue(FsActionParam.DEFAULT)
+ final FsActionParam fsAction
) throws IOException, InterruptedException {
- return get(ugi, delegation, username, doAsUser, ROOT, op,
- offset, length, renewer, bufferSize);
+ return get(ugi, delegation, username, doAsUser, ROOT, op, offset, length,
+ renewer, bufferSize, xattrNames, xattrEncoding, excludeDatanodes, fsAction);
}
/** Handle HTTP GET request. */
@@ -672,21 +787,29 @@ public class NamenodeWebHdfsMethods {
@QueryParam(RenewerParam.NAME) @DefaultValue(RenewerParam.DEFAULT)
final RenewerParam renewer,
@QueryParam(BufferSizeParam.NAME) @DefaultValue(BufferSizeParam.DEFAULT)
- final BufferSizeParam bufferSize
+ final BufferSizeParam bufferSize,
+ @QueryParam(XAttrNameParam.NAME) @DefaultValue(XAttrNameParam.DEFAULT)
+ final List<XAttrNameParam> xattrNames,
+ @QueryParam(XAttrEncodingParam.NAME) @DefaultValue(XAttrEncodingParam.DEFAULT)
+ final XAttrEncodingParam xattrEncoding,
+ @QueryParam(ExcludeDatanodesParam.NAME) @DefaultValue(ExcludeDatanodesParam.DEFAULT)
+ final ExcludeDatanodesParam excludeDatanodes,
+ @QueryParam(FsActionParam.NAME) @DefaultValue(FsActionParam.DEFAULT)
+ final FsActionParam fsAction
) throws IOException, InterruptedException {
- init(ugi, delegation, username, doAsUser, path, op,
- offset, length, renewer, bufferSize);
+ init(ugi, delegation, username, doAsUser, path, op, offset, length,
+ renewer, bufferSize, xattrEncoding, excludeDatanodes, fsAction);
return ugi.doAs(new PrivilegedExceptionAction<Response>() {
@Override
public Response run() throws IOException, URISyntaxException {
- REMOTE_ADDRESS.set(request.getRemoteAddr());
try {
return get(ugi, delegation, username, doAsUser,
- path.getAbsolutePath(), op, offset, length, renewer, bufferSize);
+ path.getAbsolutePath(), op, offset, length, renewer, bufferSize,
+ xattrNames, xattrEncoding, excludeDatanodes, fsAction);
} finally {
- REMOTE_ADDRESS.set(null);
+ reset();
}
}
});
@@ -702,7 +825,11 @@ public class NamenodeWebHdfsMethods {
final OffsetParam offset,
final LengthParam length,
final RenewerParam renewer,
- final BufferSizeParam bufferSize
+ final BufferSizeParam bufferSize,
+ final List<XAttrNameParam> xattrNames,
+ final XAttrEncodingParam xattrEncoding,
+ final ExcludeDatanodesParam excludeDatanodes,
+ final FsActionParam fsAction
) throws IOException, URISyntaxException {
final NameNode namenode = (NameNode)context.getAttribute("name.node");
final NamenodeProtocols np = getRPCServer(namenode);
@@ -710,8 +837,9 @@ public class NamenodeWebHdfsMethods {
switch(op.getValue()) {
case OPEN:
{
- final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
- fullpath, op.getValue(), offset.getValue(), -1L, offset, length, bufferSize);
+ final URI uri = redirectURI(namenode, ugi, delegation, username,
+ doAsUser, fullpath, op.getValue(), offset.getValue(), -1L,
+ excludeDatanodes.getValue(), offset, length, bufferSize);
return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
}
case GET_BLOCK_LOCATIONS:
@@ -747,7 +875,7 @@ public class NamenodeWebHdfsMethods {
case GETFILECHECKSUM:
{
final URI uri = redirectURI(namenode, ugi, delegation, username, doAsUser,
- fullpath, op.getValue(), -1L, -1L);
+ fullpath, op.getValue(), -1L, -1L, null);
return Response.temporaryRedirect(uri).type(MediaType.APPLICATION_OCTET_STREAM).build();
}
case GETDELEGATIONTOKEN:
@@ -777,6 +905,31 @@ public class NamenodeWebHdfsMethods {
final String js = JsonUtil.toJsonString(status);
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
}
+ case GETXATTRS: {
+ List<String> names = null;
+ if (xattrNames != null) {
+ names = Lists.newArrayListWithCapacity(xattrNames.size());
+ for (XAttrNameParam xattrName : xattrNames) {
+ if (xattrName.getXAttrName() != null) {
+ names.add(xattrName.getXAttrName());
+ }
+ }
+ }
+ List<XAttr> xAttrs = np.getXAttrs(fullpath, (names != null &&
+ !names.isEmpty()) ? XAttrHelper.buildXAttrs(names) : null);
+ final String js = JsonUtil.toJsonString(xAttrs,
+ xattrEncoding.getEncoding());
+ return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
+ }
+ case LISTXATTRS: {
+ final List<XAttr> xAttrs = np.listXAttrs(fullpath);
+ final String js = JsonUtil.toJsonString(xAttrs);
+ return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
+ }
+ case CHECKACCESS: {
+ np.checkAccess(fullpath, FsAction.getFsAction(fsAction.getValue()));
+ return Response.ok().build();
+ }
default:
throw new UnsupportedOperationException(op + " is not supported");
}
@@ -860,9 +1013,12 @@ public class NamenodeWebHdfsMethods {
@QueryParam(DeleteOpParam.NAME) @DefaultValue(DeleteOpParam.DEFAULT)
final DeleteOpParam op,
@QueryParam(RecursiveParam.NAME) @DefaultValue(RecursiveParam.DEFAULT)
- final RecursiveParam recursive
+ final RecursiveParam recursive,
+ @QueryParam(SnapshotNameParam.NAME) @DefaultValue(SnapshotNameParam.DEFAULT)
+ final SnapshotNameParam snapshotName
) throws IOException, InterruptedException {
- return delete(ugi, delegation, username, doAsUser, ROOT, op, recursive);
+ return delete(ugi, delegation, username, doAsUser, ROOT, op, recursive,
+ snapshotName);
}
/** Handle HTTP DELETE request. */
@@ -881,20 +1037,21 @@ public class NamenodeWebHdfsMethods {
@QueryParam(DeleteOpParam.NAME) @DefaultValue(DeleteOpParam.DEFAULT)
final DeleteOpParam op,
@QueryParam(RecursiveParam.NAME) @DefaultValue(RecursiveParam.DEFAULT)
- final RecursiveParam recursive
+ final RecursiveParam recursive,
+ @QueryParam(SnapshotNameParam.NAME) @DefaultValue(SnapshotNameParam.DEFAULT)
+ final SnapshotNameParam snapshotName
) throws IOException, InterruptedException {
- init(ugi, delegation, username, doAsUser, path, op, recursive);
+ init(ugi, delegation, username, doAsUser, path, op, recursive, snapshotName);
return ugi.doAs(new PrivilegedExceptionAction<Response>() {
@Override
public Response run() throws IOException {
- REMOTE_ADDRESS.set(request.getRemoteAddr());
try {
return delete(ugi, delegation, username, doAsUser,
- path.getAbsolutePath(), op, recursive);
+ path.getAbsolutePath(), op, recursive, snapshotName);
} finally {
- REMOTE_ADDRESS.set(null);
+ reset();
}
}
});
@@ -907,17 +1064,22 @@ public class NamenodeWebHdfsMethods {
final DoAsParam doAsUser,
final String fullpath,
final DeleteOpParam op,
- final RecursiveParam recursive
+ final RecursiveParam recursive,
+ final SnapshotNameParam snapshotName
) throws IOException {
final NameNode namenode = (NameNode)context.getAttribute("name.node");
+ final NamenodeProtocols np = getRPCServer(namenode);
switch(op.getValue()) {
- case DELETE:
- {
- final boolean b = getRPCServer(namenode).delete(fullpath, recursive.getValue());
+ case DELETE: {
+ final boolean b = np.delete(fullpath, recursive.getValue());
final String js = JsonUtil.toJsonString("boolean", b);
return Response.ok(js).type(MediaType.APPLICATION_JSON).build();
}
+ case DELETESNAPSHOT: {
+ np.deleteSnapshot(fullpath, snapshotName.getValue());
+ return Response.ok().type(MediaType.APPLICATION_OCTET_STREAM).build();
+ }
default:
throw new UnsupportedOperationException(op + " is not supported");
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockCommand.java Tue Aug 19 23:49:39 2014
@@ -21,6 +21,7 @@ import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor.BlockTargetPair;
@@ -50,6 +51,7 @@ public class BlockCommand extends Datano
final String poolId;
final Block[] blocks;
final DatanodeInfo[][] targets;
+ final StorageType[][] targetStorageTypes;
final String[][] targetStorageIDs;
/**
@@ -62,17 +64,20 @@ public class BlockCommand extends Datano
this.poolId = poolId;
blocks = new Block[blocktargetlist.size()];
targets = new DatanodeInfo[blocks.length][];
+ targetStorageTypes = new StorageType[blocks.length][];
targetStorageIDs = new String[blocks.length][];
for(int i = 0; i < blocks.length; i++) {
BlockTargetPair p = blocktargetlist.get(i);
blocks[i] = p.block;
targets[i] = DatanodeStorageInfo.toDatanodeInfos(p.targets);
+ targetStorageTypes[i] = DatanodeStorageInfo.toStorageTypes(p.targets);
targetStorageIDs[i] = DatanodeStorageInfo.toStorageIDs(p.targets);
}
}
private static final DatanodeInfo[][] EMPTY_TARGET_DATANODES = {};
+ private static final StorageType[][] EMPTY_TARGET_STORAGE_TYPES = {};
private static final String[][] EMPTY_TARGET_STORAGEIDS = {};
/**
@@ -81,7 +86,7 @@ public class BlockCommand extends Datano
*/
public BlockCommand(int action, String poolId, Block blocks[]) {
this(action, poolId, blocks, EMPTY_TARGET_DATANODES,
- EMPTY_TARGET_STORAGEIDS);
+ EMPTY_TARGET_STORAGE_TYPES, EMPTY_TARGET_STORAGEIDS);
}
/**
@@ -89,11 +94,13 @@ public class BlockCommand extends Datano
* @param blocks blocks related to the action
*/
public BlockCommand(int action, String poolId, Block[] blocks,
- DatanodeInfo[][] targets, String[][] targetStorageIDs) {
+ DatanodeInfo[][] targets, StorageType[][] targetStorageTypes,
+ String[][] targetStorageIDs) {
super(action);
this.poolId = poolId;
this.blocks = blocks;
this.targets = targets;
+ this.targetStorageTypes = targetStorageTypes;
this.targetStorageIDs = targetStorageIDs;
}
@@ -109,6 +116,10 @@ public class BlockCommand extends Datano
return targets;
}
+ public StorageType[][] getTargetStorageTypes() {
+ return targetStorageTypes;
+ }
+
public String[][] getTargetStorageIDs() {
return targetStorageIDs;
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockIdCommand.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockIdCommand.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockIdCommand.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockIdCommand.java Tue Aug 19 23:49:39 2014
@@ -32,7 +32,6 @@ public class BlockIdCommand extends Data
/**
* Create BlockCommand for the given action
- * @param blocks blocks related to the action
*/
public BlockIdCommand(int action, String poolId, long[] blockIds) {
super(action);
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksWithLocations.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksWithLocations.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksWithLocations.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlocksWithLocations.java Tue Aug 19 23:49:39 2014
@@ -17,10 +17,9 @@
*/
package org.apache.hadoop.hdfs.server.protocol;
-import java.util.Arrays;
-
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block;
/**
@@ -39,12 +38,15 @@ public class BlocksWithLocations {
final Block block;
final String[] datanodeUuids;
final String[] storageIDs;
+ final StorageType[] storageTypes;
/** constructor */
- public BlockWithLocations(Block block, String[] datanodeUuids, String[] storageIDs) {
+ public BlockWithLocations(Block block, String[] datanodeUuids,
+ String[] storageIDs, StorageType[] storageTypes) {
this.block = block;
this.datanodeUuids = datanodeUuids;
this.storageIDs = storageIDs;
+ this.storageTypes = storageTypes;
}
/** get the block */
@@ -61,7 +63,12 @@ public class BlocksWithLocations {
public String[] getStorageIDs() {
return storageIDs;
}
-
+
+ /** @return the storage types */
+ public StorageType[] getStorageTypes() {
+ return storageTypes;
+ }
+
@Override
public String toString() {
final StringBuilder b = new StringBuilder();
@@ -70,12 +77,18 @@ public class BlocksWithLocations {
return b.append("[]").toString();
}
- b.append(storageIDs[0]).append('@').append(datanodeUuids[0]);
+ appendString(0, b.append("["));
for(int i = 1; i < datanodeUuids.length; i++) {
- b.append(", ").append(storageIDs[i]).append("@").append(datanodeUuids[i]);
+ appendString(i, b.append(","));
}
return b.append("]").toString();
}
+
+ private StringBuilder appendString(int i, StringBuilder b) {
+ return b.append("[").append(storageTypes[i]).append("]")
+ .append(storageIDs[i])
+ .append("@").append(datanodeUuids[i]);
+ }
}
private final BlockWithLocations[] blocks;
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java Tue Aug 19 23:49:39 2014
@@ -119,9 +119,9 @@ public interface DatanodeProtocol {
* and should be deleted. This function is meant to upload *all*
* the locally-stored blocks. It's invoked upon startup and then
* infrequently afterwards.
- * @param registration
- * @param poolId - the block pool ID for the blocks
- * @param reports - report of blocks per storage
+ * @param registration datanode registration
+ * @param poolId the block pool ID for the blocks
+ * @param reports report of blocks per storage
* Each finalized block is represented as 3 longs. Each under-
* construction replica is represented as 4 longs.
* This is done instead of Block[] to reduce memory used by block reports.
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeStorage.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeStorage.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeStorage.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeStorage.java Tue Aug 19 23:49:39 2014
@@ -48,8 +48,6 @@ public class DatanodeStorage {
/**
* Create a storage with {@link State#NORMAL} and {@link StorageType#DEFAULT}.
- *
- * @param storageID
*/
public DatanodeStorage(String storageID) {
this(storageID, State.NORMAL, StorageType.DEFAULT);
@@ -84,6 +82,11 @@ public class DatanodeStorage {
}
@Override
+ public String toString() {
+ return "DatanodeStorage["+ storageID + "," + storageType + "," + state +"]";
+ }
+
+ @Override
public boolean equals(Object other){
if (other == this) {
return true;
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/NamenodeProtocols.java Tue Aug 19 23:49:39 2014
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdfs.protocol.C
import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
import org.apache.hadoop.security.RefreshUserMappingsProtocol;
import org.apache.hadoop.ipc.RefreshCallQueueProtocol;
+import org.apache.hadoop.ipc.GenericRefreshProtocol;
import org.apache.hadoop.tools.GetUserMappingsProtocol;
/** The full set of RPC methods implemented by the Namenode. */
@@ -35,6 +36,7 @@ public interface NamenodeProtocols
RefreshAuthorizationPolicyProtocol,
RefreshUserMappingsProtocol,
RefreshCallQueueProtocol,
+ GenericRefreshProtocol,
GetUserMappingsProtocol,
HAServiceProtocol {
}
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RegisterCommand.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RegisterCommand.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RegisterCommand.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RegisterCommand.java Tue Aug 19 23:49:39 2014
@@ -22,6 +22,9 @@ import org.apache.hadoop.classification.
/**
* A BlockCommand is an instruction to a datanode to register with the namenode.
+ * This command can't be combined with other commands in the same response.
+ * This is because after the datanode processes RegisterCommand, it will skip
+ * the rest of the DatanodeCommands in the same HeartbeatResponse.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/ServerCommand.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/ServerCommand.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/ServerCommand.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/ServerCommand.java Tue Aug 19 23:49:39 2014
@@ -39,7 +39,7 @@ public abstract class ServerCommand {
*
* @see DatanodeProtocol
* @see NamenodeProtocol
- * @param action
+ * @param action protocol specific action
*/
public ServerCommand(int action) {
this.action = action;
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShm.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShm.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShm.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShm.java Tue Aug 19 23:49:39 2014
@@ -32,11 +32,16 @@ import com.google.common.base.Preconditi
* DfsClientShm is a subclass of ShortCircuitShm which is used by the
* DfsClient.
* When the UNIX domain socket associated with this shared memory segment
- * closes unexpectedly, we mark the slots inside this segment as stale.
- * ShortCircuitReplica objects that contain stale slots are themselves stale,
+ * closes unexpectedly, we mark the slots inside this segment as disconnected.
+ * ShortCircuitReplica objects that contain disconnected slots are stale,
* and will not be used to service new reads or mmap operations.
* However, in-progress read or mmap operations will continue to proceed.
* Once the last slot is deallocated, the segment can be safely munmapped.
+ *
+ * Slots may also become stale because the associated replica has been deleted
+ * on the DataNode. In this case, the DataNode will clear the 'valid' bit.
+ * The client will then see these slots as stale (see
+ * #{ShortCircuitReplica#isStale}).
*/
public class DfsClientShm extends ShortCircuitShm
implements DomainSocketWatcher.Handler {
@@ -58,7 +63,7 @@ public class DfsClientShm extends ShortC
*
* {@link DfsClientShm#handle} sets this to true.
*/
- private boolean stale = false;
+ private boolean disconnected = false;
DfsClientShm(ShmId shmId, FileInputStream stream, EndpointShmManager manager,
DomainPeer peer) throws IOException {
@@ -76,14 +81,14 @@ public class DfsClientShm extends ShortC
}
/**
- * Determine if the shared memory segment is stale.
+ * Determine if the shared memory segment is disconnected from the DataNode.
*
* This must be called with the DfsClientShmManager lock held.
*
* @return True if the shared memory segment is stale.
*/
- public synchronized boolean isStale() {
- return stale;
+ public synchronized boolean isDisconnected() {
+ return disconnected;
}
/**
@@ -97,8 +102,8 @@ public class DfsClientShm extends ShortC
public boolean handle(DomainSocket sock) {
manager.unregisterShm(getShmId());
synchronized (this) {
- Preconditions.checkState(!stale);
- stale = true;
+ Preconditions.checkState(!disconnected);
+ disconnected = true;
boolean hadSlots = false;
for (Iterator<Slot> iter = slotIterator(); iter.hasNext(); ) {
Slot slot = iter.next();
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java Tue Aug 19 23:49:39 2014
@@ -271,12 +271,12 @@ public class DfsClientShmManager impleme
loading = false;
finishedLoading.signalAll();
}
- if (shm.isStale()) {
+ if (shm.isDisconnected()) {
// If the peer closed immediately after the shared memory segment
// was created, the DomainSocketWatcher callback might already have
- // fired and marked the shm as stale. In this case, we obviously
- // don't want to add the SharedMemorySegment to our list of valid
- // not-full segments.
+ // fired and marked the shm as disconnected. In this case, we
+ // obviously don't want to add the SharedMemorySegment to our list
+ // of valid not-full segments.
if (LOG.isDebugEnabled()) {
LOG.debug(this + ": the UNIX domain socket associated with " +
"this short-circuit memory closed before we could make " +
@@ -299,7 +299,7 @@ public class DfsClientShmManager impleme
void freeSlot(Slot slot) {
DfsClientShm shm = (DfsClientShm)slot.getShm();
shm.unregisterSlot(slot.getSlotIdx());
- if (shm.isStale()) {
+ if (shm.isDisconnected()) {
// Stale shared memory segments should not be tracked here.
Preconditions.checkState(!full.containsKey(shm.getShmId()));
Preconditions.checkState(!notFull.containsKey(shm.getShmId()));
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitCache.java Tue Aug 19 23:49:39 2014
@@ -111,7 +111,7 @@ public class ShortCircuitCache implement
Long evictionTimeNs = Long.valueOf(0);
while (true) {
Entry<Long, ShortCircuitReplica> entry =
- evictableMmapped.ceilingEntry(evictionTimeNs);
+ evictable.ceilingEntry(evictionTimeNs);
if (entry == null) break;
evictionTimeNs = entry.getKey();
long evictionTimeMs =
@@ -384,10 +384,6 @@ public class ShortCircuitCache implement
this.shmManager = shmManager;
}
- public long getMmapRetryTimeoutMs() {
- return mmapRetryTimeoutMs;
- }
-
public long getStaleThresholdMs() {
return staleThresholdMs;
}
@@ -437,11 +433,22 @@ public class ShortCircuitCache implement
void unref(ShortCircuitReplica replica) {
lock.lock();
try {
- // If the replica is stale, but we haven't purged it yet, let's do that.
- // It would be a shame to evict a non-stale replica so that we could put
- // a stale one into the cache.
- if ((!replica.purged) && replica.isStale()) {
- purge(replica);
+ // If the replica is stale or unusable, but we haven't purged it yet,
+ // let's do that. It would be a shame to evict a non-stale replica so
+ // that we could put a stale or unusable one into the cache.
+ if (!replica.purged) {
+ String purgeReason = null;
+ if (!replica.getDataStream().getChannel().isOpen()) {
+ purgeReason = "purging replica because its data channel is closed.";
+ } else if (!replica.getMetaStream().getChannel().isOpen()) {
+ purgeReason = "purging replica because its meta channel is closed.";
+ } else if (replica.isStale()) {
+ purgeReason = "purging replica because it is stale.";
+ }
+ if (purgeReason != null) {
+ LOG.debug(this + ": " + purgeReason);
+ purge(replica);
+ }
}
String addedString = "";
boolean shouldTrimEvictionMaps = false;
@@ -836,7 +843,7 @@ public class ShortCircuitCache implement
} else if (replica.mmapData instanceof Long) {
long lastAttemptTimeMs = (Long)replica.mmapData;
long delta = Time.monotonicNow() - lastAttemptTimeMs;
- if (delta < staleThresholdMs) {
+ if (delta < mmapRetryTimeoutMs) {
if (LOG.isTraceEnabled()) {
LOG.trace(this + ": can't create client mmap for " +
replica + " because we failed to " +
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitShm.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitShm.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitShm.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitShm.java Tue Aug 19 23:49:39 2014
@@ -306,6 +306,13 @@ public class ShortCircuitShm {
(slotAddress - baseAddress) / BYTES_PER_SLOT);
}
+ /**
+ * Clear the slot.
+ */
+ void clear() {
+ unsafe.putLongVolatile(null, this.slotAddress, 0);
+ }
+
private boolean isSet(long flag) {
long prev = unsafe.getLongVolatile(null, this.slotAddress);
return (prev & flag) != 0;
@@ -535,6 +542,7 @@ public class ShortCircuitShm {
}
allocatedSlots.set(idx, true);
Slot slot = new Slot(calculateSlotAddress(idx), blockId);
+ slot.clear();
slot.makeValid();
slots[idx] = slot;
if (LOG.isTraceEnabled()) {
@@ -583,7 +591,7 @@ public class ShortCircuitShm {
Slot slot = new Slot(calculateSlotAddress(slotIdx), blockId);
if (!slot.isValid()) {
throw new InvalidRequestException(this + ": slot " + slotIdx +
- " has not been allocated.");
+ " is not marked as valid.");
}
slots[slotIdx] = slot;
allocatedSlots.set(slotIdx, true);
Modified: hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/CacheAdmin.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/CacheAdmin.java?rev=1619012&r1=1619011&r2=1619012&view=diff
==============================================================================
--- hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/CacheAdmin.java (original)
+++ hadoop/common/branches/HADOOP-10388/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/CacheAdmin.java Tue Aug 19 23:49:39 2014
@@ -40,7 +40,8 @@ import org.apache.hadoop.hdfs.protocol.C
import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
import org.apache.hadoop.hdfs.protocol.CachePoolStats;
-import org.apache.hadoop.hdfs.tools.TableListing.Justification;
+import org.apache.hadoop.tools.TableListing;
+import org.apache.hadoop.tools.TableListing.Justification;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Tool;
@@ -503,19 +504,21 @@ public class CacheAdmin extends Configur
@Override
public String getShortUsage() {
- return "[" + getName() + " [-stats] [-path <path>] [-pool <pool>]]\n";
+ return "[" + getName()
+ + " [-stats] [-path <path>] [-pool <pool>] [-id <id>]\n";
}
@Override
public String getLongUsage() {
TableListing listing = getOptionDescriptionListing();
+ listing.addRow("-stats", "List path-based cache directive statistics.");
listing.addRow("<path>", "List only " +
"cache directives with this path. " +
"Note that if there is a cache directive for <path> " +
"in a cache pool that we don't have read access for, it " +
"will not be listed.");
listing.addRow("<pool>", "List only path cache directives in that pool.");
- listing.addRow("-stats", "List path-based cache directive statistics.");
+ listing.addRow("<id>", "List the cache directive with this id.");
return getShortUsage() + "\n" +
"List cache directives.\n\n" +
listing.toString();
@@ -534,6 +537,10 @@ public class CacheAdmin extends Configur
builder.setPool(poolFilter);
}
boolean printStats = StringUtils.popOption("-stats", args);
+ String idFilter = StringUtils.popOptionWithArgument("-id", args);
+ if (idFilter != null) {
+ builder.setId(Long.parseLong(idFilter));
+ }
if (!args.isEmpty()) {
System.err.println("Can't understand argument: " + args.get(0));
return 1;