You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by br...@apache.org on 2018/08/20 06:56:35 UTC
[1/2] hadoop git commit: HDFS-13790. RBF: Move ClientProtocol APIs to
its own module. Contributed by Chao Sun.
Repository: hadoop
Updated Branches:
refs/heads/trunk 4aacbfff6 -> 6425ed27e
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6425ed27/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
index 29f32a6..fe54993 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterRpcServer.java
@@ -33,16 +33,12 @@ import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.EnumSet;
-import java.util.HashMap;
-import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
-import java.util.TreeMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.CryptoProtocolVersion;
@@ -54,7 +50,6 @@ import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FileAlreadyExistsException;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.Options;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.QuotaUsage;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.XAttr;
@@ -64,7 +59,6 @@ import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.AddBlockFlag;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.inotify.EventBatchList;
import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
@@ -93,7 +87,6 @@ import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
-import org.apache.hadoop.hdfs.protocol.OpenFilesIterator;
import org.apache.hadoop.hdfs.protocol.OpenFilesIterator.OpenFilesType;
import org.apache.hadoop.hdfs.protocol.ReplicatedBlockStats;
import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
@@ -101,8 +94,8 @@ import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
-import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.NamenodeProtocolService;
+import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ClientNamenodeProtocol;
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.NamenodeProtocolPB;
@@ -167,11 +160,6 @@ public class RouterRpcServer extends AbstractService
/** Configuration for the RPC server. */
private Configuration conf;
- /** Identifier for the super user. */
- private final String superUser;
- /** Identifier for the super group. */
- private final String superGroup;
-
/** Router using this RPC server. */
private final Router router;
@@ -199,11 +187,10 @@ public class RouterRpcServer extends AbstractService
// Modules implementing groups of RPC calls
/** Router Quota calls. */
private final Quota quotaCall;
- /** Erasure coding calls. */
- private final ErasureCoding erasureCoding;
/** NamenodeProtocol calls. */
private final RouterNamenodeProtocol nnProto;
-
+ /** ClientProtocol calls. */
+ private final RouterClientProtocol clientProto;
/**
* Construct a router RPC server.
@@ -223,12 +210,6 @@ public class RouterRpcServer extends AbstractService
this.namenodeResolver = nnResolver;
this.subclusterResolver = fileResolver;
- // User and group for reporting
- this.superUser = System.getProperty("user.name");
- this.superGroup = this.conf.get(
- DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY,
- DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT);
-
// RPC server settings
int handlerCount = this.conf.getInt(DFS_ROUTER_HANDLER_COUNT_KEY,
DFS_ROUTER_HANDLER_COUNT_DEFAULT);
@@ -315,8 +296,8 @@ public class RouterRpcServer extends AbstractService
// Initialize modules
this.quotaCall = new Quota(this.router, this);
- this.erasureCoding = new ErasureCoding(this);
this.nnProto = new RouterNamenodeProtocol(this);
+ this.clientProto = new RouterClientProtocol(conf, this);
}
@Override
@@ -371,6 +352,13 @@ public class RouterRpcServer extends AbstractService
}
/**
+ * Get the active namenode resolver.
+ */
+ public ActiveNamenodeResolver getNamenodeResolver() {
+ return namenodeResolver;
+ }
+
+ /**
* Get the RPC monitor and metrics.
*
* @return RPC monitor and metrics.
@@ -411,7 +399,7 @@ public class RouterRpcServer extends AbstractService
* client requests.
* @throws UnsupportedOperationException If the operation is not supported.
*/
- protected void checkOperation(OperationCategory op, boolean supported)
+ void checkOperation(OperationCategory op, boolean supported)
throws StandbyException, UnsupportedOperationException {
checkOperation(op);
@@ -433,7 +421,7 @@ public class RouterRpcServer extends AbstractService
* @throws SafeModeException If the Router is in safe mode and cannot serve
* client requests.
*/
- protected void checkOperation(OperationCategory op)
+ void checkOperation(OperationCategory op)
throws StandbyException {
// Log the function we are currently calling.
if (rpcMonitor != null) {
@@ -464,58 +452,44 @@ public class RouterRpcServer extends AbstractService
}
}
+ /**
+ * Get the name of the method that is calling this function.
+ *
+ * @return Name of the method calling this function.
+ */
+ static String getMethodName() {
+ final StackTraceElement[] stack = Thread.currentThread().getStackTrace();
+ String methodName = stack[3].getMethodName();
+ return methodName;
+ }
+
@Override // ClientProtocol
public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
throws IOException {
- checkOperation(OperationCategory.WRITE, false);
- return null;
- }
-
- /**
- * The the delegation token from each name service.
- * @param renewer
- * @return Name service -> Token.
- * @throws IOException
- */
- public Map<FederationNamespaceInfo, Token<DelegationTokenIdentifier>>
- getDelegationTokens(Text renewer) throws IOException {
- checkOperation(OperationCategory.WRITE, false);
- return null;
+ return clientProto.getDelegationToken(renewer);
}
@Override // ClientProtocol
public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
throws IOException {
- checkOperation(OperationCategory.WRITE, false);
- return 0;
+ return clientProto.renewDelegationToken(token);
}
@Override // ClientProtocol
public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
throws IOException {
- checkOperation(OperationCategory.WRITE, false);
+ clientProto.cancelDelegationToken(token);
}
@Override // ClientProtocol
public LocatedBlocks getBlockLocations(String src, final long offset,
final long length) throws IOException {
- checkOperation(OperationCategory.READ);
-
- List<RemoteLocation> locations = getLocationsForPath(src, false);
- RemoteMethod remoteMethod = new RemoteMethod("getBlockLocations",
- new Class<?>[] {String.class, long.class, long.class},
- new RemoteParam(), offset, length);
- return (LocatedBlocks) rpcClient.invokeSequential(locations, remoteMethod,
- LocatedBlocks.class, null);
+ return clientProto.getBlockLocations(src, offset, length);
}
@Override // ClientProtocol
public FsServerDefaults getServerDefaults() throws IOException {
- checkOperation(OperationCategory.READ);
-
- RemoteMethod method = new RemoteMethod("getServerDefaults");
- String ns = subclusterResolver.getDefaultNamespace();
- return (FsServerDefaults) rpcClient.invokeSingle(ns, method);
+ return clientProto.getServerDefaults();
}
@Override // ClientProtocol
@@ -524,44 +498,8 @@ public class RouterRpcServer extends AbstractService
boolean createParent, short replication, long blockSize,
CryptoProtocolVersion[] supportedVersions, String ecPolicyName)
throws IOException {
- checkOperation(OperationCategory.WRITE);
-
- if (createParent && isPathAll(src)) {
- int index = src.lastIndexOf(Path.SEPARATOR);
- String parent = src.substring(0, index);
- LOG.debug("Creating {} requires creating parent {}", src, parent);
- FsPermission parentPermissions = getParentPermission(masked);
- boolean success = mkdirs(parent, parentPermissions, createParent);
- if (!success) {
- // This shouldn't happen as mkdirs returns true or exception
- LOG.error("Couldn't create parents for {}", src);
- }
- }
-
- RemoteLocation createLocation = getCreateLocation(src);
- RemoteMethod method = new RemoteMethod("create",
- new Class<?>[] {String.class, FsPermission.class, String.class,
- EnumSetWritable.class, boolean.class, short.class,
- long.class, CryptoProtocolVersion[].class,
- String.class},
- createLocation.getDest(), masked, clientName, flag, createParent,
+ return clientProto.create(src, masked, clientName, flag, createParent,
replication, blockSize, supportedVersions, ecPolicyName);
- return (HdfsFileStatus) rpcClient.invokeSingle(createLocation, method);
- }
-
- /**
- * Get the permissions for the parent of a child with given permissions.
- * Add implicit u+wx permission for parent. This is based on
- * @{FSDirMkdirOp#addImplicitUwx}.
- * @param mask The permission mask of the child.
- * @return The permission mask of the parent.
- */
- private static FsPermission getParentPermission(final FsPermission mask) {
- FsPermission ret = new FsPermission(
- mask.getUserAction().or(FsAction.WRITE_EXECUTE),
- mask.getGroupAction(),
- mask.getOtherAction());
- return ret;
}
/**
@@ -572,7 +510,7 @@ public class RouterRpcServer extends AbstractService
* @return The remote location for this file.
* @throws IOException If the file has no creation location.
*/
- protected RemoteLocation getCreateLocation(final String src)
+ RemoteLocation getCreateLocation(final String src)
throws IOException {
final List<RemoteLocation> locations = getLocationsForPath(src, true);
@@ -613,100 +551,45 @@ public class RouterRpcServer extends AbstractService
return createLocation;
}
- // Medium
@Override // ClientProtocol
public LastBlockWithStatus append(String src, final String clientName,
final EnumSetWritable<CreateFlag> flag) throws IOException {
- checkOperation(OperationCategory.WRITE);
-
- List<RemoteLocation> locations = getLocationsForPath(src, true);
- RemoteMethod method = new RemoteMethod("append",
- new Class<?>[] {String.class, String.class, EnumSetWritable.class},
- new RemoteParam(), clientName, flag);
- return rpcClient.invokeSequential(
- locations, method, LastBlockWithStatus.class, null);
+ return clientProto.append(src, clientName, flag);
}
- // Low
@Override // ClientProtocol
public boolean recoverLease(String src, String clientName)
throws IOException {
- checkOperation(OperationCategory.WRITE);
-
- final List<RemoteLocation> locations = getLocationsForPath(src, true);
- RemoteMethod method = new RemoteMethod("recoverLease",
- new Class<?>[] {String.class, String.class}, new RemoteParam(),
- clientName);
- Object result = rpcClient.invokeSequential(
- locations, method, Boolean.class, Boolean.TRUE);
- return (boolean) result;
+ return clientProto.recoverLease(src, clientName);
}
@Override // ClientProtocol
public boolean setReplication(String src, short replication)
throws IOException {
- checkOperation(OperationCategory.WRITE);
-
- List<RemoteLocation> locations = getLocationsForPath(src, true);
- RemoteMethod method = new RemoteMethod("setReplication",
- new Class<?>[] {String.class, short.class}, new RemoteParam(),
- replication);
- Object result = rpcClient.invokeSequential(
- locations, method, Boolean.class, Boolean.TRUE);
- return (boolean) result;
+ return clientProto.setReplication(src, replication);
}
- @Override
+ @Override // ClientProtocol
public void setStoragePolicy(String src, String policyName)
throws IOException {
- checkOperation(OperationCategory.WRITE);
-
- List<RemoteLocation> locations = getLocationsForPath(src, true);
- RemoteMethod method = new RemoteMethod("setStoragePolicy",
- new Class<?>[] {String.class, String.class},
- new RemoteParam(), policyName);
- rpcClient.invokeSequential(locations, method, null, null);
+ clientProto.setStoragePolicy(src, policyName);
}
- @Override
+ @Override // ClientProtocol
public BlockStoragePolicy[] getStoragePolicies() throws IOException {
- checkOperation(OperationCategory.READ);
-
- RemoteMethod method = new RemoteMethod("getStoragePolicies");
- String ns = subclusterResolver.getDefaultNamespace();
- return (BlockStoragePolicy[]) rpcClient.invokeSingle(ns, method);
+ return clientProto.getStoragePolicies();
}
@Override // ClientProtocol
public void setPermission(String src, FsPermission permissions)
throws IOException {
- checkOperation(OperationCategory.WRITE);
-
- final List<RemoteLocation> locations = getLocationsForPath(src, true);
- RemoteMethod method = new RemoteMethod("setPermission",
- new Class<?>[] {String.class, FsPermission.class},
- new RemoteParam(), permissions);
- if (isPathAll(src)) {
- rpcClient.invokeConcurrent(locations, method);
- } else {
- rpcClient.invokeSequential(locations, method);
- }
+ clientProto.setPermission(src, permissions);
}
@Override // ClientProtocol
public void setOwner(String src, String username, String groupname)
throws IOException {
- checkOperation(OperationCategory.WRITE);
-
- final List<RemoteLocation> locations = getLocationsForPath(src, true);
- RemoteMethod method = new RemoteMethod("setOwner",
- new Class<?>[] {String.class, String.class, String.class},
- new RemoteParam(), username, groupname);
- if (isPathAll(src)) {
- rpcClient.invokeConcurrent(locations, method);
- } else {
- rpcClient.invokeSequential(locations, method);
- }
+ clientProto.setOwner(src, username, groupname);
}
/**
@@ -718,18 +601,8 @@ public class RouterRpcServer extends AbstractService
ExtendedBlock previous, DatanodeInfo[] excludedNodes, long fileId,
String[] favoredNodes, EnumSet<AddBlockFlag> addBlockFlags)
throws IOException {
- checkOperation(OperationCategory.WRITE);
-
- final List<RemoteLocation> locations = getLocationsForPath(src, true);
- RemoteMethod method = new RemoteMethod("addBlock",
- new Class<?>[] {String.class, String.class, ExtendedBlock.class,
- DatanodeInfo[].class, long.class, String[].class,
- EnumSet.class},
- new RemoteParam(), clientName, previous, excludedNodes, fileId,
- favoredNodes, addBlockFlags);
- // TODO verify the excludedNodes and favoredNodes are acceptable to this NN
- return (LocatedBlock) rpcClient.invokeSequential(
- locations, method, LocatedBlock.class, null);
+ return clientProto.addBlock(src, clientName, previous, excludedNodes,
+ fileId, favoredNodes, addBlockFlags);
}
/**
@@ -742,55 +615,26 @@ public class RouterRpcServer extends AbstractService
final String[] existingStorageIDs, final DatanodeInfo[] excludes,
final int numAdditionalNodes, final String clientName)
throws IOException {
- checkOperation(OperationCategory.READ);
-
- final List<RemoteLocation> locations = getLocationsForPath(src, false);
- RemoteMethod method = new RemoteMethod("getAdditionalDatanode",
- new Class<?>[] {String.class, long.class, ExtendedBlock.class,
- DatanodeInfo[].class, String[].class,
- DatanodeInfo[].class, int.class, String.class},
- new RemoteParam(), fileId, blk, existings, existingStorageIDs, excludes,
- numAdditionalNodes, clientName);
- return (LocatedBlock) rpcClient.invokeSequential(
- locations, method, LocatedBlock.class, null);
+ return clientProto.getAdditionalDatanode(src, fileId, blk, existings,
+ existingStorageIDs, excludes, numAdditionalNodes, clientName);
}
@Override // ClientProtocol
public void abandonBlock(ExtendedBlock b, long fileId, String src,
String holder) throws IOException {
- checkOperation(OperationCategory.WRITE);
-
- RemoteMethod method = new RemoteMethod("abandonBlock",
- new Class<?>[] {ExtendedBlock.class, long.class, String.class,
- String.class},
- b, fileId, new RemoteParam(), holder);
- rpcClient.invokeSingle(b, method);
+ clientProto.abandonBlock(b, fileId, src, holder);
}
@Override // ClientProtocol
public boolean complete(String src, String clientName, ExtendedBlock last,
long fileId) throws IOException {
- checkOperation(OperationCategory.WRITE);
-
- final List<RemoteLocation> locations = getLocationsForPath(src, true);
- RemoteMethod method = new RemoteMethod("complete",
- new Class<?>[] {String.class, String.class, ExtendedBlock.class,
- long.class},
- new RemoteParam(), clientName, last, fileId);
- // Complete can return true/false, so don't expect a result
- return ((Boolean) rpcClient.invokeSequential(
- locations, method, Boolean.class, null)).booleanValue();
+ return clientProto.complete(src, clientName, last, fileId);
}
@Override // ClientProtocol
public LocatedBlock updateBlockForPipeline(
ExtendedBlock block, String clientName) throws IOException {
- checkOperation(OperationCategory.WRITE);
-
- RemoteMethod method = new RemoteMethod("updateBlockForPipeline",
- new Class<?>[] {ExtendedBlock.class, String.class},
- block, clientName);
- return (LocatedBlock) rpcClient.invokeSingle(block, method);
+ return clientProto.updateBlockForPipeline(block, clientName);
}
/**
@@ -801,462 +645,91 @@ public class RouterRpcServer extends AbstractService
public void updatePipeline(String clientName, ExtendedBlock oldBlock,
ExtendedBlock newBlock, DatanodeID[] newNodes, String[] newStorageIDs)
throws IOException {
- checkOperation(OperationCategory.WRITE);
-
- RemoteMethod method = new RemoteMethod("updatePipeline",
- new Class<?>[] {String.class, ExtendedBlock.class, ExtendedBlock.class,
- DatanodeID[].class, String[].class},
- clientName, oldBlock, newBlock, newNodes, newStorageIDs);
- rpcClient.invokeSingle(oldBlock, method);
+ clientProto.updatePipeline(clientName, oldBlock, newBlock, newNodes,
+ newStorageIDs);
}
@Override // ClientProtocol
public long getPreferredBlockSize(String src) throws IOException {
- checkOperation(OperationCategory.READ);
-
- final List<RemoteLocation> locations = getLocationsForPath(src, true);
- RemoteMethod method = new RemoteMethod("getPreferredBlockSize",
- new Class<?>[] {String.class}, new RemoteParam());
- return ((Long) rpcClient.invokeSequential(
- locations, method, Long.class, null)).longValue();
- }
-
- /**
- * Determines combinations of eligible src/dst locations for a rename. A
- * rename cannot change the namespace. Renames are only allowed if there is an
- * eligible dst location in the same namespace as the source.
- *
- * @param srcLocations List of all potential source destinations where the
- * path may be located. On return this list is trimmed to include
- * only the paths that have corresponding destinations in the same
- * namespace.
- * @param dst The destination path
- * @return A map of all eligible source namespaces and their corresponding
- * replacement value.
- * @throws IOException If the dst paths could not be determined.
- */
- private RemoteParam getRenameDestinations(
- final List<RemoteLocation> srcLocations, final String dst)
- throws IOException {
-
- final List<RemoteLocation> dstLocations = getLocationsForPath(dst, true);
- final Map<RemoteLocation, String> dstMap = new HashMap<>();
-
- Iterator<RemoteLocation> iterator = srcLocations.iterator();
- while (iterator.hasNext()) {
- RemoteLocation srcLocation = iterator.next();
- RemoteLocation eligibleDst =
- getFirstMatchingLocation(srcLocation, dstLocations);
- if (eligibleDst != null) {
- // Use this dst for this source location
- dstMap.put(srcLocation, eligibleDst.getDest());
- } else {
- // This src destination is not valid, remove from the source list
- iterator.remove();
- }
- }
- return new RemoteParam(dstMap);
- }
-
- /**
- * Get first matching location.
- *
- * @param location Location we are looking for.
- * @param locations List of locations.
- * @return The first matchin location in the list.
- */
- private RemoteLocation getFirstMatchingLocation(RemoteLocation location,
- List<RemoteLocation> locations) {
- for (RemoteLocation loc : locations) {
- if (loc.getNameserviceId().equals(location.getNameserviceId())) {
- // Return first matching location
- return loc;
- }
- }
- return null;
+ return clientProto.getPreferredBlockSize(src);
}
@Deprecated
@Override // ClientProtocol
public boolean rename(final String src, final String dst)
throws IOException {
- checkOperation(OperationCategory.WRITE);
-
- final List<RemoteLocation> srcLocations =
- getLocationsForPath(src, true, false);
- // srcLocations may be trimmed by getRenameDestinations()
- final List<RemoteLocation> locs = new LinkedList<>(srcLocations);
- RemoteParam dstParam = getRenameDestinations(locs, dst);
- if (locs.isEmpty()) {
- throw new IOException(
- "Rename of " + src + " to " + dst + " is not allowed," +
- " no eligible destination in the same namespace was found.");
- }
- RemoteMethod method = new RemoteMethod("rename",
- new Class<?>[] {String.class, String.class},
- new RemoteParam(), dstParam);
- return ((Boolean) rpcClient.invokeSequential(
- locs, method, Boolean.class, Boolean.TRUE)).booleanValue();
+ return clientProto.rename(src, dst);
}
@Override // ClientProtocol
public void rename2(final String src, final String dst,
final Options.Rename... options) throws IOException {
- checkOperation(OperationCategory.WRITE);
-
- final List<RemoteLocation> srcLocations =
- getLocationsForPath(src, true, false);
- // srcLocations may be trimmed by getRenameDestinations()
- final List<RemoteLocation> locs = new LinkedList<>(srcLocations);
- RemoteParam dstParam = getRenameDestinations(locs, dst);
- if (locs.isEmpty()) {
- throw new IOException(
- "Rename of " + src + " to " + dst + " is not allowed," +
- " no eligible destination in the same namespace was found.");
- }
- RemoteMethod method = new RemoteMethod("rename2",
- new Class<?>[] {String.class, String.class, options.getClass()},
- new RemoteParam(), dstParam, options);
- rpcClient.invokeSequential(locs, method, null, null);
+ clientProto.rename2(src, dst, options);
}
@Override // ClientProtocol
public void concat(String trg, String[] src) throws IOException {
- checkOperation(OperationCategory.WRITE);
-
- // See if the src and target files are all in the same namespace
- LocatedBlocks targetBlocks = getBlockLocations(trg, 0, 1);
- if (targetBlocks == null) {
- throw new IOException("Cannot locate blocks for target file - " + trg);
- }
- LocatedBlock lastLocatedBlock = targetBlocks.getLastLocatedBlock();
- String targetBlockPoolId = lastLocatedBlock.getBlock().getBlockPoolId();
- for (String source : src) {
- LocatedBlocks sourceBlocks = getBlockLocations(source, 0, 1);
- if (sourceBlocks == null) {
- throw new IOException(
- "Cannot located blocks for source file " + source);
- }
- String sourceBlockPoolId =
- sourceBlocks.getLastLocatedBlock().getBlock().getBlockPoolId();
- if (!sourceBlockPoolId.equals(targetBlockPoolId)) {
- throw new IOException("Cannot concatenate source file " + source
- + " because it is located in a different namespace"
- + " with block pool id " + sourceBlockPoolId
- + " from the target file with block pool id "
- + targetBlockPoolId);
- }
- }
-
- // Find locations in the matching namespace.
- final RemoteLocation targetDestination =
- getLocationForPath(trg, true, targetBlockPoolId);
- String[] sourceDestinations = new String[src.length];
- for (int i = 0; i < src.length; i++) {
- String sourceFile = src[i];
- RemoteLocation location =
- getLocationForPath(sourceFile, true, targetBlockPoolId);
- sourceDestinations[i] = location.getDest();
- }
- // Invoke
- RemoteMethod method = new RemoteMethod("concat",
- new Class<?>[] {String.class, String[].class},
- targetDestination.getDest(), sourceDestinations);
- rpcClient.invokeSingle(targetDestination, method);
+ clientProto.concat(trg, src);
}
@Override // ClientProtocol
public boolean truncate(String src, long newLength, String clientName)
throws IOException {
- checkOperation(OperationCategory.WRITE);
-
- final List<RemoteLocation> locations = getLocationsForPath(src, true);
- RemoteMethod method = new RemoteMethod("truncate",
- new Class<?>[] {String.class, long.class, String.class},
- new RemoteParam(), newLength, clientName);
- return ((Boolean) rpcClient.invokeSequential(locations, method,
- Boolean.class, Boolean.TRUE)).booleanValue();
+ return clientProto.truncate(src, newLength, clientName);
}
@Override // ClientProtocol
public boolean delete(String src, boolean recursive) throws IOException {
- checkOperation(OperationCategory.WRITE);
-
- final List<RemoteLocation> locations =
- getLocationsForPath(src, true, false);
- RemoteMethod method = new RemoteMethod("delete",
- new Class<?>[] {String.class, boolean.class}, new RemoteParam(),
- recursive);
- if (isPathAll(src)) {
- return rpcClient.invokeAll(locations, method);
- } else {
- return rpcClient.invokeSequential(locations, method,
- Boolean.class, Boolean.TRUE).booleanValue();
- }
+ return clientProto.delete(src, recursive);
}
@Override // ClientProtocol
public boolean mkdirs(String src, FsPermission masked, boolean createParent)
throws IOException {
- checkOperation(OperationCategory.WRITE);
-
- final List<RemoteLocation> locations = getLocationsForPath(src, true);
- RemoteMethod method = new RemoteMethod("mkdirs",
- new Class<?>[] {String.class, FsPermission.class, boolean.class},
- new RemoteParam(), masked, createParent);
-
- // Create in all locations
- if (isPathAll(src)) {
- return rpcClient.invokeAll(locations, method);
- }
-
- if (locations.size() > 1) {
- // Check if this directory already exists
- try {
- HdfsFileStatus fileStatus = getFileInfo(src);
- if (fileStatus != null) {
- // When existing, the NN doesn't return an exception; return true
- return true;
- }
- } catch (IOException ioe) {
- // Can't query if this file exists or not.
- LOG.error("Error requesting file info for path {} while proxing mkdirs",
- src, ioe);
- }
- }
-
- RemoteLocation firstLocation = locations.get(0);
- return ((Boolean) rpcClient.invokeSingle(firstLocation, method))
- .booleanValue();
+ return clientProto.mkdirs(src, masked, createParent);
}
@Override // ClientProtocol
public void renewLease(String clientName) throws IOException {
- checkOperation(OperationCategory.WRITE);
-
- RemoteMethod method = new RemoteMethod("renewLease",
- new Class<?>[] {String.class}, clientName);
- Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
- rpcClient.invokeConcurrent(nss, method, false, false);
+ clientProto.renewLease(clientName);
}
@Override // ClientProtocol
public DirectoryListing getListing(String src, byte[] startAfter,
boolean needLocation) throws IOException {
- checkOperation(OperationCategory.READ);
-
- // Locate the dir and fetch the listing
- final List<RemoteLocation> locations = getLocationsForPath(src, true);
- RemoteMethod method = new RemoteMethod("getListing",
- new Class<?>[] {String.class, startAfter.getClass(), boolean.class},
- new RemoteParam(), startAfter, needLocation);
- Map<RemoteLocation, DirectoryListing> listings =
- rpcClient.invokeConcurrent(
- locations, method, false, false, DirectoryListing.class);
-
- Map<String, HdfsFileStatus> nnListing = new TreeMap<>();
- int totalRemainingEntries = 0;
- int remainingEntries = 0;
- boolean namenodeListingExists = false;
- if (listings != null) {
- // Check the subcluster listing with the smallest name
- String lastName = null;
- for (Entry<RemoteLocation, DirectoryListing> entry :
- listings.entrySet()) {
- RemoteLocation location = entry.getKey();
- DirectoryListing listing = entry.getValue();
- if (listing == null) {
- LOG.debug("Cannot get listing from {}", location);
- } else {
- totalRemainingEntries += listing.getRemainingEntries();
- HdfsFileStatus[] partialListing = listing.getPartialListing();
- int length = partialListing.length;
- if (length > 0) {
- HdfsFileStatus lastLocalEntry = partialListing[length-1];
- String lastLocalName = lastLocalEntry.getLocalName();
- if (lastName == null || lastName.compareTo(lastLocalName) > 0) {
- lastName = lastLocalName;
- }
- }
- }
- }
-
- // Add existing entries
- for (Object value : listings.values()) {
- DirectoryListing listing = (DirectoryListing) value;
- if (listing != null) {
- namenodeListingExists = true;
- for (HdfsFileStatus file : listing.getPartialListing()) {
- String filename = file.getLocalName();
- if (totalRemainingEntries > 0 && filename.compareTo(lastName) > 0) {
- // Discarding entries further than the lastName
- remainingEntries++;
- } else {
- nnListing.put(filename, file);
- }
- }
- remainingEntries += listing.getRemainingEntries();
- }
- }
- }
-
- // Add mount points at this level in the tree
- final List<String> children = subclusterResolver.getMountPoints(src);
- if (children != null) {
- // Get the dates for each mount point
- Map<String, Long> dates = getMountPointDates(src);
-
- // Create virtual folder with the mount name
- for (String child : children) {
- long date = 0;
- if (dates != null && dates.containsKey(child)) {
- date = dates.get(child);
- }
- // TODO add number of children
- HdfsFileStatus dirStatus = getMountPointStatus(child, 0, date);
-
- // This may overwrite existing listing entries with the mount point
- // TODO don't add if already there?
- nnListing.put(child, dirStatus);
- }
- }
-
- if (!namenodeListingExists && nnListing.size() == 0) {
- // NN returns a null object if the directory cannot be found and has no
- // listing. If we didn't retrieve any NN listing data, and there are no
- // mount points here, return null.
- return null;
- }
-
- // Generate combined listing
- HdfsFileStatus[] combinedData = new HdfsFileStatus[nnListing.size()];
- combinedData = nnListing.values().toArray(combinedData);
- return new DirectoryListing(combinedData, remainingEntries);
+ return clientProto.getListing(src, startAfter, needLocation);
}
@Override // ClientProtocol
public HdfsFileStatus getFileInfo(String src) throws IOException {
- checkOperation(OperationCategory.READ);
-
- final List<RemoteLocation> locations = getLocationsForPath(src, false);
- RemoteMethod method = new RemoteMethod("getFileInfo",
- new Class<?>[] {String.class}, new RemoteParam());
-
- HdfsFileStatus ret = null;
- // If it's a directory, we check in all locations
- if (isPathAll(src)) {
- ret = getFileInfoAll(locations, method);
- } else {
- // Check for file information sequentially
- ret = (HdfsFileStatus) rpcClient.invokeSequential(
- locations, method, HdfsFileStatus.class, null);
- }
-
- // If there is no real path, check mount points
- if (ret == null) {
- List<String> children = subclusterResolver.getMountPoints(src);
- if (children != null && !children.isEmpty()) {
- Map<String, Long> dates = getMountPointDates(src);
- long date = 0;
- if (dates != null && dates.containsKey(src)) {
- date = dates.get(src);
- }
- ret = getMountPointStatus(src, children.size(), date);
- }
- }
-
- return ret;
- }
-
- /**
- * Get the file info from all the locations.
- *
- * @param locations Locations to check.
- * @param method The file information method to run.
- * @return The first file info if it's a file, the directory if it's
- * everywhere.
- * @throws IOException If all the locations throw an exception.
- */
- private HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations,
- final RemoteMethod method) throws IOException {
-
- // Get the file info from everybody
- Map<RemoteLocation, HdfsFileStatus> results =
- rpcClient.invokeConcurrent(locations, method, HdfsFileStatus.class);
-
- // We return the first file
- HdfsFileStatus dirStatus = null;
- for (RemoteLocation loc : locations) {
- HdfsFileStatus fileStatus = results.get(loc);
- if (fileStatus != null) {
- if (!fileStatus.isDirectory()) {
- return fileStatus;
- } else if (dirStatus == null) {
- dirStatus = fileStatus;
- }
- }
- }
- return dirStatus;
+ return clientProto.getFileInfo(src);
}
@Override // ClientProtocol
public boolean isFileClosed(String src) throws IOException {
- checkOperation(OperationCategory.READ);
-
- final List<RemoteLocation> locations = getLocationsForPath(src, false);
- RemoteMethod method = new RemoteMethod("isFileClosed",
- new Class<?>[] {String.class}, new RemoteParam());
- return ((Boolean) rpcClient.invokeSequential(
- locations, method, Boolean.class, Boolean.TRUE)).booleanValue();
+ return clientProto.isFileClosed(src);
}
@Override // ClientProtocol
public HdfsFileStatus getFileLinkInfo(String src) throws IOException {
- checkOperation(OperationCategory.READ);
-
- final List<RemoteLocation> locations = getLocationsForPath(src, false);
- RemoteMethod method = new RemoteMethod("getFileLinkInfo",
- new Class<?>[] {String.class}, new RemoteParam());
- return (HdfsFileStatus) rpcClient.invokeSequential(
- locations, method, HdfsFileStatus.class, null);
+ return clientProto.getFileLinkInfo(src);
}
- @Override
+ @Override // ClientProtocol
public HdfsLocatedFileStatus getLocatedFileInfo(String src,
boolean needBlockToken) throws IOException {
- checkOperation(OperationCategory.READ);
- final List<RemoteLocation> locations = getLocationsForPath(src, false);
- RemoteMethod method = new RemoteMethod("getLocatedFileInfo",
- new Class<?>[] {String.class, boolean.class}, new RemoteParam(),
- Boolean.valueOf(needBlockToken));
- return (HdfsLocatedFileStatus) rpcClient.invokeSequential(
- locations, method, HdfsFileStatus.class, null);
+ return clientProto.getLocatedFileInfo(src, needBlockToken);
}
@Override // ClientProtocol
public long[] getStats() throws IOException {
- checkOperation(OperationCategory.UNCHECKED);
-
- RemoteMethod method = new RemoteMethod("getStats");
- Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
- Map<FederationNamespaceInfo, long[]> results =
- rpcClient.invokeConcurrent(nss, method, true, false, long[].class);
- long[] combinedData = new long[STATS_ARRAY_LENGTH];
- for (long[] data : results.values()) {
- for (int i = 0; i < combinedData.length && i < data.length; i++) {
- if (data[i] >= 0) {
- combinedData[i] += data[i];
- }
- }
- }
- return combinedData;
+ return clientProto.getStats();
}
@Override // ClientProtocol
public DatanodeInfo[] getDatanodeReport(DatanodeReportType type)
throws IOException {
- checkOperation(OperationCategory.UNCHECKED);
- return getDatanodeReport(type, true, 0);
+ return clientProto.getDatanodeReport(type);
}
/**
@@ -1305,29 +778,7 @@ public class RouterRpcServer extends AbstractService
@Override // ClientProtocol
public DatanodeStorageReport[] getDatanodeStorageReport(
DatanodeReportType type) throws IOException {
- checkOperation(OperationCategory.UNCHECKED);
-
- Map<String, DatanodeStorageReport[]> dnSubcluster =
- getDatanodeStorageReportMap(type);
-
- // Avoid repeating machines in multiple subclusters
- Map<String, DatanodeStorageReport> datanodesMap = new LinkedHashMap<>();
- for (DatanodeStorageReport[] dns : dnSubcluster.values()) {
- for (DatanodeStorageReport dn : dns) {
- DatanodeInfo dnInfo = dn.getDatanodeInfo();
- String nodeId = dnInfo.getXferAddr();
- if (!datanodesMap.containsKey(nodeId)) {
- datanodesMap.put(nodeId, dn);
- }
- // TODO merge somehow, right now it just takes the first one
- }
- }
-
- Collection<DatanodeStorageReport> datanodes = datanodesMap.values();
- DatanodeStorageReport[] combinedData =
- new DatanodeStorageReport[datanodes.size()];
- combinedData = datanodes.toArray(combinedData);
- return combinedData;
+ return clientProto.getDatanodeStorageReport(type);
}
/**
@@ -1360,740 +811,388 @@ public class RouterRpcServer extends AbstractService
@Override // ClientProtocol
public boolean setSafeMode(SafeModeAction action, boolean isChecked)
throws IOException {
- checkOperation(OperationCategory.WRITE);
-
- // Set safe mode in all the name spaces
- RemoteMethod method = new RemoteMethod("setSafeMode",
- new Class<?>[] {SafeModeAction.class, boolean.class},
- action, isChecked);
- Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
- Map<FederationNamespaceInfo, Boolean> results =
- rpcClient.invokeConcurrent(
- nss, method, true, !isChecked, Boolean.class);
-
- // We only report true if all the name space are in safe mode
- int numSafemode = 0;
- for (boolean safemode : results.values()) {
- if (safemode) {
- numSafemode++;
- }
- }
- return numSafemode == results.size();
+ return clientProto.setSafeMode(action, isChecked);
}
@Override // ClientProtocol
public boolean restoreFailedStorage(String arg) throws IOException {
- checkOperation(OperationCategory.UNCHECKED);
-
- RemoteMethod method = new RemoteMethod("restoreFailedStorage",
- new Class<?>[] {String.class}, arg);
- final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
- Map<FederationNamespaceInfo, Boolean> ret =
- rpcClient.invokeConcurrent(nss, method, true, false, Boolean.class);
-
- boolean success = true;
- for (boolean s : ret.values()) {
- if (!s) {
- success = false;
- break;
- }
- }
- return success;
+ return clientProto.restoreFailedStorage(arg);
}
@Override // ClientProtocol
public boolean saveNamespace(long timeWindow, long txGap) throws IOException {
- checkOperation(OperationCategory.UNCHECKED);
-
- RemoteMethod method = new RemoteMethod("saveNamespace",
- new Class<?>[] {Long.class, Long.class}, timeWindow, txGap);
- final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
- Map<FederationNamespaceInfo, Boolean> ret =
- rpcClient.invokeConcurrent(nss, method, true, false, boolean.class);
-
- boolean success = true;
- for (boolean s : ret.values()) {
- if (!s) {
- success = false;
- break;
- }
- }
- return success;
+ return clientProto.saveNamespace(timeWindow, txGap);
}
@Override // ClientProtocol
public long rollEdits() throws IOException {
- checkOperation(OperationCategory.WRITE);
-
- RemoteMethod method = new RemoteMethod("rollEdits", new Class<?>[] {});
- final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
- Map<FederationNamespaceInfo, Long> ret =
- rpcClient.invokeConcurrent(nss, method, true, false, long.class);
-
- // Return the maximum txid
- long txid = 0;
- for (long t : ret.values()) {
- if (t > txid) {
- txid = t;
- }
- }
- return txid;
+ return clientProto.rollEdits();
}
@Override // ClientProtocol
public void refreshNodes() throws IOException {
- checkOperation(OperationCategory.UNCHECKED);
-
- RemoteMethod method = new RemoteMethod("refreshNodes", new Class<?>[] {});
- final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
- rpcClient.invokeConcurrent(nss, method, true, true);
+ clientProto.refreshNodes();
}
@Override // ClientProtocol
public void finalizeUpgrade() throws IOException {
- checkOperation(OperationCategory.UNCHECKED);
-
- RemoteMethod method = new RemoteMethod("finalizeUpgrade",
- new Class<?>[] {});
- final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
- rpcClient.invokeConcurrent(nss, method, true, false);
+ clientProto.finalizeUpgrade();
}
@Override // ClientProtocol
public boolean upgradeStatus() throws IOException {
- String methodName = getMethodName();
- throw new UnsupportedOperationException(
- "Operation \"" + methodName + "\" is not supported");
+ return clientProto.upgradeStatus();
}
@Override // ClientProtocol
public RollingUpgradeInfo rollingUpgrade(RollingUpgradeAction action)
throws IOException {
- checkOperation(OperationCategory.READ);
-
- RemoteMethod method = new RemoteMethod("rollingUpgrade",
- new Class<?>[] {RollingUpgradeAction.class}, action);
- final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
- Map<FederationNamespaceInfo, RollingUpgradeInfo> ret =
- rpcClient.invokeConcurrent(
- nss, method, true, false, RollingUpgradeInfo.class);
-
- // Return the first rolling upgrade info
- RollingUpgradeInfo info = null;
- for (RollingUpgradeInfo infoNs : ret.values()) {
- if (info == null && infoNs != null) {
- info = infoNs;
- }
- }
- return info;
+ return clientProto.rollingUpgrade(action);
}
@Override // ClientProtocol
public void metaSave(String filename) throws IOException {
- checkOperation(OperationCategory.UNCHECKED);
-
- RemoteMethod method = new RemoteMethod("metaSave",
- new Class<?>[] {String.class}, filename);
- final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
- rpcClient.invokeConcurrent(nss, method, true, false);
+ clientProto.metaSave(filename);
}
@Override // ClientProtocol
public CorruptFileBlocks listCorruptFileBlocks(String path, String cookie)
throws IOException {
- checkOperation(OperationCategory.READ);
-
- final List<RemoteLocation> locations = getLocationsForPath(path, false);
- RemoteMethod method = new RemoteMethod("listCorruptFileBlocks",
- new Class<?>[] {String.class, String.class},
- new RemoteParam(), cookie);
- return (CorruptFileBlocks) rpcClient.invokeSequential(
- locations, method, CorruptFileBlocks.class, null);
+ return clientProto.listCorruptFileBlocks(path, cookie);
}
@Override // ClientProtocol
public void setBalancerBandwidth(long bandwidth) throws IOException {
- checkOperation(OperationCategory.UNCHECKED);
-
- RemoteMethod method = new RemoteMethod("setBalancerBandwidth",
- new Class<?>[] {Long.class}, bandwidth);
- final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
- rpcClient.invokeConcurrent(nss, method, true, false);
+ clientProto.setBalancerBandwidth(bandwidth);
}
@Override // ClientProtocol
public ContentSummary getContentSummary(String path) throws IOException {
- checkOperation(OperationCategory.READ);
-
- // Get the summaries from regular files
- Collection<ContentSummary> summaries = new LinkedList<>();
- FileNotFoundException notFoundException = null;
- try {
- final List<RemoteLocation> locations = getLocationsForPath(path, false);
- RemoteMethod method = new RemoteMethod("getContentSummary",
- new Class<?>[] {String.class}, new RemoteParam());
- Map<RemoteLocation, ContentSummary> results =
- rpcClient.invokeConcurrent(
- locations, method, false, false, ContentSummary.class);
- summaries.addAll(results.values());
- } catch (FileNotFoundException e) {
- notFoundException = e;
- }
-
- // Add mount points at this level in the tree
- final List<String> children = subclusterResolver.getMountPoints(path);
- if (children != null) {
- for (String child : children) {
- Path childPath = new Path(path, child);
- try {
- ContentSummary mountSummary = getContentSummary(childPath.toString());
- if (mountSummary != null) {
- summaries.add(mountSummary);
- }
- } catch (Exception e) {
- LOG.error("Cannot get content summary for mount {}: {}",
- childPath, e.getMessage());
- }
- }
- }
-
- // Throw original exception if no original nor mount points
- if (summaries.isEmpty() && notFoundException != null) {
- throw notFoundException;
- }
-
- return aggregateContentSummary(summaries);
- }
-
- /**
- * Aggregate content summaries for each subcluster.
- *
- * @param summaries Collection of individual summaries.
- * @return Aggregated content summary.
- */
- private ContentSummary aggregateContentSummary(
- Collection<ContentSummary> summaries) {
- if (summaries.size() == 1) {
- return summaries.iterator().next();
- }
-
- long length = 0;
- long fileCount = 0;
- long directoryCount = 0;
- long quota = 0;
- long spaceConsumed = 0;
- long spaceQuota = 0;
-
- for (ContentSummary summary : summaries) {
- length += summary.getLength();
- fileCount += summary.getFileCount();
- directoryCount += summary.getDirectoryCount();
- quota += summary.getQuota();
- spaceConsumed += summary.getSpaceConsumed();
- spaceQuota += summary.getSpaceQuota();
- }
-
- ContentSummary ret = new ContentSummary.Builder()
- .length(length)
- .fileCount(fileCount)
- .directoryCount(directoryCount)
- .quota(quota)
- .spaceConsumed(spaceConsumed)
- .spaceQuota(spaceQuota)
- .build();
- return ret;
+ return clientProto.getContentSummary(path);
}
@Override // ClientProtocol
public void fsync(String src, long fileId, String clientName,
long lastBlockLength) throws IOException {
- checkOperation(OperationCategory.WRITE);
-
- final List<RemoteLocation> locations = getLocationsForPath(src, true);
- RemoteMethod method = new RemoteMethod("fsync",
- new Class<?>[] {String.class, long.class, String.class, long.class },
- new RemoteParam(), fileId, clientName, lastBlockLength);
- rpcClient.invokeSequential(locations, method);
+ clientProto.fsync(src, fileId, clientName, lastBlockLength);
}
@Override // ClientProtocol
public void setTimes(String src, long mtime, long atime) throws IOException {
- checkOperation(OperationCategory.WRITE);
-
- final List<RemoteLocation> locations = getLocationsForPath(src, true);
- RemoteMethod method = new RemoteMethod("setTimes",
- new Class<?>[] {String.class, long.class, long.class},
- new RemoteParam(), mtime, atime);
- rpcClient.invokeSequential(locations, method);
+ clientProto.setTimes(src, mtime, atime);
}
@Override // ClientProtocol
public void createSymlink(String target, String link, FsPermission dirPerms,
boolean createParent) throws IOException {
- checkOperation(OperationCategory.WRITE);
-
- // TODO Verify that the link location is in the same NS as the targets
- final List<RemoteLocation> targetLocations =
- getLocationsForPath(target, true);
- final List<RemoteLocation> linkLocations =
- getLocationsForPath(link, true);
- RemoteLocation linkLocation = linkLocations.get(0);
- RemoteMethod method = new RemoteMethod("createSymlink",
- new Class<?>[] {String.class, String.class, FsPermission.class,
- boolean.class},
- new RemoteParam(), linkLocation.getDest(), dirPerms, createParent);
- rpcClient.invokeSequential(targetLocations, method);
+ clientProto.createSymlink(target, link, dirPerms, createParent);
}
@Override // ClientProtocol
public String getLinkTarget(String path) throws IOException {
- checkOperation(OperationCategory.READ);
-
- final List<RemoteLocation> locations = getLocationsForPath(path, true);
- RemoteMethod method = new RemoteMethod("getLinkTarget",
- new Class<?>[] {String.class}, new RemoteParam());
- return (String) rpcClient.invokeSequential(
- locations, method, String.class, null);
+ return clientProto.getLinkTarget(path);
}
@Override // Client Protocol
public void allowSnapshot(String snapshotRoot) throws IOException {
- checkOperation(OperationCategory.WRITE, false);
+ clientProto.allowSnapshot(snapshotRoot);
}
@Override // Client Protocol
public void disallowSnapshot(String snapshot) throws IOException {
- checkOperation(OperationCategory.WRITE, false);
+ clientProto.disallowSnapshot(snapshot);
}
@Override // ClientProtocol
public void renameSnapshot(String snapshotRoot, String snapshotOldName,
String snapshotNewName) throws IOException {
- checkOperation(OperationCategory.WRITE, false);
+ clientProto.renameSnapshot(snapshotRoot, snapshotOldName, snapshotNewName);
}
@Override // Client Protocol
public SnapshottableDirectoryStatus[] getSnapshottableDirListing()
throws IOException {
- checkOperation(OperationCategory.READ, false);
- return null;
+ return clientProto.getSnapshottableDirListing();
}
@Override // ClientProtocol
public SnapshotDiffReport getSnapshotDiffReport(String snapshotRoot,
String earlierSnapshotName, String laterSnapshotName) throws IOException {
- checkOperation(OperationCategory.READ, false);
- return null;
+ return clientProto.getSnapshotDiffReport(
+ snapshotRoot, earlierSnapshotName, laterSnapshotName);
}
@Override // ClientProtocol
public SnapshotDiffReportListing getSnapshotDiffReportListing(
String snapshotRoot, String earlierSnapshotName, String laterSnapshotName,
byte[] startPath, int index) throws IOException {
- checkOperation(OperationCategory.READ, false);
- return null;
+ return clientProto.getSnapshotDiffReportListing(snapshotRoot,
+ earlierSnapshotName, laterSnapshotName, startPath, index);
}
@Override // ClientProtocol
public long addCacheDirective(CacheDirectiveInfo path,
EnumSet<CacheFlag> flags) throws IOException {
- checkOperation(OperationCategory.WRITE, false);
- return 0;
+ return clientProto.addCacheDirective(path, flags);
}
@Override // ClientProtocol
public void modifyCacheDirective(CacheDirectiveInfo directive,
EnumSet<CacheFlag> flags) throws IOException {
- checkOperation(OperationCategory.WRITE, false);
+ clientProto.modifyCacheDirective(directive, flags);
}
@Override // ClientProtocol
public void removeCacheDirective(long id) throws IOException {
- checkOperation(OperationCategory.WRITE, false);
+ clientProto.removeCacheDirective(id);
}
@Override // ClientProtocol
public BatchedEntries<CacheDirectiveEntry> listCacheDirectives(
long prevId, CacheDirectiveInfo filter) throws IOException {
- checkOperation(OperationCategory.READ, false);
- return null;
+ return clientProto.listCacheDirectives(prevId, filter);
}
@Override // ClientProtocol
public void addCachePool(CachePoolInfo info) throws IOException {
- checkOperation(OperationCategory.WRITE, false);
+ clientProto.addCachePool(info);
}
@Override // ClientProtocol
public void modifyCachePool(CachePoolInfo info) throws IOException {
- checkOperation(OperationCategory.WRITE, false);
+ clientProto.modifyCachePool(info);
}
@Override // ClientProtocol
public void removeCachePool(String cachePoolName) throws IOException {
- checkOperation(OperationCategory.WRITE, false);
+ clientProto.removeCachePool(cachePoolName);
}
@Override // ClientProtocol
public BatchedEntries<CachePoolEntry> listCachePools(String prevKey)
throws IOException {
- checkOperation(OperationCategory.READ, false);
- return null;
+ return clientProto.listCachePools(prevKey);
}
@Override // ClientProtocol
public void modifyAclEntries(String src, List<AclEntry> aclSpec)
throws IOException {
- checkOperation(OperationCategory.WRITE);
-
- // TODO handle virtual directories
- final List<RemoteLocation> locations = getLocationsForPath(src, true);
- RemoteMethod method = new RemoteMethod("modifyAclEntries",
- new Class<?>[] {String.class, List.class},
- new RemoteParam(), aclSpec);
- rpcClient.invokeSequential(locations, method, null, null);
+ clientProto.modifyAclEntries(src, aclSpec);
}
@Override // ClienProtocol
public void removeAclEntries(String src, List<AclEntry> aclSpec)
throws IOException {
- checkOperation(OperationCategory.WRITE);
-
- // TODO handle virtual directories
- final List<RemoteLocation> locations = getLocationsForPath(src, true);
- RemoteMethod method = new RemoteMethod("removeAclEntries",
- new Class<?>[] {String.class, List.class},
- new RemoteParam(), aclSpec);
- rpcClient.invokeSequential(locations, method, null, null);
+ clientProto.removeAclEntries(src, aclSpec);
}
@Override // ClientProtocol
public void removeDefaultAcl(String src) throws IOException {
- checkOperation(OperationCategory.WRITE);
-
- // TODO handle virtual directories
- final List<RemoteLocation> locations = getLocationsForPath(src, true);
- RemoteMethod method = new RemoteMethod("removeDefaultAcl",
- new Class<?>[] {String.class}, new RemoteParam());
- rpcClient.invokeSequential(locations, method);
+ clientProto.removeDefaultAcl(src);
}
@Override // ClientProtocol
public void removeAcl(String src) throws IOException {
- checkOperation(OperationCategory.WRITE);
-
- // TODO handle virtual directories
- final List<RemoteLocation> locations = getLocationsForPath(src, true);
- RemoteMethod method = new RemoteMethod("removeAcl",
- new Class<?>[] {String.class}, new RemoteParam());
- rpcClient.invokeSequential(locations, method);
+ clientProto.removeAcl(src);
}
@Override // ClientProtocol
public void setAcl(String src, List<AclEntry> aclSpec) throws IOException {
- checkOperation(OperationCategory.WRITE);
-
- // TODO handle virtual directories
- final List<RemoteLocation> locations = getLocationsForPath(src, true);
- RemoteMethod method = new RemoteMethod(
- "setAcl", new Class<?>[] {String.class, List.class},
- new RemoteParam(), aclSpec);
- rpcClient.invokeSequential(locations, method);
+ clientProto.setAcl(src, aclSpec);
}
@Override // ClientProtocol
public AclStatus getAclStatus(String src) throws IOException {
- checkOperation(OperationCategory.READ);
-
- // TODO handle virtual directories
- final List<RemoteLocation> locations = getLocationsForPath(src, false);
- RemoteMethod method = new RemoteMethod("getAclStatus",
- new Class<?>[] {String.class}, new RemoteParam());
- return (AclStatus) rpcClient.invokeSequential(
- locations, method, AclStatus.class, null);
+ return clientProto.getAclStatus(src);
}
@Override // ClientProtocol
public void createEncryptionZone(String src, String keyName)
throws IOException {
- checkOperation(OperationCategory.WRITE);
-
- // TODO handle virtual directories
- final List<RemoteLocation> locations = getLocationsForPath(src, true);
- RemoteMethod method = new RemoteMethod("createEncryptionZone",
- new Class<?>[] {String.class, String.class},
- new RemoteParam(), keyName);
- rpcClient.invokeSequential(locations, method);
+ clientProto.createEncryptionZone(src, keyName);
}
@Override // ClientProtocol
public EncryptionZone getEZForPath(String src) throws IOException {
- checkOperation(OperationCategory.READ);
-
- // TODO handle virtual directories
- final List<RemoteLocation> locations = getLocationsForPath(src, false);
- RemoteMethod method = new RemoteMethod("getEZForPath",
- new Class<?>[] {String.class}, new RemoteParam());
- return (EncryptionZone) rpcClient.invokeSequential(
- locations, method, EncryptionZone.class, null);
+ return clientProto.getEZForPath(src);
}
@Override // ClientProtocol
public BatchedEntries<EncryptionZone> listEncryptionZones(long prevId)
throws IOException {
- checkOperation(OperationCategory.READ, false);
- return null;
+ return clientProto.listEncryptionZones(prevId);
}
@Override // ClientProtocol
public void reencryptEncryptionZone(String zone, ReencryptAction action)
throws IOException {
- checkOperation(OperationCategory.WRITE, false);
+ clientProto.reencryptEncryptionZone(zone, action);
}
@Override // ClientProtocol
public BatchedEntries<ZoneReencryptionStatus> listReencryptionStatus(
long prevId) throws IOException {
- checkOperation(OperationCategory.READ, false);
- return null;
+ return clientProto.listReencryptionStatus(prevId);
}
@Override // ClientProtocol
public void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag)
throws IOException {
- checkOperation(OperationCategory.WRITE);
-
- // TODO handle virtual directories
- final List<RemoteLocation> locations = getLocationsForPath(src, true);
- RemoteMethod method = new RemoteMethod("setXAttr",
- new Class<?>[] {String.class, XAttr.class, EnumSet.class},
- new RemoteParam(), xAttr, flag);
- rpcClient.invokeSequential(locations, method);
+ clientProto.setXAttr(src, xAttr, flag);
}
- @SuppressWarnings("unchecked")
@Override // ClientProtocol
public List<XAttr> getXAttrs(String src, List<XAttr> xAttrs)
throws IOException {
- checkOperation(OperationCategory.READ);
-
- // TODO handle virtual directories
- final List<RemoteLocation> locations = getLocationsForPath(src, false);
- RemoteMethod method = new RemoteMethod("getXAttrs",
- new Class<?>[] {String.class, List.class}, new RemoteParam(), xAttrs);
- return (List<XAttr>) rpcClient.invokeSequential(
- locations, method, List.class, null);
+ return clientProto.getXAttrs(src, xAttrs);
}
- @SuppressWarnings("unchecked")
@Override // ClientProtocol
public List<XAttr> listXAttrs(String src) throws IOException {
- checkOperation(OperationCategory.READ);
-
- // TODO handle virtual directories
- final List<RemoteLocation> locations = getLocationsForPath(src, false);
- RemoteMethod method = new RemoteMethod("listXAttrs",
- new Class<?>[] {String.class}, new RemoteParam());
- return (List<XAttr>) rpcClient.invokeSequential(
- locations, method, List.class, null);
+ return clientProto.listXAttrs(src);
}
@Override // ClientProtocol
public void removeXAttr(String src, XAttr xAttr) throws IOException {
- checkOperation(OperationCategory.WRITE);
-
- // TODO handle virtual directories
- final List<RemoteLocation> locations = getLocationsForPath(src, true);
- RemoteMethod method = new RemoteMethod("removeXAttr",
- new Class<?>[] {String.class, XAttr.class}, new RemoteParam(), xAttr);
- rpcClient.invokeSequential(locations, method);
+ clientProto.removeXAttr(src, xAttr);
}
@Override // ClientProtocol
public void checkAccess(String path, FsAction mode) throws IOException {
- checkOperation(OperationCategory.READ);
-
- // TODO handle virtual directories
- final List<RemoteLocation> locations = getLocationsForPath(path, true);
- RemoteMethod method = new RemoteMethod("checkAccess",
- new Class<?>[] {String.class, FsAction.class},
- new RemoteParam(), mode);
- rpcClient.invokeSequential(locations, method);
+ clientProto.checkAccess(path, mode);
}
@Override // ClientProtocol
public long getCurrentEditLogTxid() throws IOException {
- checkOperation(OperationCategory.READ);
-
- RemoteMethod method = new RemoteMethod(
- "getCurrentEditLogTxid", new Class<?>[] {});
- final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
- Map<FederationNamespaceInfo, Long> ret =
- rpcClient.invokeConcurrent(nss, method, true, false, long.class);
-
- // Return the maximum txid
- long txid = 0;
- for (long t : ret.values()) {
- if (t > txid) {
- txid = t;
- }
- }
- return txid;
+ return clientProto.getCurrentEditLogTxid();
}
@Override // ClientProtocol
public EventBatchList getEditsFromTxid(long txid) throws IOException {
- checkOperation(OperationCategory.READ, false);
- return null;
+ return clientProto.getEditsFromTxid(txid);
}
- @Override
+ @Override // ClientProtocol
public DataEncryptionKey getDataEncryptionKey() throws IOException {
- checkOperation(OperationCategory.READ, false);
- return null;
+ return clientProto.getDataEncryptionKey();
}
- @Override
+ @Override // ClientProtocol
public String createSnapshot(String snapshotRoot, String snapshotName)
throws IOException {
- checkOperation(OperationCategory.WRITE);
- return null;
+ return clientProto.createSnapshot(snapshotRoot, snapshotName);
}
- @Override
+ @Override // ClientProtocol
public void deleteSnapshot(String snapshotRoot, String snapshotName)
throws IOException {
- checkOperation(OperationCategory.WRITE, false);
+ clientProto.deleteSnapshot(snapshotRoot, snapshotName);
}
@Override // ClientProtocol
public void setQuota(String path, long namespaceQuota, long storagespaceQuota,
StorageType type) throws IOException {
- this.quotaCall.setQuota(path, namespaceQuota, storagespaceQuota, type);
+ clientProto.setQuota(path, namespaceQuota, storagespaceQuota, type);
}
@Override // ClientProtocol
public QuotaUsage getQuotaUsage(String path) throws IOException {
- return this.quotaCall.getQuotaUsage(path);
+ return clientProto.getQuotaUsage(path);
}
- @Override
+ @Override // ClientProtocol
public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
- checkOperation(OperationCategory.WRITE);
-
- // Block pool id -> blocks
- Map<String, List<LocatedBlock>> blockLocations = new HashMap<>();
- for (LocatedBlock block : blocks) {
- String bpId = block.getBlock().getBlockPoolId();
- List<LocatedBlock> bpBlocks = blockLocations.get(bpId);
- if (bpBlocks == null) {
- bpBlocks = new LinkedList<>();
- blockLocations.put(bpId, bpBlocks);
- }
- bpBlocks.add(block);
- }
-
- // Invoke each block pool
- for (Entry<String, List<LocatedBlock>> entry : blockLocations.entrySet()) {
- String bpId = entry.getKey();
- List<LocatedBlock> bpBlocks = entry.getValue();
-
- LocatedBlock[] bpBlocksArray =
- bpBlocks.toArray(new LocatedBlock[bpBlocks.size()]);
- RemoteMethod method = new RemoteMethod("reportBadBlocks",
- new Class<?>[] {LocatedBlock[].class},
- new Object[] {bpBlocksArray});
- rpcClient.invokeSingleBlockPool(bpId, method);
- }
+ clientProto.reportBadBlocks(blocks);
}
- @Override
+ @Override // ClientProtocol
public void unsetStoragePolicy(String src) throws IOException {
- checkOperation(OperationCategory.WRITE, false);
+ clientProto.unsetStoragePolicy(src);
}
- @Override
+ @Override // ClientProtocol
public BlockStoragePolicy getStoragePolicy(String path) throws IOException {
- checkOperation(OperationCategory.READ, false);
- return null;
+ return clientProto.getStoragePolicy(path);
}
@Override // ClientProtocol
public ErasureCodingPolicyInfo[] getErasureCodingPolicies()
throws IOException {
- return erasureCoding.getErasureCodingPolicies();
+ return clientProto.getErasureCodingPolicies();
}
@Override // ClientProtocol
public Map<String, String> getErasureCodingCodecs() throws IOException {
- return erasureCoding.getErasureCodingCodecs();
+ return clientProto.getErasureCodingCodecs();
}
@Override // ClientProtocol
public AddErasureCodingPolicyResponse[] addErasureCodingPolicies(
ErasureCodingPolicy[] policies) throws IOException {
- return erasureCoding.addErasureCodingPolicies(policies);
+ return clientProto.addErasureCodingPolicies(policies);
}
@Override // ClientProtocol
public void removeErasureCodingPolicy(String ecPolicyName)
throws IOException {
- erasureCoding.removeErasureCodingPolicy(ecPolicyName);
+ clientProto.removeErasureCodingPolicy(ecPolicyName);
}
@Override // ClientProtocol
public void disableErasureCodingPolicy(String ecPolicyName)
throws IOException {
- erasureCoding.disableErasureCodingPolicy(ecPolicyName);
+ clientProto.disableErasureCodingPolicy(ecPolicyName);
}
@Override // ClientProtocol
public void enableErasureCodingPolicy(String ecPolicyName)
throws IOException {
- erasureCoding.enableErasureCodingPolicy(ecPolicyName);
+ clientProto.enableErasureCodingPolicy(ecPolicyName);
}
@Override // ClientProtocol
public ErasureCodingPolicy getErasureCodingPolicy(String src)
throws IOException {
- return erasureCoding.getErasureCodingPolicy(src);
+ return clientProto.getErasureCodingPolicy(src);
}
@Override // ClientProtocol
public void setErasureCodingPolicy(String src, String ecPolicyName)
throws IOException {
- erasureCoding.setErasureCodingPolicy(src, ecPolicyName);
+ clientProto.setErasureCodingPolicy(src, ecPolicyName);
}
@Override // ClientProtocol
public void unsetErasureCodingPolicy(String src) throws IOException {
- erasureCoding.unsetErasureCodingPolicy(src);
+ clientProto.unsetErasureCodingPolicy(src);
}
- @Override
+ @Override // ClientProtocol
public ECBlockGroupStats getECBlockGroupStats() throws IOException {
- return erasureCoding.getECBlockGroupStats();
+ return clientProto.getECBlockGroupStats();
}
- @Override
+ @Override // ClientProtocol
public ReplicatedBlockStats getReplicatedBlockStats() throws IOException {
- checkOperation(OperationCategory.READ, false);
- return null;
+ return clientProto.getReplicatedBlockStats();
}
@Deprecated
- @Override
+ @Override // ClientProtocol
public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId)
throws IOException {
- return listOpenFiles(prevId, EnumSet.of(OpenFilesType.ALL_OPEN_FILES),
- OpenFilesIterator.FILTER_PATH_DEFAULT);
+ return clientProto.listOpenFiles(prevId);
}
- @Override
+ @Override // ClientProtocol
public BatchedEntries<OpenFileEntry> listOpenFiles(long prevId,
EnumSet<OpenFilesType> openFilesTypes, String path) throws IOException {
- checkOperation(OperationCategory.READ, false);
- return null;
+ return clientProto.listOpenFiles(prevId, openFilesTypes, path);
+ }
+
+ @Override // ClientProtocol
+ public void satisfyStoragePolicy(String path) throws IOException {
+ clientProto.satisfyStoragePolicy(path);
}
@Override // NamenodeProtocol
@@ -2167,6 +1266,11 @@ public class RouterRpcServer extends AbstractService
return nnProto.isRollingUpgrade();
}
+ @Override // NamenodeProtocol
+ public Long getNextSPSPath() throws IOException {
+ return nnProto.getNextSPSPath();
+ }
+
/**
* Locate the location with the matching block pool id.
*
@@ -2176,7 +1280,7 @@ public class RouterRpcServer extends AbstractService
* @return Prioritized list of locations in the federated cluster.
* @throws IOException if the location for this path cannot be determined.
*/
- private RemoteLocation getLocationForPath(
+ protected RemoteLocation getLocationForPath(
String path, boolean failIfLocked, String blockPoolId)
throws IOException {
@@ -2276,27 +1380,6 @@ public class RouterRpcServer extends AbstractService
}
/**
- * Check if a path should be in all subclusters.
- *
- * @param path Path to check.
- * @return If a path should be in all subclusters.
- */
- private boolean isPathAll(final String path) {
- if (subclusterResolver instanceof MountTableResolver) {
- try {
- MountTableResolver mountTable = (MountTableResolver)subclusterResolver;
- MountTable entry = mountTable.getMountPoint(path);
- if (entry != null) {
- return entry.isAll();
- }
- } catch (IOException e) {
- LOG.error("Cannot get mount point", e);
- }
- }
- return false;
- }
-
- /**
* Check if a path is in a read only mount point.
*
* @param path Path to check.
@@ -2318,121 +1401,6 @@ public class RouterRpcServer extends AbstractService
}
/**
- * Get the modification dates for mount points.
- *
- * @param path Name of the path to start checking dates from.
- * @return Map with the modification dates for all sub-entries.
- */
- private Map<String, Long> getMountPointDates(String path) {
- Map<String, Long> ret = new TreeMap<>();
- if (subclusterResolver instanceof MountTableResolver) {
- try {
- final List<String> children = subclusterResolver.getMountPoints(path);
- for (String child : children) {
- Long modTime = getModifiedTime(ret, path, child);
- ret.put(child, modTime);
- }
- } catch (IOException e) {
- LOG.error("Cannot get mount point", e);
- }
- }
- return ret;
- }
-
- /**
- * Get modified time for child. If the child is present in mount table it
- * will return the modified time. If the child is not present but subdirs of
- * this child are present then it will return latest modified subdir's time
- * as modified time of the requested child.
- * @param ret contains children and modified times.
- * @param mountTable.
- * @param path Name of the path to start checking dates from.
- * @param child child of the requested path.
- * @return modified time.
- */
- private long getModifiedTime(Map<String, Long> ret, String path,
- String child) {
- MountTableResolver mountTable = (MountTableResolver)subclusterResolver;
- String srcPath;
- if (path.equals(Path.SEPARATOR)) {
- srcPath = Path.SEPARATOR + child;
- } else {
- srcPath = path + Path.SEPARATOR + child;
- }
- Long modTime = 0L;
- try {
- // Get mount table entry for the srcPath
- MountTable entry = mountTable.getMountPoint(srcPath);
- // if srcPath is not in mount table but its subdirs are in mount
- // table we will display latest modified subdir date/time.
- if (entry == null) {
- List<MountTable> entries = mountTable.getMounts(srcPath);
- for (MountTable eachEntry : entries) {
- // Get the latest date
- if (ret.get(child) == null ||
- ret.get(child) < eachEntry.getDateModified()) {
- modTime = eachEntry.getDateModified();
- }
- }
- } else {
- modTime = entry.getDateModified();
- }
- } catch (IOException e) {
- LOG.error("Cannot get mount point", e);
- }
- return modTime;
- }
-
- /**
- * Create a new file status for a mount point.
- *
- * @param name Name of the mount point.
- * @param childrenNum Number of children.
- * @param date Map with the dates.
- * @return New HDFS file status representing a mount point.
- */
- private HdfsFileStatus getMountPointStatus(
- String name, int childrenNum, long date) {
- long modTime = date;
- long accessTime = date;
- FsPermission permission = FsPermission.getDirDefault();
- String owner = this.superUser;
- String group = this.superGroup;
- try {
- // TODO support users, it should be the user for the pointed folder
- UserGroupInformation ugi = getRemoteUser();
- owner = ugi.getUserName();
- group = ugi.getPrimaryGroupName();
- } catch (IOException e) {
- LOG.error("Cannot get the remote user: {}", e.getMessage());
- }
- long inodeId = 0;
- return new HdfsFileStatus.Builder()
- .isdir(true)
- .mtime(modTime)
- .atime(accessTime)
- .perm(permission)
- .owner(owner)
- .group(group)
- .symlink(new byte[0])
- .path(DFSUtil.string2Bytes(name))
- .fileId(inodeId)
- .children(childrenNum)
- .build();
- }
-
- /**
- * Get the name of the method that is calling this function.
- *
- * @return Name of the method calling this function.
- */
- private static String getMethodName() {
- final StackTraceElement[] stack = Thread.currentThread().getStackTrace();
- String methodName = stack[3].getMethodName();
- return methodName;
- }
-
- /**
* Get the user that is invoking this operation.
*
* @return Remote user group information.
@@ -2490,16 +1458,4 @@ public class RouterRpcServer extends AbstractService
public FederationRPCMetrics getRPCMetrics() {
return this.rpcMonitor.getRPCMetrics();
}
-
- @Override
- public void satisfyStoragePolicy(String path) throws IOException {
- checkOperation(OperationCategory.WRITE, false);
- }
-
- @Override
- public Long getNextSPSPath() throws IOException {
- checkOperation(OperationCategory.READ, false);
- // not supported
- return null;
- }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org
[2/2] hadoop git commit: HDFS-13790. RBF: Move ClientProtocol APIs to
its own module. Contributed by Chao Sun.
Posted by br...@apache.org.
HDFS-13790. RBF: Move ClientProtocol APIs to its own module. Contributed by Chao Sun.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6425ed27
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6425ed27
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6425ed27
Branch: refs/heads/trunk
Commit: 6425ed27ea638da75f656204d6df4adad1d91fe1
Parents: 4aacbff
Author: Brahma Reddy Battula <br...@apache.org>
Authored: Mon Aug 20 12:20:29 2018 +0530
Committer: Brahma Reddy Battula <br...@apache.org>
Committed: Mon Aug 20 12:20:29 2018 +0530
----------------------------------------------------------------------
.../federation/router/RouterClientProtocol.java | 1805 ++++++++++++++++++
.../federation/router/RouterRpcServer.java | 1360 ++-----------
2 files changed, 1963 insertions(+), 1202 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/6425ed27/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
new file mode 100644
index 0000000..f45da3c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/router/RouterClientProtocol.java
@@ -0,0 +1,1805 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.federation.router;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.crypto.CryptoProtocolVersion;
+import org.apache.hadoop.fs.BatchedRemoteIterator;
+import org.apache.hadoop.fs.CacheFlag;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.CreateFlag;
+import org.apache.hadoop.fs.FsServerDefaults;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.QuotaUsage;
+import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.fs.XAttr;
+import org.apache.hadoop.fs.XAttrSetFlag;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclStatus;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.AddBlockFlag;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.inotify.EventBatchList;
+import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
+import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveEntry;
+import org.apache.hadoop.hdfs.protocol.CacheDirectiveInfo;
+import org.apache.hadoop.hdfs.protocol.CachePoolEntry;
+import org.apache.hadoop.hdfs.protocol.CachePoolInfo;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.DirectoryListing;
+import org.apache.hadoop.hdfs.protocol.ECBlockGroupStats;
+import org.apache.hadoop.hdfs.protocol.EncryptionZone;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
+import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
+import org.apache.hadoop.hdfs.protocol.LastBlockWithStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.protocol.OpenFileEntry;
+import org.apache.hadoop.hdfs.protocol.OpenFilesIterator;
+import org.apache.hadoop.hdfs.protocol.ReplicatedBlockStats;
+import org.apache.hadoop.hdfs.protocol.RollingUpgradeInfo;
+import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
+import org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing;
+import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
+import org.apache.hadoop.hdfs.protocol.ZoneReencryptionStatus;
+import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.server.federation.resolver.ActiveNamenodeResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.FederationNamespaceInfo;
+import org.apache.hadoop.hdfs.server.federation.resolver.FileSubclusterResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.MountTableResolver;
+import org.apache.hadoop.hdfs.server.federation.resolver.RemoteLocation;
+import org.apache.hadoop.hdfs.server.federation.store.records.MountTable;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
+import org.apache.hadoop.io.EnumSetWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+
+/**
+ * Module that implements all the RPC calls in {@link ClientProtocol} in the
+ * {@link RouterRpcServer}.
+ */
+public class RouterClientProtocol implements ClientProtocol {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(RouterClientProtocol.class.getName());
+
+ private final RouterRpcServer rpcServer;
+ private final RouterRpcClient rpcClient;
+ private final FileSubclusterResolver subclusterResolver;
+ private final ActiveNamenodeResolver namenodeResolver;
+
+ /** Identifier for the super user. */
+ private final String superUser;
+ /** Identifier for the super group. */
+ private final String superGroup;
+ /** Erasure coding calls. */
+ private final ErasureCoding erasureCoding;
+
+ RouterClientProtocol(Configuration conf, RouterRpcServer rpcServer) {
+ this.rpcServer = rpcServer;
+ this.rpcClient = rpcServer.getRPCClient();
+ this.subclusterResolver = rpcServer.getSubclusterResolver();
+ this.namenodeResolver = rpcServer.getNamenodeResolver();
+
+ // User and group for reporting
+ this.superUser = System.getProperty("user.name");
+ this.superGroup = conf.get(
+ DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY,
+ DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT);
+ this.erasureCoding = new ErasureCoding(rpcServer);
+ }
+
+ @Override
+ public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
+ throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.WRITE, false);
+ return null;
+ }
+
+ /**
+ * Get the delegation token from each name service.
+ *
+ * @param renewer
+ * @return Name service -> Token.
+ * @throws IOException
+ */
+ public Map<FederationNamespaceInfo, Token<DelegationTokenIdentifier>>
+ getDelegationTokens(Text renewer) throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.WRITE, false);
+ return null;
+ }
+
+ @Override
+ public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
+ throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.WRITE, false);
+ return 0;
+ }
+
+ @Override
+ public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
+ throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.WRITE, false);
+ }
+
+ @Override
+ public LocatedBlocks getBlockLocations(String src, final long offset,
+ final long length) throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.READ);
+
+ List<RemoteLocation> locations = rpcServer.getLocationsForPath(src, false);
+ RemoteMethod remoteMethod = new RemoteMethod("getBlockLocations",
+ new Class<?>[] {String.class, long.class, long.class},
+ new RemoteParam(), offset, length);
+ return rpcClient.invokeSequential(locations, remoteMethod,
+ LocatedBlocks.class, null);
+ }
+
+ @Override
+ public FsServerDefaults getServerDefaults() throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.READ);
+
+ RemoteMethod method = new RemoteMethod("getServerDefaults");
+ String ns = subclusterResolver.getDefaultNamespace();
+ return (FsServerDefaults) rpcClient.invokeSingle(ns, method);
+ }
+
+ @Override
+ public HdfsFileStatus create(String src, FsPermission masked,
+ String clientName, EnumSetWritable<CreateFlag> flag,
+ boolean createParent, short replication, long blockSize,
+ CryptoProtocolVersion[] supportedVersions, String ecPolicyName)
+ throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
+
+ if (createParent && isPathAll(src)) {
+ int index = src.lastIndexOf(Path.SEPARATOR);
+ String parent = src.substring(0, index);
+ LOG.debug("Creating {} requires creating parent {}", src, parent);
+ FsPermission parentPermissions = getParentPermission(masked);
+ boolean success = mkdirs(parent, parentPermissions, createParent);
+ if (!success) {
+ // This shouldn't happen as mkdirs returns true or exception
+ LOG.error("Couldn't create parents for {}", src);
+ }
+ }
+
+ RemoteLocation createLocation = rpcServer.getCreateLocation(src);
+ RemoteMethod method = new RemoteMethod("create",
+ new Class<?>[] {String.class, FsPermission.class, String.class,
+ EnumSetWritable.class, boolean.class, short.class,
+ long.class, CryptoProtocolVersion[].class,
+ String.class},
+ createLocation.getDest(), masked, clientName, flag, createParent,
+ replication, blockSize, supportedVersions, ecPolicyName);
+ return (HdfsFileStatus) rpcClient.invokeSingle(createLocation, method);
+ }
+
+ @Override
+ public LastBlockWithStatus append(String src, final String clientName,
+ final EnumSetWritable<CreateFlag> flag) throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
+
+ List<RemoteLocation> locations = rpcServer.getLocationsForPath(src, true);
+ RemoteMethod method = new RemoteMethod("append",
+ new Class<?>[] {String.class, String.class, EnumSetWritable.class},
+ new RemoteParam(), clientName, flag);
+ return rpcClient.invokeSequential(
+ locations, method, LastBlockWithStatus.class, null);
+ }
+
+ @Override
+ public boolean recoverLease(String src, String clientName)
+ throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
+
+ final List<RemoteLocation> locations =
+ rpcServer.getLocationsForPath(src, true);
+ RemoteMethod method = new RemoteMethod("recoverLease",
+ new Class<?>[] {String.class, String.class}, new RemoteParam(),
+ clientName);
+ Object result = rpcClient.invokeSequential(
+ locations, method, Boolean.class, Boolean.TRUE);
+ return (boolean) result;
+ }
+
+ @Override
+ public boolean setReplication(String src, short replication)
+ throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
+
+ List<RemoteLocation> locations = rpcServer.getLocationsForPath(src, true);
+ RemoteMethod method = new RemoteMethod("setReplication",
+ new Class<?>[] {String.class, short.class}, new RemoteParam(),
+ replication);
+ Object result = rpcClient.invokeSequential(
+ locations, method, Boolean.class, Boolean.TRUE);
+ return (boolean) result;
+ }
+
+ @Override
+ public void setStoragePolicy(String src, String policyName)
+ throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
+
+ List<RemoteLocation> locations = rpcServer.getLocationsForPath(src, true);
+ RemoteMethod method = new RemoteMethod("setStoragePolicy",
+ new Class<?>[] {String.class, String.class},
+ new RemoteParam(), policyName);
+ rpcClient.invokeSequential(locations, method, null, null);
+ }
+
+ @Override
+ public BlockStoragePolicy[] getStoragePolicies() throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.READ);
+
+ RemoteMethod method = new RemoteMethod("getStoragePolicies");
+ String ns = subclusterResolver.getDefaultNamespace();
+ return (BlockStoragePolicy[]) rpcClient.invokeSingle(ns, method);
+ }
+
+ @Override
+ public void setPermission(String src, FsPermission permissions)
+ throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
+
+ final List<RemoteLocation> locations =
+ rpcServer.getLocationsForPath(src, true);
+ RemoteMethod method = new RemoteMethod("setPermission",
+ new Class<?>[] {String.class, FsPermission.class},
+ new RemoteParam(), permissions);
+ if (isPathAll(src)) {
+ rpcClient.invokeConcurrent(locations, method);
+ } else {
+ rpcClient.invokeSequential(locations, method);
+ }
+ }
+
+ @Override
+ public void setOwner(String src, String username, String groupname)
+ throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
+
+ final List<RemoteLocation> locations =
+ rpcServer.getLocationsForPath(src, true);
+ RemoteMethod method = new RemoteMethod("setOwner",
+ new Class<?>[] {String.class, String.class, String.class},
+ new RemoteParam(), username, groupname);
+ if (isPathAll(src)) {
+ rpcClient.invokeConcurrent(locations, method);
+ } else {
+ rpcClient.invokeSequential(locations, method);
+ }
+ }
+
+ /**
+ * Excluded and favored nodes are not verified and will be ignored by
+ * placement policy if they are not in the same nameservice as the file.
+ */
+ @Override
+ public LocatedBlock addBlock(String src, String clientName,
+ ExtendedBlock previous, DatanodeInfo[] excludedNodes, long fileId,
+ String[] favoredNodes, EnumSet<AddBlockFlag> addBlockFlags)
+ throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
+
+ final List<RemoteLocation> locations =
+ rpcServer.getLocationsForPath(src, true);
+ RemoteMethod method = new RemoteMethod("addBlock",
+ new Class<?>[] {String.class, String.class, ExtendedBlock.class,
+ DatanodeInfo[].class, long.class, String[].class,
+ EnumSet.class},
+ new RemoteParam(), clientName, previous, excludedNodes, fileId,
+ favoredNodes, addBlockFlags);
+ // TODO verify the excludedNodes and favoredNodes are acceptable to this NN
+ return rpcClient.invokeSequential(
+ locations, method, LocatedBlock.class, null);
+ }
+
+ /**
+ * Excluded nodes are not verified and will be ignored by placement if they
+ * are not in the same nameservice as the file.
+ */
+ @Override
+ public LocatedBlock getAdditionalDatanode(final String src, final long fileId,
+ final ExtendedBlock blk, final DatanodeInfo[] existings,
+ final String[] existingStorageIDs, final DatanodeInfo[] excludes,
+ final int numAdditionalNodes, final String clientName)
+ throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.READ);
+
+ final List<RemoteLocation> locations =
+ rpcServer.getLocationsForPath(src, false);
+ RemoteMethod method = new RemoteMethod("getAdditionalDatanode",
+ new Class<?>[] {String.class, long.class, ExtendedBlock.class,
+ DatanodeInfo[].class, String[].class,
+ DatanodeInfo[].class, int.class, String.class},
+ new RemoteParam(), fileId, blk, existings, existingStorageIDs, excludes,
+ numAdditionalNodes, clientName);
+ return rpcClient.invokeSequential(
+ locations, method, LocatedBlock.class, null);
+ }
+
+ @Override
+ public void abandonBlock(ExtendedBlock b, long fileId, String src,
+ String holder) throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
+
+ RemoteMethod method = new RemoteMethod("abandonBlock",
+ new Class<?>[] {ExtendedBlock.class, long.class, String.class,
+ String.class},
+ b, fileId, new RemoteParam(), holder);
+ rpcClient.invokeSingle(b, method);
+ }
+
+ @Override
+ public boolean complete(String src, String clientName, ExtendedBlock last,
+ long fileId) throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
+
+ final List<RemoteLocation> locations =
+ rpcServer.getLocationsForPath(src, true);
+ RemoteMethod method = new RemoteMethod("complete",
+ new Class<?>[] {String.class, String.class, ExtendedBlock.class,
+ long.class},
+ new RemoteParam(), clientName, last, fileId);
+ // Complete can return true/false, so don't expect a result
+ return rpcClient.invokeSequential(locations, method, Boolean.class, null);
+ }
+
+ @Override
+ public LocatedBlock updateBlockForPipeline(
+ ExtendedBlock block, String clientName) throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
+
+ RemoteMethod method = new RemoteMethod("updateBlockForPipeline",
+ new Class<?>[] {ExtendedBlock.class, String.class},
+ block, clientName);
+ return (LocatedBlock) rpcClient.invokeSingle(block, method);
+ }
+
+ /**
+ * Datanodes are not verified to be in the same nameservice as the old block.
+ * TODO This may require validation.
+ */
+ @Override
+ public void updatePipeline(String clientName, ExtendedBlock oldBlock,
+ ExtendedBlock newBlock, DatanodeID[] newNodes, String[] newStorageIDs)
+ throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
+
+ RemoteMethod method = new RemoteMethod("updatePipeline",
+ new Class<?>[] {String.class, ExtendedBlock.class, ExtendedBlock.class,
+ DatanodeID[].class, String[].class},
+ clientName, oldBlock, newBlock, newNodes, newStorageIDs);
+ rpcClient.invokeSingle(oldBlock, method);
+ }
+
+ @Override
+ public long getPreferredBlockSize(String src) throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.READ);
+
+ final List<RemoteLocation> locations =
+ rpcServer.getLocationsForPath(src, true);
+ RemoteMethod method = new RemoteMethod("getPreferredBlockSize",
+ new Class<?>[] {String.class}, new RemoteParam());
+ return rpcClient.invokeSequential(locations, method, Long.class, null);
+ }
+
+ @Deprecated
+ @Override
+ public boolean rename(final String src, final String dst)
+ throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
+
+ final List<RemoteLocation> srcLocations =
+ rpcServer.getLocationsForPath(src, true, false);
+ // srcLocations may be trimmed by getRenameDestinations()
+ final List<RemoteLocation> locs = new LinkedList<>(srcLocations);
+ RemoteParam dstParam = getRenameDestinations(locs, dst);
+ if (locs.isEmpty()) {
+ throw new IOException(
+ "Rename of " + src + " to " + dst + " is not allowed," +
+ " no eligible destination in the same namespace was found.");
+ }
+ RemoteMethod method = new RemoteMethod("rename",
+ new Class<?>[] {String.class, String.class},
+ new RemoteParam(), dstParam);
+ return rpcClient.invokeSequential(locs, method, Boolean.class,
+ Boolean.TRUE);
+ }
+
+ @Override
+ public void rename2(final String src, final String dst,
+ final Options.Rename... options) throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
+
+ final List<RemoteLocation> srcLocations =
+ rpcServer.getLocationsForPath(src, true, false);
+ // srcLocations may be trimmed by getRenameDestinations()
+ final List<RemoteLocation> locs = new LinkedList<>(srcLocations);
+ RemoteParam dstParam = getRenameDestinations(locs, dst);
+ if (locs.isEmpty()) {
+ throw new IOException(
+ "Rename of " + src + " to " + dst + " is not allowed," +
+ " no eligible destination in the same namespace was found.");
+ }
+ RemoteMethod method = new RemoteMethod("rename2",
+ new Class<?>[] {String.class, String.class, options.getClass()},
+ new RemoteParam(), dstParam, options);
+ rpcClient.invokeSequential(locs, method, null, null);
+ }
+
+ @Override
+ public void concat(String trg, String[] src) throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
+
+ // See if the src and target files are all in the same namespace
+ LocatedBlocks targetBlocks = getBlockLocations(trg, 0, 1);
+ if (targetBlocks == null) {
+ throw new IOException("Cannot locate blocks for target file - " + trg);
+ }
+ LocatedBlock lastLocatedBlock = targetBlocks.getLastLocatedBlock();
+ String targetBlockPoolId = lastLocatedBlock.getBlock().getBlockPoolId();
+ for (String source : src) {
+ LocatedBlocks sourceBlocks = getBlockLocations(source, 0, 1);
+ if (sourceBlocks == null) {
+ throw new IOException(
+ "Cannot located blocks for source file " + source);
+ }
+ String sourceBlockPoolId =
+ sourceBlocks.getLastLocatedBlock().getBlock().getBlockPoolId();
+ if (!sourceBlockPoolId.equals(targetBlockPoolId)) {
+ throw new IOException("Cannot concatenate source file " + source
+ + " because it is located in a different namespace"
+ + " with block pool id " + sourceBlockPoolId
+ + " from the target file with block pool id "
+ + targetBlockPoolId);
+ }
+ }
+
+ // Find locations in the matching namespace.
+ final RemoteLocation targetDestination =
+ rpcServer.getLocationForPath(trg, true, targetBlockPoolId);
+ String[] sourceDestinations = new String[src.length];
+ for (int i = 0; i < src.length; i++) {
+ String sourceFile = src[i];
+ RemoteLocation location =
+ rpcServer.getLocationForPath(sourceFile, true, targetBlockPoolId);
+ sourceDestinations[i] = location.getDest();
+ }
+ // Invoke
+ RemoteMethod method = new RemoteMethod("concat",
+ new Class<?>[] {String.class, String[].class},
+ targetDestination.getDest(), sourceDestinations);
+ rpcClient.invokeSingle(targetDestination, method);
+ }
+
+ @Override
+ public boolean truncate(String src, long newLength, String clientName)
+ throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
+
+ final List<RemoteLocation> locations =
+ rpcServer.getLocationsForPath(src, true);
+ RemoteMethod method = new RemoteMethod("truncate",
+ new Class<?>[] {String.class, long.class, String.class},
+ new RemoteParam(), newLength, clientName);
+ return rpcClient.invokeSequential(locations, method, Boolean.class,
+ Boolean.TRUE);
+ }
+
+ @Override
+ public boolean delete(String src, boolean recursive) throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
+
+ final List<RemoteLocation> locations =
+ rpcServer.getLocationsForPath(src, true, false);
+ RemoteMethod method = new RemoteMethod("delete",
+ new Class<?>[] {String.class, boolean.class}, new RemoteParam(),
+ recursive);
+ if (isPathAll(src)) {
+ return rpcClient.invokeAll(locations, method);
+ } else {
+ return rpcClient.invokeSequential(locations, method,
+ Boolean.class, Boolean.TRUE);
+ }
+ }
+
+ @Override
+ public boolean mkdirs(String src, FsPermission masked, boolean createParent)
+ throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
+
+ final List<RemoteLocation> locations =
+ rpcServer.getLocationsForPath(src, true);
+ RemoteMethod method = new RemoteMethod("mkdirs",
+ new Class<?>[] {String.class, FsPermission.class, boolean.class},
+ new RemoteParam(), masked, createParent);
+
+ // Create in all locations
+ if (isPathAll(src)) {
+ return rpcClient.invokeAll(locations, method);
+ }
+
+ if (locations.size() > 1) {
+ // Check if this directory already exists
+ try {
+ HdfsFileStatus fileStatus = getFileInfo(src);
+ if (fileStatus != null) {
+ // When existing, the NN doesn't return an exception; return true
+ return true;
+ }
+ } catch (IOException ioe) {
+ // Can't query if this file exists or not.
+ LOG.error("Error requesting file info for path {} while proxing mkdirs",
+ src, ioe);
+ }
+ }
+
+ RemoteLocation firstLocation = locations.get(0);
+ return (boolean) rpcClient.invokeSingle(firstLocation, method);
+ }
+
+ @Override
+ public void renewLease(String clientName) throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
+
+ RemoteMethod method = new RemoteMethod("renewLease",
+ new Class<?>[] {String.class}, clientName);
+ Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+ rpcClient.invokeConcurrent(nss, method, false, false);
+ }
+
+ /**
+  * Lists a directory by merging the listings returned by every location of
+  * {@code src} with the mount points found directly under it.
+  *
+  * Entries are merged into a map sorted by local name. When any location
+  * reports remaining entries, names greater than the smallest "last name"
+  * across the partial listings are dropped and counted as remaining.
+  *
+  * @param src Directory to list.
+  * @param startAfter Name after which the listing starts; passed through to
+  *                   the remote getListing call.
+  * @param needLocation Passed through to the remote getListing call.
+  * @return The combined listing, or null when no location returned a listing
+  *         and there are no mount points under {@code src}.
+  * @throws IOException Propagated from the operation check, the path
+  *                     resolution or the remote invocation.
+  */
+ @Override
+ public DirectoryListing getListing(String src, byte[] startAfter,
+     boolean needLocation) throws IOException {
+   rpcServer.checkOperation(NameNode.OperationCategory.READ);
+
+   // Locate the dir and fetch the listing
+   final List<RemoteLocation> dirLocations =
+       rpcServer.getLocationsForPath(src, true);
+   Class<?>[] signature =
+       new Class<?>[] {String.class, startAfter.getClass(), boolean.class};
+   RemoteMethod listingCall = new RemoteMethod("getListing", signature,
+       new RemoteParam(), startAfter, needLocation);
+   Map<RemoteLocation, DirectoryListing> perLocation =
+       rpcClient.invokeConcurrent(
+           dirLocations, listingCall, false, false, DirectoryListing.class);
+
+   // Sorted by local name
+   Map<String, HdfsFileStatus> merged = new TreeMap<>();
+   int reportedRemaining = 0;
+   int remaining = 0;
+   boolean gotListing = false;
+   if (perLocation != null) {
+     // Find the smallest "last entry" name across the partial listings
+     String cutoff = null;
+     for (Map.Entry<RemoteLocation, DirectoryListing> result :
+         perLocation.entrySet()) {
+       DirectoryListing partial = result.getValue();
+       if (partial == null) {
+         LOG.debug("Cannot get listing from {}", result.getKey());
+         continue;
+       }
+       reportedRemaining += partial.getRemainingEntries();
+       HdfsFileStatus[] entries = partial.getPartialListing();
+       if (entries.length > 0) {
+         String tail = entries[entries.length - 1].getLocalName();
+         if (cutoff == null || cutoff.compareTo(tail) > 0) {
+           cutoff = tail;
+         }
+       }
+     }
+
+     // Keep entries up to the cutoff; count the others as still remaining
+     for (DirectoryListing partial : perLocation.values()) {
+       if (partial == null) {
+         continue;
+       }
+       gotListing = true;
+       for (HdfsFileStatus status : partial.getPartialListing()) {
+         String name = status.getLocalName();
+         boolean beyondCutoff =
+             reportedRemaining > 0 && name.compareTo(cutoff) > 0;
+         if (beyondCutoff) {
+           // Discarding entries further than the cutoff
+           remaining++;
+         } else {
+           merged.put(name, status);
+         }
+       }
+       remaining += partial.getRemainingEntries();
+     }
+   }
+
+   // Add mount points at this level in the tree
+   final List<String> mounts = subclusterResolver.getMountPoints(src);
+   if (mounts != null) {
+     // Get the dates for each mount point
+     Map<String, Long> mountDates = getMountPointDates(src);
+
+     // Create virtual folder with the mount name
+     for (String mount : mounts) {
+       long modified = 0;
+       if (mountDates != null && mountDates.containsKey(mount)) {
+         modified = mountDates.get(mount);
+       }
+       // TODO add number of children
+       // This may overwrite existing listing entries with the mount point
+       // TODO don't add if already there?
+       merged.put(mount, getMountPointStatus(mount, 0, modified));
+     }
+   }
+
+   // NN returns a null object if the directory cannot be found and has no
+   // listing. If we didn't retrieve any NN listing data, and there are no
+   // mount points here, return null.
+   if (!gotListing && merged.isEmpty()) {
+     return null;
+   }
+
+   // Generate combined listing
+   HdfsFileStatus[] combined =
+       merged.values().toArray(new HdfsFileStatus[merged.size()]);
+   return new DirectoryListing(combined, remaining);
+ }
+
+ /**
+  * Gets the status of {@code src}, falling back to a synthetic mount point
+  * status when no location returns one.
+  *
+  * @param src Path to look up.
+  * @return The remote file status; otherwise a mount point status when mount
+  *         points exist under {@code src}; otherwise null.
+  * @throws IOException Propagated from the operation check, the path
+  *                     resolution or the remote invocation.
+  */
+ @Override
+ public HdfsFileStatus getFileInfo(String src) throws IOException {
+   rpcServer.checkOperation(NameNode.OperationCategory.READ);
+
+   final List<RemoteLocation> candidates =
+       rpcServer.getLocationsForPath(src, false);
+   Class<?>[] signature = new Class<?>[] {String.class};
+   RemoteMethod infoCall =
+       new RemoteMethod("getFileInfo", signature, new RemoteParam());
+
+   HdfsFileStatus status;
+   if (isPathAll(src)) {
+     // If it's a directory, we check in all locations
+     status = getFileInfoAll(candidates, infoCall);
+   } else {
+     // Check for file information sequentially
+     status = rpcClient.invokeSequential(
+         candidates, infoCall, HdfsFileStatus.class, null);
+   }
+   if (status != null) {
+     return status;
+   }
+
+   // If there is no real path, check mount points
+   List<String> mounts = subclusterResolver.getMountPoints(src);
+   if (mounts == null || mounts.isEmpty()) {
+     return null;
+   }
+   Map<String, Long> mountDates = getMountPointDates(src);
+   long modified = 0;
+   if (mountDates != null && mountDates.containsKey(src)) {
+     modified = mountDates.get(src);
+   }
+   return getMountPointStatus(src, mounts.size(), modified);
+ }
+
+ /**
+  * Forwards isFileClosed for {@code src} to its locations through
+  * rpcClient.invokeSequential.
+  *
+  * @param src Path of the file.
+  * @return The Boolean produced by the sequential invocation.
+  * @throws IOException Propagated from the operation check, the path
+  *                     resolution or the remote invocation.
+  */
+ @Override
+ public boolean isFileClosed(String src) throws IOException {
+   rpcServer.checkOperation(NameNode.OperationCategory.READ);
+
+   final List<RemoteLocation> targets =
+       rpcServer.getLocationsForPath(src, false);
+   Class<?>[] signature = new Class<?>[] {String.class};
+   RemoteMethod remoteCall =
+       new RemoteMethod("isFileClosed", signature, new RemoteParam());
+   return rpcClient.invokeSequential(
+       targets, remoteCall, Boolean.class, Boolean.TRUE);
+ }
+
+ /**
+  * Forwards getFileLinkInfo for {@code src} to its locations through
+  * rpcClient.invokeSequential.
+  *
+  * @param src Path to look up.
+  * @return The status produced by the sequential invocation (may be null).
+  * @throws IOException Propagated from the operation check, the path
+  *                     resolution or the remote invocation.
+  */
+ @Override
+ public HdfsFileStatus getFileLinkInfo(String src) throws IOException {
+   rpcServer.checkOperation(NameNode.OperationCategory.READ);
+
+   final List<RemoteLocation> targets =
+       rpcServer.getLocationsForPath(src, false);
+   Class<?>[] signature = new Class<?>[] {String.class};
+   RemoteMethod remoteCall =
+       new RemoteMethod("getFileLinkInfo", signature, new RemoteParam());
+   HdfsFileStatus linkStatus = rpcClient.invokeSequential(
+       targets, remoteCall, HdfsFileStatus.class, null);
+   return linkStatus;
+ }
+
+ /**
+  * Forwards getLocatedFileInfo for {@code src} to its locations through
+  * rpcClient.invokeSequential and casts the result to
+  * HdfsLocatedFileStatus.
+  *
+  * @param src Path to look up.
+  * @param needBlockToken Passed through to the remote call.
+  * @return The located status produced by the sequential invocation.
+  * @throws IOException Propagated from the operation check, the path
+  *                     resolution or the remote invocation.
+  */
+ @Override
+ public HdfsLocatedFileStatus getLocatedFileInfo(String src,
+     boolean needBlockToken) throws IOException {
+   rpcServer.checkOperation(NameNode.OperationCategory.READ);
+   final List<RemoteLocation> targets =
+       rpcServer.getLocationsForPath(src, false);
+   Class<?>[] signature = new Class<?>[] {String.class, boolean.class};
+   RemoteMethod remoteCall = new RemoteMethod("getLocatedFileInfo",
+       signature, new RemoteParam(), needBlockToken);
+   // The invocation is typed as HdfsFileStatus; narrow it for the caller
+   HdfsFileStatus status = rpcClient.invokeSequential(
+       targets, remoteCall, HdfsFileStatus.class, null);
+   return (HdfsLocatedFileStatus) status;
+ }
+
+ /**
+  * Sums the getStats arrays returned by every namespace.
+  *
+  * Only non-negative values are added, position by position, up to
+  * STATS_ARRAY_LENGTH entries.
+  *
+  * @return Array of STATS_ARRAY_LENGTH aggregated values.
+  * @throws IOException Propagated from the operation check or the remote
+  *                     invocation.
+  */
+ @Override
+ public long[] getStats() throws IOException {
+   rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED);
+
+   RemoteMethod statsCall = new RemoteMethod("getStats");
+   Set<FederationNamespaceInfo> namespaces = namenodeResolver.getNamespaces();
+   Map<FederationNamespaceInfo, long[]> perNamespace =
+       rpcClient.invokeConcurrent(
+           namespaces, statsCall, true, false, long[].class);
+
+   long[] totals = new long[STATS_ARRAY_LENGTH];
+   for (long[] nsStats : perNamespace.values()) {
+     // Never read past the shorter of the two arrays
+     int limit = Math.min(totals.length, nsStats.length);
+     for (int i = 0; i < limit; i++) {
+       long value = nsStats[i];
+       if (value >= 0) {
+         totals[i] += value;
+       }
+     }
+   }
+   return totals;
+ }
+
+ /**
+  * Returns the datanode report for the requested type.
+  * Runs the UNCHECKED operation check and then delegates to
+  * rpcServer.getDatanodeReport(type, true, 0).
+  * NOTE(review): the meaning of the trailing (true, 0) arguments is not
+  * visible here - confirm against RouterRpcServer#getDatanodeReport.
+  *
+  * @param type Type of datanodes to report.
+  * @return Whatever the delegate returns.
+  */
+ @Override
+ public DatanodeInfo[] getDatanodeReport(HdfsConstants.DatanodeReportType type)
+ throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED);
+ return rpcServer.getDatanodeReport(type, true, 0);
+ }
+
+ /**
+  * Builds a single storage report array from the per-subcluster reports,
+  * keeping one report per datanode transfer address.
+  *
+  * When the same address shows up more than once, the first report seen is
+  * kept and later ones are ignored.
+  *
+  * @param type Type of datanodes to report.
+  * @return Reports in first-seen order, without repeated addresses.
+  * @throws IOException Propagated from the operation check or from
+  *                     rpcServer.getDatanodeStorageReportMap.
+  */
+ @Override
+ public DatanodeStorageReport[] getDatanodeStorageReport(
+     HdfsConstants.DatanodeReportType type) throws IOException {
+   rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED);
+
+   Map<String, DatanodeStorageReport[]> reportsBySubcluster =
+       rpcServer.getDatanodeStorageReportMap(type);
+
+   // Avoid repeating machines in multiple subclusters
+   Map<String, DatanodeStorageReport> unique = new LinkedHashMap<>();
+   for (DatanodeStorageReport[] reports : reportsBySubcluster.values()) {
+     for (DatanodeStorageReport report : reports) {
+       String xferAddr = report.getDatanodeInfo().getXferAddr();
+       if (unique.containsKey(xferAddr)) {
+         // TODO merge somehow, right now it just takes the first one
+         continue;
+       }
+       unique.put(xferAddr, report);
+     }
+   }
+
+   return unique.values().toArray(new DatanodeStorageReport[unique.size()]);
+ }
+
+ /**
+  * Invokes setSafeMode on every namespace and reports whether all of them
+  * answered true.
+  *
+  * @param action Safe mode action to apply.
+  * @param isChecked Passed to the remote call; its negation is also passed
+  *                  as the last boolean flag of invokeConcurrent.
+  * @return true only if every namespace returned true (also true when there
+  *         are no results).
+  * @throws IOException Propagated from the operation check or the remote
+  *                     invocation.
+  */
+ @Override
+ public boolean setSafeMode(HdfsConstants.SafeModeAction action,
+     boolean isChecked) throws IOException {
+   rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
+
+   // Set safe mode in all the name spaces
+   Class<?>[] signature =
+       new Class<?>[] {HdfsConstants.SafeModeAction.class, boolean.class};
+   RemoteMethod safeModeCall =
+       new RemoteMethod("setSafeMode", signature, action, isChecked);
+   Set<FederationNamespaceInfo> namespaces = namenodeResolver.getNamespaces();
+   Map<FederationNamespaceInfo, Boolean> answers =
+       rpcClient.invokeConcurrent(
+           namespaces, safeModeCall, true, !isChecked, Boolean.class);
+
+   // We only report true if all the name space are in safe mode
+   boolean allInSafeMode = true;
+   for (boolean inSafeMode : answers.values()) {
+     allInSafeMode &= inSafeMode;
+   }
+   return allInSafeMode;
+ }
+
+ /**
+  * Invokes restoreFailedStorage on every namespace.
+  *
+  * @param arg Argument passed through to each namespace.
+  * @return false as soon as one namespace result is false, true otherwise.
+  * @throws IOException Propagated from the operation check or the remote
+  *                     invocation.
+  */
+ @Override
+ public boolean restoreFailedStorage(String arg) throws IOException {
+   rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED);
+
+   Class<?>[] signature = new Class<?>[] {String.class};
+   RemoteMethod restoreCall =
+       new RemoteMethod("restoreFailedStorage", signature, arg);
+   final Set<FederationNamespaceInfo> namespaces =
+       namenodeResolver.getNamespaces();
+   Map<FederationNamespaceInfo, Boolean> answers = rpcClient.invokeConcurrent(
+       namespaces, restoreCall, true, false, Boolean.class);
+
+   for (boolean restored : answers.values()) {
+     if (!restored) {
+       return false;
+     }
+   }
+   return true;
+ }
+
+ /**
+  * Invokes saveNamespace on every namespace.
+  *
+  * @param timeWindow Passed through to each namespace.
+  * @param txGap Passed through to each namespace.
+  * @return false as soon as one namespace result is false, true otherwise.
+  * @throws IOException Propagated from the operation check or the remote
+  *                     invocation.
+  */
+ @Override
+ public boolean saveNamespace(long timeWindow, long txGap) throws IOException {
+   rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED);
+
+   // The protocol method takes primitive longs (see the signature above), so
+   // the declared parameter types must be long.class, as fsync and setTimes
+   // do. Reflection treats Long.class as a different type, so it would not
+   // match a (long, long) method.
+   RemoteMethod method = new RemoteMethod("saveNamespace",
+       new Class<?>[] {long.class, long.class}, timeWindow, txGap);
+   final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+   Map<FederationNamespaceInfo, Boolean> ret =
+       rpcClient.invokeConcurrent(nss, method, true, false, boolean.class);
+
+   for (boolean saved : ret.values()) {
+     if (!saved) {
+       return false;
+     }
+   }
+   return true;
+ }
+
+ /**
+  * Invokes rollEdits on every namespace.
+  *
+  * @return The largest transaction id returned, or 0 when no result is
+  *         greater than 0.
+  * @throws IOException Propagated from the operation check or the remote
+  *                     invocation.
+  */
+ @Override
+ public long rollEdits() throws IOException {
+   rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
+
+   RemoteMethod rollCall = new RemoteMethod("rollEdits", new Class<?>[] {});
+   final Set<FederationNamespaceInfo> namespaces =
+       namenodeResolver.getNamespaces();
+   Map<FederationNamespaceInfo, Long> txidByNamespace =
+       rpcClient.invokeConcurrent(namespaces, rollCall, true, false, long.class);
+
+   // Return the maximum txid
+   long maxTxid = 0;
+   for (long nsTxid : txidByNamespace.values()) {
+     maxTxid = Math.max(maxTxid, nsTxid);
+   }
+   return maxTxid;
+ }
+
+ /**
+  * Invokes refreshNodes on every namespace through
+  * rpcClient.invokeConcurrent.
+  *
+  * @throws IOException Propagated from the operation check or the remote
+  *                     invocation.
+  */
+ @Override
+ public void refreshNodes() throws IOException {
+   rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED);
+
+   Class<?>[] noParams = new Class<?>[] {};
+   RemoteMethod refreshCall = new RemoteMethod("refreshNodes", noParams);
+   final Set<FederationNamespaceInfo> namespaces =
+       namenodeResolver.getNamespaces();
+   rpcClient.invokeConcurrent(namespaces, refreshCall, true, true);
+ }
+
+ /**
+  * Invokes finalizeUpgrade on every namespace through
+  * rpcClient.invokeConcurrent.
+  *
+  * @throws IOException Propagated from the operation check or the remote
+  *                     invocation.
+  */
+ @Override
+ public void finalizeUpgrade() throws IOException {
+   rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED);
+
+   Class<?>[] noParams = new Class<?>[] {};
+   RemoteMethod finalizeCall = new RemoteMethod("finalizeUpgrade", noParams);
+   final Set<FederationNamespaceInfo> namespaces =
+       namenodeResolver.getNamespaces();
+   rpcClient.invokeConcurrent(namespaces, finalizeCall, true, false);
+ }
+
+ /**
+  * Always rejects the call.
+  *
+  * @return Never returns normally.
+  * @throws UnsupportedOperationException Always, with a message naming the
+  *         operation as reported by RouterRpcServer.getMethodName().
+  */
+ @Override
+ public boolean upgradeStatus() throws IOException {
+   // Resolve the name directly from this method, as the original did
+   String unsupported =
+       "Operation \"" + RouterRpcServer.getMethodName() + "\" is not supported";
+   throw new UnsupportedOperationException(unsupported);
+ }
+
+ /**
+  * Invokes rollingUpgrade on every namespace.
+  *
+  * @param action Rolling upgrade action passed to each namespace.
+  * @return The first non-null info in the iteration order of the results, or
+  *         null if every result is null.
+  * @throws IOException Propagated from the operation check or the remote
+  *                     invocation.
+  */
+ @Override
+ public RollingUpgradeInfo rollingUpgrade(HdfsConstants.RollingUpgradeAction action)
+     throws IOException {
+   rpcServer.checkOperation(NameNode.OperationCategory.READ);
+
+   Class<?>[] signature =
+       new Class<?>[] {HdfsConstants.RollingUpgradeAction.class};
+   RemoteMethod upgradeCall =
+       new RemoteMethod("rollingUpgrade", signature, action);
+   final Set<FederationNamespaceInfo> namespaces =
+       namenodeResolver.getNamespaces();
+   Map<FederationNamespaceInfo, RollingUpgradeInfo> infoByNamespace =
+       rpcClient.invokeConcurrent(
+           namespaces, upgradeCall, true, false, RollingUpgradeInfo.class);
+
+   // Return the first rolling upgrade info
+   for (RollingUpgradeInfo nsInfo : infoByNamespace.values()) {
+     if (nsInfo != null) {
+       return nsInfo;
+     }
+   }
+   return null;
+ }
+
+ /**
+  * Invokes metaSave on every namespace through rpcClient.invokeConcurrent.
+  *
+  * @param filename Passed through to each namespace.
+  * @throws IOException Propagated from the operation check or the remote
+  *                     invocation.
+  */
+ @Override
+ public void metaSave(String filename) throws IOException {
+   rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED);
+
+   Class<?>[] signature = new Class<?>[] {String.class};
+   RemoteMethod metaSaveCall = new RemoteMethod("metaSave", signature, filename);
+   final Set<FederationNamespaceInfo> namespaces =
+       namenodeResolver.getNamespaces();
+   rpcClient.invokeConcurrent(namespaces, metaSaveCall, true, false);
+ }
+
+ /**
+  * Forwards listCorruptFileBlocks for {@code path} to its locations through
+  * rpcClient.invokeSequential.
+  *
+  * @param path Path to inspect.
+  * @param cookie Passed through to the remote call.
+  * @return The result of the sequential invocation (may be null).
+  * @throws IOException Propagated from the operation check, the path
+  *                     resolution or the remote invocation.
+  */
+ @Override
+ public CorruptFileBlocks listCorruptFileBlocks(String path, String cookie)
+     throws IOException {
+   rpcServer.checkOperation(NameNode.OperationCategory.READ);
+
+   final List<RemoteLocation> targets =
+       rpcServer.getLocationsForPath(path, false);
+   Class<?>[] signature = new Class<?>[] {String.class, String.class};
+   RemoteMethod remoteCall = new RemoteMethod("listCorruptFileBlocks",
+       signature, new RemoteParam(), cookie);
+   CorruptFileBlocks corrupt = rpcClient.invokeSequential(
+       targets, remoteCall, CorruptFileBlocks.class, null);
+   return corrupt;
+ }
+
+ /**
+  * Invokes setBalancerBandwidth on every namespace through
+  * rpcClient.invokeConcurrent.
+  *
+  * @param bandwidth Passed through to each namespace.
+  * @throws IOException Propagated from the operation check or the remote
+  *                     invocation.
+  */
+ @Override
+ public void setBalancerBandwidth(long bandwidth) throws IOException {
+   rpcServer.checkOperation(NameNode.OperationCategory.UNCHECKED);
+
+   // The protocol method takes a primitive long (see the signature above), so
+   // the declared parameter type must be long.class, as fsync and setTimes
+   // do. Reflection treats Long.class as a different type, so it would not
+   // match a (long) method.
+   RemoteMethod method = new RemoteMethod("setBalancerBandwidth",
+       new Class<?>[] {long.class}, bandwidth);
+   final Set<FederationNamespaceInfo> nss = namenodeResolver.getNamespaces();
+   rpcClient.invokeConcurrent(nss, method, true, false);
+ }
+
+ /**
+  * Computes the content summary of {@code path} by combining the summaries
+  * of its locations with those of the mount points directly under it.
+  *
+  * Mount points are summarized by calling this method recursively; a failure
+  * on one mount point is logged and that mount point is skipped.
+  *
+  * @param path Path to summarize.
+  * @return Aggregated summary produced by aggregateContentSummary.
+  * @throws FileNotFoundException If the remote lookup reported it and no
+  *                               summary could be collected at all.
+  * @throws IOException Propagated from the operation check, the path
+  *                     resolution or the remote invocation.
+  */
+ @Override
+ public ContentSummary getContentSummary(String path) throws IOException {
+   rpcServer.checkOperation(NameNode.OperationCategory.READ);
+
+   // Get the summaries from regular files
+   Collection<ContentSummary> collected = new LinkedList<>();
+   FileNotFoundException missing = null;
+   try {
+     final List<RemoteLocation> targets =
+         rpcServer.getLocationsForPath(path, false);
+     Class<?>[] signature = new Class<?>[] {String.class};
+     RemoteMethod summaryCall = new RemoteMethod("getContentSummary",
+         signature, new RemoteParam());
+     Map<RemoteLocation, ContentSummary> perLocation =
+         rpcClient.invokeConcurrent(
+             targets, summaryCall, false, false, ContentSummary.class);
+     collected.addAll(perLocation.values());
+   } catch (FileNotFoundException e) {
+     // Remember it: it is only rethrown if nothing else is found
+     missing = e;
+   }
+
+   // Add mount points at this level in the tree
+   final List<String> mounts = subclusterResolver.getMountPoints(path);
+   if (mounts != null) {
+     for (String mount : mounts) {
+       Path mountPath = new Path(path, mount);
+       try {
+         ContentSummary mountSummary =
+             getContentSummary(mountPath.toString());
+         if (mountSummary != null) {
+           collected.add(mountSummary);
+         }
+       } catch (Exception e) {
+         LOG.error("Cannot get content summary for mount {}: {}",
+             mountPath, e.getMessage());
+       }
+     }
+   }
+
+   // Throw original exception if no original nor mount points
+   boolean nothingFound = collected.isEmpty();
+   if (nothingFound && missing != null) {
+     throw missing;
+   }
+
+   return aggregateContentSummary(collected);
+ }
+
+ /**
+  * Forwards fsync for {@code src} to its locations through
+  * rpcClient.invokeSequential.
+  *
+  * @param src Path of the file.
+  * @param fileId Passed through to the remote call.
+  * @param clientName Passed through to the remote call.
+  * @param lastBlockLength Passed through to the remote call.
+  * @throws IOException Propagated from the operation check, the path
+  *                     resolution or the remote invocation.
+  */
+ @Override
+ public void fsync(String src, long fileId, String clientName,
+     long lastBlockLength) throws IOException {
+   rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
+
+   final List<RemoteLocation> targets =
+       rpcServer.getLocationsForPath(src, true);
+   Class<?>[] signature =
+       new Class<?>[] {String.class, long.class, String.class, long.class };
+   RemoteMethod remoteCall = new RemoteMethod("fsync", signature,
+       new RemoteParam(), fileId, clientName, lastBlockLength);
+   rpcClient.invokeSequential(targets, remoteCall);
+ }
+
+ /**
+  * Forwards setTimes for {@code src} to its locations through
+  * rpcClient.invokeSequential.
+  *
+  * @param src Path to update.
+  * @param mtime Passed through to the remote call.
+  * @param atime Passed through to the remote call.
+  * @throws IOException Propagated from the operation check, the path
+  *                     resolution or the remote invocation.
+  */
+ @Override
+ public void setTimes(String src, long mtime, long atime) throws IOException {
+   rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
+
+   final List<RemoteLocation> targets =
+       rpcServer.getLocationsForPath(src, true);
+   Class<?>[] signature =
+       new Class<?>[] {String.class, long.class, long.class};
+   RemoteMethod remoteCall = new RemoteMethod("setTimes", signature,
+       new RemoteParam(), mtime, atime);
+   rpcClient.invokeSequential(targets, remoteCall);
+ }
+
+ /**
+  * Forwards createSymlink to the locations of {@code target}, using the
+  * destination of the first location resolved for {@code link} as the link
+  * path.
+  *
+  * @param target Path the link points to.
+  * @param link Path of the link to create.
+  * @param dirPerms Passed through to the remote call.
+  * @param createParent Passed through to the remote call.
+  * @throws IOException Propagated from the operation check, the path
+  *                     resolution or the remote invocation.
+  */
+ @Override
+ public void createSymlink(String target, String link, FsPermission dirPerms,
+     boolean createParent) throws IOException {
+   rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
+
+   // TODO Verify that the link location is in the same NS as the targets
+   final List<RemoteLocation> targetCandidates =
+       rpcServer.getLocationsForPath(target, true);
+   final List<RemoteLocation> linkCandidates =
+       rpcServer.getLocationsForPath(link, true);
+   // Only the first resolved link location is used
+   String linkDest = linkCandidates.get(0).getDest();
+   Class<?>[] signature = new Class<?>[] {
+       String.class, String.class, FsPermission.class, boolean.class};
+   RemoteMethod remoteCall = new RemoteMethod("createSymlink", signature,
+       new RemoteParam(), linkDest, dirPerms, createParent);
+   rpcClient.invokeSequential(targetCandidates, remoteCall);
+ }
+
+ /**
+  * Forwards getLinkTarget for {@code path} to its locations through
+  * rpcClient.invokeSequential.
+  *
+  * @param path Path of the link.
+  * @return The String produced by the sequential invocation (may be null).
+  * @throws IOException Propagated from the operation check, the path
+  *                     resolution or the remote invocation.
+  */
+ @Override
+ public String getLinkTarget(String path) throws IOException {
+   rpcServer.checkOperation(NameNode.OperationCategory.READ);
+
+   final List<RemoteLocation> targets =
+       rpcServer.getLocationsForPath(path, true);
+   Class<?>[] signature = new Class<?>[] {String.class};
+   RemoteMethod remoteCall =
+       new RemoteMethod("getLinkTarget", signature, new RemoteParam());
+   String linkTarget =
+       rpcClient.invokeSequential(targets, remoteCall, String.class, null);
+   return linkTarget;
+ }
+
+ /**
+  * Placeholder: nothing is forwarded to any namenode. The only action is
+  * rpcServer.checkOperation(WRITE, false).
+  * NOTE(review): the false flag presumably marks the operation as
+  * unsupported so the check rejects it - confirm in
+  * RouterRpcServer#checkOperation.
+  */
+ @Override // Client Protocol
+ public void allowSnapshot(String snapshotRoot) throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.WRITE, false);
+ }
+
+ /**
+  * Placeholder: nothing is forwarded to any namenode. The only action is
+  * rpcServer.checkOperation(WRITE, false).
+  * NOTE(review): the false flag presumably marks the operation as
+  * unsupported so the check rejects it - confirm in
+  * RouterRpcServer#checkOperation.
+  */
+ @Override // Client Protocol
+ public void disallowSnapshot(String snapshot) throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.WRITE, false);
+ }
+
+ /**
+  * Placeholder: nothing is forwarded to any namenode. The only action is
+  * rpcServer.checkOperation(WRITE, false).
+  * NOTE(review): the false flag presumably marks the operation as
+  * unsupported so the check rejects it - confirm in
+  * RouterRpcServer#checkOperation.
+  */
+ @Override
+ public void renameSnapshot(String snapshotRoot, String snapshotOldName,
+ String snapshotNewName) throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.WRITE, false);
+ }
+
+ /**
+  * Placeholder: nothing is forwarded to any namenode. Calls
+  * rpcServer.checkOperation(READ, false) and, if that returns, yields null.
+  * NOTE(review): the false flag presumably marks the operation as
+  * unsupported so the check rejects it - confirm in
+  * RouterRpcServer#checkOperation.
+  */
+ @Override
+ public SnapshottableDirectoryStatus[] getSnapshottableDirListing()
+ throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.READ, false);
+ return null;
+ }
+
+ /**
+  * Placeholder: nothing is forwarded to any namenode. Calls
+  * rpcServer.checkOperation(READ, false) and, if that returns, yields null.
+  * NOTE(review): the false flag presumably marks the operation as
+  * unsupported so the check rejects it - confirm in
+  * RouterRpcServer#checkOperation.
+  */
+ @Override
+ public SnapshotDiffReport getSnapshotDiffReport(String snapshotRoot,
+ String earlierSnapshotName, String laterSnapshotName) throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.READ, false);
+ return null;
+ }
+
+ /**
+  * Placeholder: nothing is forwarded to any namenode. Calls
+  * rpcServer.checkOperation(READ, false) and, if that returns, yields null.
+  * NOTE(review): the false flag presumably marks the operation as
+  * unsupported so the check rejects it - confirm in
+  * RouterRpcServer#checkOperation.
+  */
+ @Override
+ public SnapshotDiffReportListing getSnapshotDiffReportListing(
+ String snapshotRoot, String earlierSnapshotName, String laterSnapshotName,
+ byte[] startPath, int index) throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.READ, false);
+ return null;
+ }
+
+ /**
+  * Placeholder: nothing is forwarded to any namenode. Calls
+  * rpcServer.checkOperation(WRITE, false) and, if that returns, yields 0.
+  * NOTE(review): the false flag presumably marks the operation as
+  * unsupported so the check rejects it - confirm in
+  * RouterRpcServer#checkOperation.
+  */
+ @Override
+ public long addCacheDirective(CacheDirectiveInfo path,
+ EnumSet<CacheFlag> flags) throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.WRITE, false);
+ return 0;
+ }
+
+ /**
+  * Placeholder: nothing is forwarded to any namenode. The only action is
+  * rpcServer.checkOperation(WRITE, false).
+  * NOTE(review): the false flag presumably marks the operation as
+  * unsupported so the check rejects it - confirm in
+  * RouterRpcServer#checkOperation.
+  */
+ @Override
+ public void modifyCacheDirective(CacheDirectiveInfo directive,
+ EnumSet<CacheFlag> flags) throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.WRITE, false);
+ }
+
+ /**
+  * Placeholder: nothing is forwarded to any namenode. The only action is
+  * rpcServer.checkOperation(WRITE, false).
+  * NOTE(review): the false flag presumably marks the operation as
+  * unsupported so the check rejects it - confirm in
+  * RouterRpcServer#checkOperation.
+  */
+ @Override
+ public void removeCacheDirective(long id) throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.WRITE, false);
+ }
+
+ /**
+  * Placeholder: nothing is forwarded to any namenode. Calls
+  * rpcServer.checkOperation(READ, false) and, if that returns, yields null.
+  * NOTE(review): the false flag presumably marks the operation as
+  * unsupported so the check rejects it - confirm in
+  * RouterRpcServer#checkOperation.
+  */
+ @Override
+ public BatchedRemoteIterator.BatchedEntries<CacheDirectiveEntry> listCacheDirectives(
+ long prevId, CacheDirectiveInfo filter) throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.READ, false);
+ return null;
+ }
+
+ /**
+  * Placeholder: nothing is forwarded to any namenode. The only action is
+  * rpcServer.checkOperation(WRITE, false).
+  * NOTE(review): the false flag presumably marks the operation as
+  * unsupported so the check rejects it - confirm in
+  * RouterRpcServer#checkOperation.
+  */
+ @Override
+ public void addCachePool(CachePoolInfo info) throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.WRITE, false);
+ }
+
+ /**
+  * Placeholder: nothing is forwarded to any namenode. The only action is
+  * rpcServer.checkOperation(WRITE, false).
+  * NOTE(review): the false flag presumably marks the operation as
+  * unsupported so the check rejects it - confirm in
+  * RouterRpcServer#checkOperation.
+  */
+ @Override
+ public void modifyCachePool(CachePoolInfo info) throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.WRITE, false);
+ }
+
+ /**
+  * Placeholder: nothing is forwarded to any namenode. The only action is
+  * rpcServer.checkOperation(WRITE, false).
+  * NOTE(review): the false flag presumably marks the operation as
+  * unsupported so the check rejects it - confirm in
+  * RouterRpcServer#checkOperation.
+  */
+ @Override
+ public void removeCachePool(String cachePoolName) throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.WRITE, false);
+ }
+
+ /**
+  * Placeholder: nothing is forwarded to any namenode. Calls
+  * rpcServer.checkOperation(READ, false) and, if that returns, yields null.
+  * NOTE(review): the false flag presumably marks the operation as
+  * unsupported so the check rejects it - confirm in
+  * RouterRpcServer#checkOperation.
+  */
+ @Override
+ public BatchedRemoteIterator.BatchedEntries<CachePoolEntry> listCachePools(String prevKey)
+ throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.READ, false);
+ return null;
+ }
+
+ /**
+  * Forwards modifyAclEntries for {@code src} to its locations through
+  * rpcClient.invokeSequential.
+  *
+  * @param src Path whose ACL is modified.
+  * @param aclSpec Passed through to the remote call.
+  * @throws IOException Propagated from the operation check, the path
+  *                     resolution or the remote invocation.
+  */
+ @Override
+ public void modifyAclEntries(String src, List<AclEntry> aclSpec)
+     throws IOException {
+   rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
+
+   // TODO handle virtual directories
+   final List<RemoteLocation> targets =
+       rpcServer.getLocationsForPath(src, true);
+   Class<?>[] signature = new Class<?>[] {String.class, List.class};
+   RemoteMethod remoteCall = new RemoteMethod("modifyAclEntries",
+       signature, new RemoteParam(), aclSpec);
+   rpcClient.invokeSequential(targets, remoteCall, null, null);
+ }
+
+ /**
+  * Forwards removeAclEntries for {@code src} to its locations through
+  * rpcClient.invokeSequential.
+  *
+  * @param src Path whose ACL entries are removed.
+  * @param aclSpec Passed through to the remote call.
+  * @throws IOException Propagated from the operation check, the path
+  *                     resolution or the remote invocation.
+  */
+ @Override
+ public void removeAclEntries(String src, List<AclEntry> aclSpec)
+     throws IOException {
+   rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
+
+   // TODO handle virtual directories
+   final List<RemoteLocation> targets =
+       rpcServer.getLocationsForPath(src, true);
+   Class<?>[] signature = new Class<?>[] {String.class, List.class};
+   RemoteMethod remoteCall = new RemoteMethod("removeAclEntries",
+       signature, new RemoteParam(), aclSpec);
+   rpcClient.invokeSequential(targets, remoteCall, null, null);
+ }
+
+ /**
+  * Forwards removeDefaultAcl for {@code src} to its locations through
+  * rpcClient.invokeSequential.
+  *
+  * @param src Path whose default ACL is removed.
+  * @throws IOException Propagated from the operation check, the path
+  *                     resolution or the remote invocation.
+  */
+ @Override
+ public void removeDefaultAcl(String src) throws IOException {
+   rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
+
+   // TODO handle virtual directories
+   final List<RemoteLocation> targets =
+       rpcServer.getLocationsForPath(src, true);
+   Class<?>[] signature = new Class<?>[] {String.class};
+   RemoteMethod remoteCall =
+       new RemoteMethod("removeDefaultAcl", signature, new RemoteParam());
+   rpcClient.invokeSequential(targets, remoteCall);
+ }
+
+ /**
+  * Forwards removeAcl for {@code src} to its locations through
+  * rpcClient.invokeSequential.
+  *
+  * @param src Path whose ACL is removed.
+  * @throws IOException Propagated from the operation check, the path
+  *                     resolution or the remote invocation.
+  */
+ @Override
+ public void removeAcl(String src) throws IOException {
+   rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
+
+   // TODO handle virtual directories
+   final List<RemoteLocation> targets =
+       rpcServer.getLocationsForPath(src, true);
+   Class<?>[] signature = new Class<?>[] {String.class};
+   RemoteMethod remoteCall =
+       new RemoteMethod("removeAcl", signature, new RemoteParam());
+   rpcClient.invokeSequential(targets, remoteCall);
+ }
+
+ /**
+  * Forwards setAcl for {@code src} to its locations through
+  * rpcClient.invokeSequential.
+  *
+  * @param src Path whose ACL is set.
+  * @param aclSpec Passed through to the remote call.
+  * @throws IOException Propagated from the operation check, the path
+  *                     resolution or the remote invocation.
+  */
+ @Override
+ public void setAcl(String src, List<AclEntry> aclSpec) throws IOException {
+   rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
+
+   // TODO handle virtual directories
+   final List<RemoteLocation> targets =
+       rpcServer.getLocationsForPath(src, true);
+   Class<?>[] signature = new Class<?>[] {String.class, List.class};
+   RemoteMethod remoteCall =
+       new RemoteMethod("setAcl", signature, new RemoteParam(), aclSpec);
+   rpcClient.invokeSequential(targets, remoteCall);
+ }
+
+ /**
+  * Forwards getAclStatus for {@code src} to its locations through
+  * rpcClient.invokeSequential.
+  *
+  * @param src Path to look up.
+  * @return The status produced by the sequential invocation (may be null).
+  * @throws IOException Propagated from the operation check, the path
+  *                     resolution or the remote invocation.
+  */
+ @Override
+ public AclStatus getAclStatus(String src) throws IOException {
+   rpcServer.checkOperation(NameNode.OperationCategory.READ);
+
+   // TODO handle virtual directories
+   final List<RemoteLocation> targets =
+       rpcServer.getLocationsForPath(src, false);
+   Class<?>[] signature = new Class<?>[] {String.class};
+   RemoteMethod remoteCall =
+       new RemoteMethod("getAclStatus", signature, new RemoteParam());
+   AclStatus aclStatus =
+       rpcClient.invokeSequential(targets, remoteCall, AclStatus.class, null);
+   return aclStatus;
+ }
+
+ /**
+  * Forwards createEncryptionZone for {@code src} to its locations through
+  * rpcClient.invokeSequential.
+  *
+  * @param src Root of the encryption zone.
+  * @param keyName Passed through to the remote call.
+  * @throws IOException Propagated from the operation check, the path
+  *                     resolution or the remote invocation.
+  */
+ @Override
+ public void createEncryptionZone(String src, String keyName)
+     throws IOException {
+   rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
+
+   // TODO handle virtual directories
+   final List<RemoteLocation> targets =
+       rpcServer.getLocationsForPath(src, true);
+   Class<?>[] signature = new Class<?>[] {String.class, String.class};
+   RemoteMethod remoteCall = new RemoteMethod("createEncryptionZone",
+       signature, new RemoteParam(), keyName);
+   rpcClient.invokeSequential(targets, remoteCall);
+ }
+
+ /**
+  * Forwards getEZForPath for {@code src} to its locations through
+  * rpcClient.invokeSequential.
+  *
+  * @param src Path to look up.
+  * @return The zone produced by the sequential invocation (may be null).
+  * @throws IOException Propagated from the operation check, the path
+  *                     resolution or the remote invocation.
+  */
+ @Override
+ public EncryptionZone getEZForPath(String src) throws IOException {
+   rpcServer.checkOperation(NameNode.OperationCategory.READ);
+
+   // TODO handle virtual directories
+   final List<RemoteLocation> targets =
+       rpcServer.getLocationsForPath(src, false);
+   Class<?>[] signature = new Class<?>[] {String.class};
+   RemoteMethod remoteCall =
+       new RemoteMethod("getEZForPath", signature, new RemoteParam());
+   EncryptionZone zone = rpcClient.invokeSequential(
+       targets, remoteCall, EncryptionZone.class, null);
+   return zone;
+ }
+
+ /**
+  * Placeholder: nothing is forwarded to any namenode. Calls
+  * rpcServer.checkOperation(READ, false) and, if that returns, yields null.
+  * NOTE(review): the false flag presumably marks the operation as
+  * unsupported so the check rejects it - confirm in
+  * RouterRpcServer#checkOperation.
+  */
+ @Override
+ public BatchedRemoteIterator.BatchedEntries<EncryptionZone> listEncryptionZones(long prevId)
+ throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.READ, false);
+ return null;
+ }
+
+ /**
+  * Placeholder: nothing is forwarded to any namenode. The only action is
+  * rpcServer.checkOperation(WRITE, false).
+  * NOTE(review): the false flag presumably marks the operation as
+  * unsupported so the check rejects it - confirm in
+  * RouterRpcServer#checkOperation.
+  */
+ @Override
+ public void reencryptEncryptionZone(String zone, HdfsConstants.ReencryptAction action)
+ throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.WRITE, false);
+ }
+
+ /**
+  * Placeholder: nothing is forwarded to any namenode. Calls
+  * rpcServer.checkOperation(READ, false) and, if that returns, yields null.
+  * NOTE(review): the false flag presumably marks the operation as
+  * unsupported so the check rejects it - confirm in
+  * RouterRpcServer#checkOperation.
+  */
+ @Override
+ public BatchedRemoteIterator.BatchedEntries<ZoneReencryptionStatus> listReencryptionStatus(
+ long prevId) throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.READ, false);
+ return null;
+ }
+
+ /**
+  * Forwards setXAttr for {@code src} to its locations through
+  * rpcClient.invokeSequential.
+  *
+  * @param src Path to update.
+  * @param xAttr Passed through to the remote call.
+  * @param flag Passed through to the remote call.
+  * @throws IOException Propagated from the operation check, the path
+  *                     resolution or the remote invocation.
+  */
+ @Override
+ public void setXAttr(String src, XAttr xAttr, EnumSet<XAttrSetFlag> flag)
+     throws IOException {
+   rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
+
+   // TODO handle virtual directories
+   final List<RemoteLocation> targets =
+       rpcServer.getLocationsForPath(src, true);
+   Class<?>[] signature =
+       new Class<?>[] {String.class, XAttr.class, EnumSet.class};
+   RemoteMethod remoteCall = new RemoteMethod("setXAttr", signature,
+       new RemoteParam(), xAttr, flag);
+   rpcClient.invokeSequential(targets, remoteCall);
+ }
+
+ /**
+  * Forwards getXAttrs for {@code src} to its locations through
+  * rpcClient.invokeSequential.
+  *
+  * @param src Path to look up.
+  * @param xAttrs Passed through to the remote call.
+  * @return The list produced by the sequential invocation (may be null).
+  * @throws IOException Propagated from the operation check, the path
+  *                     resolution or the remote invocation.
+  */
+ @SuppressWarnings("unchecked")
+ @Override
+ public List<XAttr> getXAttrs(String src, List<XAttr> xAttrs)
+     throws IOException {
+   rpcServer.checkOperation(NameNode.OperationCategory.READ);
+
+   // TODO handle virtual directories
+   final List<RemoteLocation> targets =
+       rpcServer.getLocationsForPath(src, false);
+   Class<?>[] signature = new Class<?>[] {String.class, List.class};
+   RemoteMethod remoteCall = new RemoteMethod("getXAttrs", signature,
+       new RemoteParam(), xAttrs);
+   // The invocation is typed with the raw List class, hence the cast
+   return (List<XAttr>) rpcClient.invokeSequential(
+       targets, remoteCall, List.class, null);
+ }
+
+ /**
+  * Forwards listXAttrs for {@code src} to its locations through
+  * rpcClient.invokeSequential.
+  *
+  * @param src Path to look up.
+  * @return The list produced by the sequential invocation (may be null).
+  * @throws IOException Propagated from the operation check, the path
+  *                     resolution or the remote invocation.
+  */
+ @SuppressWarnings("unchecked")
+ @Override
+ public List<XAttr> listXAttrs(String src) throws IOException {
+   rpcServer.checkOperation(NameNode.OperationCategory.READ);
+
+   // TODO handle virtual directories
+   final List<RemoteLocation> targets =
+       rpcServer.getLocationsForPath(src, false);
+   Class<?>[] signature = new Class<?>[] {String.class};
+   RemoteMethod remoteCall =
+       new RemoteMethod("listXAttrs", signature, new RemoteParam());
+   // The invocation is typed with the raw List class, hence the cast
+   return (List<XAttr>) rpcClient.invokeSequential(
+       targets, remoteCall, List.class, null);
+ }
+
+ /**
+  * Forwards removeXAttr for {@code src} to its locations through
+  * rpcClient.invokeSequential.
+  *
+  * @param src Path to update.
+  * @param xAttr Passed through to the remote call.
+  * @throws IOException Propagated from the operation check, the path
+  *                     resolution or the remote invocation.
+  */
+ @Override
+ public void removeXAttr(String src, XAttr xAttr) throws IOException {
+   rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
+
+   // TODO handle virtual directories
+   final List<RemoteLocation> targets =
+       rpcServer.getLocationsForPath(src, true);
+   Class<?>[] signature = new Class<?>[] {String.class, XAttr.class};
+   RemoteMethod remoteCall = new RemoteMethod("removeXAttr", signature,
+       new RemoteParam(), xAttr);
+   rpcClient.invokeSequential(targets, remoteCall);
+ }
+
+ /**
+  * Forwards checkAccess for {@code path} to its locations through
+  * rpcClient.invokeSequential.
+  *
+  * @param path Path to check.
+  * @param mode Passed through to the remote call.
+  * @throws IOException Propagated from the operation check, the path
+  *                     resolution or the remote invocation.
+  */
+ @Override
+ public void checkAccess(String path, FsAction mode) throws IOException {
+   rpcServer.checkOperation(NameNode.OperationCategory.READ);
+
+   // TODO handle virtual directories
+   final List<RemoteLocation> targets =
+       rpcServer.getLocationsForPath(path, true);
+   Class<?>[] signature = new Class<?>[] {String.class, FsAction.class};
+   RemoteMethod remoteCall = new RemoteMethod("checkAccess", signature,
+       new RemoteParam(), mode);
+   rpcClient.invokeSequential(targets, remoteCall);
+ }
+
+ /**
+  * Invokes getCurrentEditLogTxid on every namespace.
+  *
+  * @return The largest transaction id returned, or 0 when no result is
+  *         greater than 0.
+  * @throws IOException Propagated from the operation check or the remote
+  *                     invocation.
+  */
+ @Override
+ public long getCurrentEditLogTxid() throws IOException {
+   rpcServer.checkOperation(NameNode.OperationCategory.READ);
+
+   Class<?>[] noParams = new Class<?>[] {};
+   RemoteMethod txidCall = new RemoteMethod("getCurrentEditLogTxid", noParams);
+   final Set<FederationNamespaceInfo> namespaces =
+       namenodeResolver.getNamespaces();
+   Map<FederationNamespaceInfo, Long> txidByNamespace =
+       rpcClient.invokeConcurrent(namespaces, txidCall, true, false, long.class);
+
+   // Return the maximum txid
+   long maxTxid = 0;
+   for (long nsTxid : txidByNamespace.values()) {
+     maxTxid = Math.max(maxTxid, nsTxid);
+   }
+   return maxTxid;
+ }
+
+ /**
+  * Placeholder: nothing is forwarded to any namenode. Calls
+  * rpcServer.checkOperation(READ, false) and, if that returns, yields null.
+  * NOTE(review): the false flag presumably marks the operation as
+  * unsupported so the check rejects it - confirm in
+  * RouterRpcServer#checkOperation.
+  */
+ @Override
+ public EventBatchList getEditsFromTxid(long txid) throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.READ, false);
+ return null;
+ }
+
+ /**
+  * Placeholder: nothing is forwarded to any namenode. Calls
+  * rpcServer.checkOperation(READ, false) and, if that returns, yields null.
+  * NOTE(review): the false flag presumably marks the operation as
+  * unsupported so the check rejects it - confirm in
+  * RouterRpcServer#checkOperation.
+  */
+ @Override
+ public DataEncryptionKey getDataEncryptionKey() throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.READ, false);
+ return null;
+ }
+
+ /**
+  * Placeholder: nothing is forwarded to any namenode. Runs the WRITE
+  * operation check and returns null.
+  * NOTE(review): unlike allowSnapshot, renameSnapshot and deleteSnapshot,
+  * this calls checkOperation without the extra false flag, so callers get
+  * a null snapshot path back and no snapshot is created. Confirm whether
+  * checkOperation(WRITE, false) was intended here.
+  */
+ @Override
+ public String createSnapshot(String snapshotRoot, String snapshotName)
+ throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
+ return null;
+ }
+
+ /**
+  * Placeholder: nothing is forwarded to any namenode. The only action is
+  * rpcServer.checkOperation(WRITE, false).
+  * NOTE(review): the false flag presumably marks the operation as
+  * unsupported so the check rejects it - confirm in
+  * RouterRpcServer#checkOperation.
+  */
+ @Override
+ public void deleteSnapshot(String snapshotRoot, String snapshotName)
+ throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.WRITE, false);
+ }
+
+ /**
+  * Delegates to the quota module obtained from rpcServer.getQuotaModule(),
+  * passing all arguments through unchanged. No operation check is done in
+  * this method itself.
+  */
+ @Override
+ public void setQuota(String path, long namespaceQuota, long storagespaceQuota,
+ StorageType type) throws IOException {
+ rpcServer.getQuotaModule()
+ .setQuota(path, namespaceQuota, storagespaceQuota, type);
+ }
+
+ /**
+  * Delegates to the quota module obtained from rpcServer.getQuotaModule()
+  * and returns its result. No operation check is done in this method
+  * itself.
+  */
+ @Override
+ public QuotaUsage getQuotaUsage(String path) throws IOException {
+ return rpcServer.getQuotaModule().getQuotaUsage(path);
+ }
+
+ /**
+  * Reports bad blocks, grouping them by block pool id and issuing one
+  * reportBadBlocks invocation per block pool through
+  * rpcClient.invokeSingleBlockPool.
+  *
+  * @param blocks Blocks to report.
+  * @throws IOException Propagated from the operation check or the remote
+  *                     invocation.
+  */
+ @Override
+ public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
+   rpcServer.checkOperation(NameNode.OperationCategory.WRITE);
+
+   // Block pool id -> blocks
+   Map<String, List<LocatedBlock>> blocksByPool = new HashMap<>();
+   for (LocatedBlock bad : blocks) {
+     String poolId = bad.getBlock().getBlockPoolId();
+     if (!blocksByPool.containsKey(poolId)) {
+       blocksByPool.put(poolId, new LinkedList<LocatedBlock>());
+     }
+     blocksByPool.get(poolId).add(bad);
+   }
+
+   // Invoke each block pool
+   Class<?>[] signature = new Class<?>[] {LocatedBlock[].class};
+   for (Map.Entry<String, List<LocatedBlock>> pool : blocksByPool.entrySet()) {
+     List<LocatedBlock> poolBlocks = pool.getValue();
+     LocatedBlock[] payload =
+         poolBlocks.toArray(new LocatedBlock[poolBlocks.size()]);
+     // Wrap the array so it is passed as a single argument
+     RemoteMethod reportCall = new RemoteMethod("reportBadBlocks",
+         signature, new Object[] {payload});
+     rpcClient.invokeSingleBlockPool(pool.getKey(), reportCall);
+   }
+ }
+
+ /**
+  * Placeholder: nothing is forwarded to any namenode. The only action is
+  * rpcServer.checkOperation(WRITE, false).
+  * NOTE(review): the false flag presumably marks the operation as
+  * unsupported so the check rejects it - confirm in
+  * RouterRpcServer#checkOperation.
+  */
+ @Override
+ public void unsetStoragePolicy(String src) throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.WRITE, false);
+ }
+
+ /**
+  * Placeholder: nothing is forwarded to any namenode. Calls
+  * rpcServer.checkOperation(READ, false) and, if that returns, yields null.
+  * NOTE(review): the false flag presumably marks the operation as
+  * unsupported so the check rejects it - confirm in
+  * RouterRpcServer#checkOperation.
+  */
+ @Override
+ public BlockStoragePolicy getStoragePolicy(String path) throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.READ, false);
+ return null;
+ }
+
+ /**
+  * Delegates to erasureCoding.getErasureCodingPolicies() and returns its
+  * result unchanged.
+  */
+ @Override
+ public ErasureCodingPolicyInfo[] getErasureCodingPolicies()
+ throws IOException {
+ return erasureCoding.getErasureCodingPolicies();
+ }
+
+ /**
+  * Delegates to erasureCoding.getErasureCodingCodecs() and returns its
+  * result unchanged.
+  */
+ @Override
+ public Map<String, String> getErasureCodingCodecs() throws IOException {
+ return erasureCoding.getErasureCodingCodecs();
+ }
+
+ /**
+  * Delegates to erasureCoding.addErasureCodingPolicies(policies) and
+  * returns its result unchanged.
+  */
+ @Override
+ public AddErasureCodingPolicyResponse[] addErasureCodingPolicies(
+ ErasureCodingPolicy[] policies) throws IOException {
+ return erasureCoding.addErasureCodingPolicies(policies);
+ }
+
+ /**
+  * Delegates to erasureCoding.removeErasureCodingPolicy(ecPolicyName).
+  */
+ @Override
+ public void removeErasureCodingPolicy(String ecPolicyName)
+ throws IOException {
+ erasureCoding.removeErasureCodingPolicy(ecPolicyName);
+ }
+
+ /**
+  * Delegates to erasureCoding.disableErasureCodingPolicy(ecPolicyName).
+  */
+ @Override
+ public void disableErasureCodingPolicy(String ecPolicyName)
+ throws IOException {
+ erasureCoding.disableErasureCodingPolicy(ecPolicyName);
+ }
+
+ /**
+  * Delegates to erasureCoding.enableErasureCodingPolicy(ecPolicyName).
+  */
+ @Override
+ public void enableErasureCodingPolicy(String ecPolicyName)
+ throws IOException {
+ erasureCoding.enableErasureCodingPolicy(ecPolicyName);
+ }
+
+ /**
+  * Delegates to erasureCoding.getErasureCodingPolicy(src) and returns its
+  * result unchanged.
+  */
+ @Override
+ public ErasureCodingPolicy getErasureCodingPolicy(String src)
+ throws IOException {
+ return erasureCoding.getErasureCodingPolicy(src);
+ }
+
+ /**
+  * Delegates to erasureCoding.setErasureCodingPolicy(src, ecPolicyName).
+  */
+ @Override
+ public void setErasureCodingPolicy(String src, String ecPolicyName)
+ throws IOException {
+ erasureCoding.setErasureCodingPolicy(src, ecPolicyName);
+ }
+
+ /**
+  * Delegates to erasureCoding.unsetErasureCodingPolicy(src).
+  */
+ @Override
+ public void unsetErasureCodingPolicy(String src) throws IOException {
+ erasureCoding.unsetErasureCodingPolicy(src);
+ }
+
+ /**
+  * Delegates to erasureCoding.getECBlockGroupStats() and returns its result
+  * unchanged.
+  */
+ @Override
+ public ECBlockGroupStats getECBlockGroupStats() throws IOException {
+ return erasureCoding.getECBlockGroupStats();
+ }
+
+ /**
+  * Placeholder: nothing is forwarded to any namenode. Calls
+  * rpcServer.checkOperation(READ, false) and, if that returns, yields null.
+  * NOTE(review): the false flag presumably marks the operation as
+  * unsupported so the check rejects it - confirm in
+  * RouterRpcServer#checkOperation.
+  */
+ @Override
+ public ReplicatedBlockStats getReplicatedBlockStats() throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.READ, false);
+ return null;
+ }
+
+ /**
+  * Deprecated single-argument form. Delegates to the three-argument
+  * listOpenFiles with the ALL_OPEN_FILES type and
+  * OpenFilesIterator.FILTER_PATH_DEFAULT as the path.
+  */
+ @Deprecated
+ @Override
+ public BatchedRemoteIterator.BatchedEntries<OpenFileEntry> listOpenFiles(long prevId)
+ throws IOException {
+ return listOpenFiles(prevId, EnumSet.of(OpenFilesIterator.OpenFilesType.ALL_OPEN_FILES),
+ OpenFilesIterator.FILTER_PATH_DEFAULT);
+ }
+
+ /**
+  * Placeholder: nothing is forwarded to any namenode. Calls
+  * rpcServer.checkOperation(READ, false) and, if that returns, yields null.
+  * NOTE(review): the false flag presumably marks the operation as
+  * unsupported so the check rejects it - confirm in
+  * RouterRpcServer#checkOperation.
+  */
+ @Override
+ public BatchedRemoteIterator.BatchedEntries<OpenFileEntry> listOpenFiles(long prevId,
+ EnumSet<OpenFilesIterator.OpenFilesType> openFilesTypes, String path) throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.READ, false);
+ return null;
+ }
+
+ /**
+  * Placeholder: nothing is forwarded to any namenode. The only action is
+  * rpcServer.checkOperation(WRITE, false).
+  * NOTE(review): the false flag presumably marks the operation as
+  * unsupported so the check rejects it - confirm in
+  * RouterRpcServer#checkOperation.
+  */
+ @Override
+ public void satisfyStoragePolicy(String path) throws IOException {
+ rpcServer.checkOperation(NameNode.OperationCategory.WRITE, false);
+ }
+
+ /**
+  * Pairs each source location of a rename with a destination location in
+  * the same namespace. A rename cannot move data across namespaces, so
+  * source locations without such a destination are not eligible.
+  *
+  * @param srcLocations Candidate source locations. This list is modified in
+  *          place: entries with no destination in the same namespace are
+  *          removed.
+  * @param dst The destination path.
+  * @return A RemoteParam built from the map of each remaining source
+  *         location to its destination path.
+  * @throws IOException If the dst paths could not be determined.
+  */
+ private RemoteParam getRenameDestinations(
+     final List<RemoteLocation> srcLocations, final String dst)
+     throws IOException {
+
+   final List<RemoteLocation> dstCandidates =
+       rpcServer.getLocationsForPath(dst, true);
+   final Map<RemoteLocation, String> destBySource = new HashMap<>();
+
+   for (Iterator<RemoteLocation> it = srcLocations.iterator();
+       it.hasNext();) {
+     RemoteLocation source = it.next();
+     RemoteLocation match = getFirstMatchingLocation(source, dstCandidates);
+     if (match == null) {
+       // This src destination is not valid, remove from the source list
+       it.remove();
+       continue;
+     }
+     // Use this dst for this source location
+     destBySource.put(source, match.getDest());
+   }
+   return new RemoteParam(destBySource);
+ }
+
+ /**
+  * Finds the first location that has the same nameservice id as the given
+  * one.
+  *
+  * @param location Location we are looking for.
+  * @param locations List of locations to search, in order.
+  * @return The first matching location in the list, or null if none
+  *         matches.
+  */
+ private RemoteLocation getFirstMatchingLocation(RemoteLocation location,
+     List<RemoteLocation> locations) {
+   for (RemoteLocation candidate : locations) {
+     String candidateNs = candidate.getNameserviceId();
+     boolean sameNameservice = candidateNs.equals(location.getNameserviceId());
+     if (sameNameservice) {
+       return candidate;
+     }
+   }
+   // Nothing in the list shares the nameservice
+   return null;
+ }
+
+ /**
+  * Combines the content summaries of the subclusters into one.
+  *
+  * A collection with exactly one element is returned as is; otherwise every
+  * counter (length, file count, directory count, quota, space consumed and
+  * space quota) is summed across the summaries.
+  *
+  * @param summaries Collection of individual summaries.
+  * @return Aggregated content summary.
+  */
+ private ContentSummary aggregateContentSummary(
+     Collection<ContentSummary> summaries) {
+   // Nothing to add up when there is a single summary
+   if (summaries.size() == 1) {
+     return summaries.iterator().next();
+   }
+
+   long totalLength = 0;
+   long totalFiles = 0;
+   long totalDirs = 0;
+   long totalQuota = 0;
+   long totalSpaceConsumed = 0;
+   long totalSpaceQuota = 0;
+
+   for (ContentSummary part : summaries) {
+     totalLength += part.getLength();
+     totalFiles += part.getFileCount();
+     totalDirs += part.getDirectoryCount();
+     totalQuota += part.getQuota();
+     totalSpaceConsumed += part.getSpaceConsumed();
+     totalSpaceQuota += part.getSpaceQuota();
+   }
+
+   return new ContentSummary.Builder()
+       .length(totalLength)
+       .fileCount(totalFiles)
+       .directoryCount(totalDirs)
+       .quota(totalQuota)
+       .spaceConsumed(totalSpaceConsumed)
+       .spaceQuota(totalSpaceQuota)
+       .build();
+ }
+
+ /**
+ * Get the file info from all the locations.
+ *
+ * @param locations Locations to check.
+ * @param method The file information method to run.
+ * @return The first file info if it's a file, the directory if it's
+ * everywhere.
+ * @throws IOException If all the locations throw an exception.
+ */
+ private HdfsFileStatus getFileInfoAll(final List<RemoteLocation> locations,
+ final RemoteMethod method) throws IOException {
+
+ // Get the file info from everybody
+ Map<RemoteLocation, HdfsFileStatus> results =
+ rpcClient.invokeConcurrent(locations, method, HdfsFileStatus.class);
+
+ // We return the first file
+ HdfsFileStatus dirStatus = null;
+ for (RemoteLocation loc : locations) {
+ HdfsFileStatus fileStatus = results.get(loc);
+ if (fileStatus != null) {
+ if (!fileStatus.isDirectory()) {
+ return fileStatus;
+ } else if (dirStatus == null) {
+ dirStatus = fileStatus;
+ }
+ }
+ }
+ return dirStatus;
+ }
+
+ /**
+ * Get the permissions for the parent of a child with given permissions.
+ * Add implicit u+wx permission for parent. This is based on
+ * @{FSDirMkdirOp#addImplicitUwx}.
+ * @param mask The permission mask of the child.
+ * @return The permission mask of the parent.
+ */
+ private static FsPermission getParentPermission(final FsPermission mask) {
+ FsPermission ret = new FsPermission(
+ mask.getUserAction().or(FsAction.WRITE_EXECUTE),
+ mask.getGroupAction(),
+ mask.getOtherAction());
+ return ret;
+ }
+
+ /**
+ * Check if a path should be in all subclusters.
+ *
+ * @param path Path to check.
+ * @return If a path should be in all subclusters.
+ */
+ private boolean isPathAll(final String path) {
+ if (subclusterResolver instanceof MountTableResolver) {
+ try {
+ MountTableResolver mountTable = (MountTableResolver)subclusterResolver;
+ MountTable entry = mountTable.getMountPoint(path);
+ if (entry != null) {
+ return entry.isAll();
+ }
+ } catch (IOException e) {
+ LOG.error("Cannot get mount point", e);
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Create a new file status for a mount point.
+ *
+ * @param name Name of the mount point.
+ * @param childrenNum Number of children.
+ * @param date Map with the dates.
+ * @return New HDFS file status representing a mount point.
+ */
+ private HdfsFileStatus getMountPointStatus(
+ String name, int childrenNum, long date) {
+ long modTime = date;
+ long accessTime = date;
+ FsPermission permission = FsPermission.getDirDefault();
+ String owner = this.superUser;
+ String group = this.superGroup;
+ try {
+ // TODO support users, it should be the user for the pointed folder
+ UserGroupInformation ugi = RouterRpcServer.getRemoteUser();
+ owner = ugi.getUserName();
+ group = ugi.getPrimaryGroupName();
+ } catch (IOException e) {
+ LOG.error("Cannot get the remote user: {}", e.getMessage());
+ }
+ long inodeId = 0;
+ return new HdfsFileStatus.Builder()
+ .isdir(true)
+ .mtime(modTime)
+ .atime(accessTime)
+ .perm(permission)
+ .owner(owner)
+ .group(group)
+ .symlink(new byte[0])
+ .path(DFSUtil.string2Bytes(name))
+ .fileId(inodeId)
+ .children(childrenNum)
+ .build();
+ }
+
+ /**
+  * Collect the modification dates of the mount points under a path. The
+  * result is empty when the subcluster resolver is not a mount table, and
+  * may be partial if listing the mount points fails.
+  *
+  * @param path Name of the path to start checking dates from.
+  * @return Sorted map from each child mount point to its modification date.
+  */
+ private Map<String, Long> getMountPointDates(String path) {
+   final Map<String, Long> dates = new TreeMap<>();
+   if (!(subclusterResolver instanceof MountTableResolver)) {
+     return dates;
+   }
+   try {
+     for (final String child : subclusterResolver.getMountPoints(path)) {
+       dates.put(child, getModifiedTime(dates, path, child));
+     }
+   } catch (IOException e) {
+     LOG.error("Cannot get mount point", e);
+   }
+   return dates;
+ }
+
+ /**
+  * Get modified time for child. If the child is present in mount table it
+  * will return the modified time of that entry. If the child is not present
+  * but subdirs of this child are present then it will return the latest
+  * modified time among those subdirs as modified time of the requested
+  * child. If the mount table lookup fails, the error is logged and 0 is
+  * returned.
+  *
+  * @param ret contains children and modified times collected so far; a value
+  *          already recorded for the child is used as the lower bound when
+  *          the time is derived from subdirs.
+  * @param path Name of the path to start checking dates from.
+  * @param child child of the requested path.
+  * @return modified time, or 0 if none could be determined.
+  */
+ private long getModifiedTime(Map<String, Long> ret, String path,
+     String child) {
+   MountTableResolver mountTable = (MountTableResolver) subclusterResolver;
+   // Avoid a double separator when the parent is the root
+   String srcPath;
+   if (path.equals(Path.SEPARATOR)) {
+     srcPath = Path.SEPARATOR + child;
+   } else {
+     srcPath = path + Path.SEPARATOR + child;
+   }
+   long modTime = 0L;
+   try {
+     // Get mount table entry for the srcPath
+     MountTable entry = mountTable.getMountPoint(srcPath);
+     if (entry != null) {
+       modTime = entry.getDateModified();
+     } else {
+       // srcPath is not in mount table but its subdirs may be: display the
+       // latest modified subdir date/time.
+       List<MountTable> entries = mountTable.getMounts(srcPath);
+       Long recorded = ret.get(child);
+       long latest = recorded != null ? recorded : 0L;
+       for (MountTable eachEntry : entries) {
+         // Keep a running maximum. Comparing against ret.get(child), which
+         // does not change inside this loop, made the last entry win
+         // instead of the latest one.
+         latest = Math.max(latest, eachEntry.getDateModified());
+       }
+       modTime = latest;
+     }
+   } catch (IOException e) {
+     LOG.error("Cannot get mount point", e);
+   }
+   return modTime;
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org