You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by at...@apache.org on 2011/11/02 06:35:26 UTC
svn commit: r1196458 [2/9] - in
/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/bin/ src/main/java/ src/main/java/org/apache/hadoop/fs/
src/main/java/org/apache/hadoop/hdfs/
src/main/java/org/apache/hadoop/hdfs/protocol/ ...
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSInputStream.java Wed Nov 2 05:34:31 2011
@@ -388,6 +388,8 @@ public class DFSInputStream extends FSIn
DatanodeInfo chosenNode = null;
int refetchToken = 1; // only need to get a new access token once
+ boolean connectFailedOnce = false;
+
while (true) {
//
// Compute desired block
@@ -409,6 +411,10 @@ public class DFSInputStream extends FSIn
accessToken,
offsetIntoBlock, blk.getNumBytes() - offsetIntoBlock,
buffersize, verifyChecksum, dfsClient.clientName);
+ if(connectFailedOnce) {
+ DFSClient.LOG.info("Successfully connected to " + targetAddr +
+ " for block " + blk.getBlockId());
+ }
return chosenNode;
} catch (IOException ex) {
if (ex instanceof InvalidBlockTokenException && refetchToken > 0) {
@@ -428,11 +434,9 @@ public class DFSInputStream extends FSIn
refetchToken--;
fetchBlockAt(target);
} else {
- DFSClient.LOG.warn("Failed to connect to " + targetAddr
- + ", add to deadNodes and continue " + ex);
- if (DFSClient.LOG.isDebugEnabled()) {
- DFSClient.LOG.debug("Connection failure ", ex);
- }
+ connectFailedOnce = true;
+ DFSClient.LOG.warn("Failed to connect to " + targetAddr + " for block"
+ + ", add to deadNodes and continue. " + ex, ex);
// Put chosen node into dead list, continue
addToDeadNodes(chosenNode);
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java Wed Nov 2 05:34:31 2011
@@ -1033,9 +1033,7 @@ class DFSOutputStream extends FSOutputSu
// send the request
new Sender(out).writeBlock(block, accessToken, dfsClient.clientName,
nodes, null, recoveryFlag? stage.getRecoveryStage() : stage,
- nodes.length, block.getNumBytes(), bytesSent, newGS);
- checksum.writeHeader(out);
- out.flush();
+ nodes.length, block.getNumBytes(), bytesSent, newGS, checksum);
// receive ack for connect
BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java Wed Nov 2 05:34:31 2011
@@ -22,36 +22,29 @@ import static org.apache.hadoop.hdfs.DFS
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.Random;
import java.util.StringTokenizer;
-import java.util.concurrent.TimeUnit;
+
+import javax.net.SocketFactory;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
-import org.apache.hadoop.io.retry.RetryPolicies;
-import org.apache.hadoop.io.retry.RetryPolicy;
-import org.apache.hadoop.io.retry.RetryProxy;
-import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.security.UserGroupInformation;
@@ -688,84 +681,38 @@ public class DFSUtil {
/** Create a {@link NameNode} proxy */
- public static ClientProtocol createNamenode(Configuration conf) throws IOException {
+ public static ClientProtocol createNamenode(Configuration conf)
+ throws IOException {
return createNamenode(NameNode.getAddress(conf), conf);
}
/** Create a {@link NameNode} proxy */
public static ClientProtocol createNamenode(InetSocketAddress nameNodeAddr,
- Configuration conf) throws IOException {
- return createNamenode(nameNodeAddr, conf, UserGroupInformation.getCurrentUser());
- }
-
- /** Create a {@link NameNode} proxy */
- public static ClientProtocol createNamenode(InetSocketAddress nameNodeAddr,
- Configuration conf, UserGroupInformation ugi) throws IOException {
- return createNamenode(createRPCNamenode(nameNodeAddr, conf, ugi));
- }
-
- /** Create a {@link NameNode} proxy */
- public static ClientProtocol createRPCNamenode(InetSocketAddress nameNodeAddr,
- Configuration conf, UserGroupInformation ugi)
- throws IOException {
- return (ClientProtocol)RPC.getProxy(ClientProtocol.class,
- ClientProtocol.versionID, nameNodeAddr, ugi, conf,
- NetUtils.getSocketFactory(conf, ClientProtocol.class));
+ Configuration conf) throws IOException {
+ return createNamenode(nameNodeAddr, conf,
+ UserGroupInformation.getCurrentUser());
}
/** Create a {@link NameNode} proxy */
- public static ClientProtocol createNamenode(ClientProtocol rpcNamenode)
- throws IOException {
- RetryPolicy createPolicy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
- 5, HdfsConstants.LEASE_SOFTLIMIT_PERIOD, TimeUnit.MILLISECONDS);
-
- Map<Class<? extends Exception>,RetryPolicy> remoteExceptionToPolicyMap =
- new HashMap<Class<? extends Exception>, RetryPolicy>();
- remoteExceptionToPolicyMap.put(AlreadyBeingCreatedException.class, createPolicy);
-
- Map<Class<? extends Exception>,RetryPolicy> exceptionToPolicyMap =
- new HashMap<Class<? extends Exception>, RetryPolicy>();
- exceptionToPolicyMap.put(RemoteException.class,
- RetryPolicies.retryByRemoteException(
- RetryPolicies.TRY_ONCE_THEN_FAIL, remoteExceptionToPolicyMap));
- RetryPolicy methodPolicy = RetryPolicies.retryByException(
- RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
- Map<String,RetryPolicy> methodNameToPolicyMap = new HashMap<String,RetryPolicy>();
-
- methodNameToPolicyMap.put("create", methodPolicy);
-
- return (ClientProtocol) RetryProxy.create(ClientProtocol.class,
- rpcNamenode, methodNameToPolicyMap);
+ public static ClientProtocol createNamenode( InetSocketAddress nameNodeAddr,
+ Configuration conf, UserGroupInformation ugi) throws IOException {
+ /**
+ * Currently we have simply burnt-in support for a SINGLE
+ * protocol - protocolR23Compatible. This will be replaced
+ * by a way to pick the right protocol based on the
+ * version of the target server.
+ */
+ return new org.apache.hadoop.hdfs.protocolR23Compatible.
+ ClientNamenodeProtocolTranslatorR23(nameNodeAddr, conf, ugi);
}
/** Create a {@link ClientDatanodeProtocol} proxy */
public static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
DatanodeID datanodeid, Configuration conf, int socketTimeout,
- LocatedBlock locatedBlock)
- throws IOException {
- InetSocketAddress addr = NetUtils.createSocketAddr(
- datanodeid.getHost() + ":" + datanodeid.getIpcPort());
- if (ClientDatanodeProtocol.LOG.isDebugEnabled()) {
- ClientDatanodeProtocol.LOG.debug("ClientDatanodeProtocol addr=" + addr);
- }
-
- // Since we're creating a new UserGroupInformation here, we know that no
- // future RPC proxies will be able to re-use the same connection. And
- // usages of this proxy tend to be one-off calls.
- //
- // This is a temporary fix: callers should really achieve this by using
- // RPC.stopProxy() on the resulting object, but this is currently not
- // working in trunk. See the discussion on HDFS-1965.
- Configuration confWithNoIpcIdle = new Configuration(conf);
- confWithNoIpcIdle.setInt(CommonConfigurationKeysPublic
- .IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY, 0);
-
- UserGroupInformation ticket = UserGroupInformation
- .createRemoteUser(locatedBlock.getBlock().getLocalBlock().toString());
- ticket.addToken(locatedBlock.getBlockToken());
- return RPC.getProxy(ClientDatanodeProtocol.class,
- ClientDatanodeProtocol.versionID, addr, ticket, confWithNoIpcIdle,
- NetUtils.getDefaultSocketFactory(conf), socketTimeout);
+ LocatedBlock locatedBlock) throws IOException {
+ return new org.apache.hadoop.hdfs.protocolR23Compatible.
+ ClientDatanodeProtocolTranslatorR23(datanodeid, conf, socketTimeout,
+ locatedBlock);
}
/**
@@ -776,6 +723,14 @@ public class DFSUtil {
return collection != null && collection.size() != 0;
}
+ /** Create a {@link ClientDatanodeProtocol} proxy */
+ public static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
+ InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
+ SocketFactory factory) throws IOException {
+ return new org.apache.hadoop.hdfs.protocolR23Compatible.
+ ClientDatanodeProtocolTranslatorR23(addr, ticket, conf, factory);
+ }
+
/**
* Get nameservice Id for the {@link NameNode} based on namenode RPC address
* matching the local node address.
@@ -919,4 +874,14 @@ public class DFSUtil {
private interface AddressMatcher {
public boolean match(InetSocketAddress s);
}
+
+ /** Create a URI from the scheme and address */
+ public static URI createUri(String scheme, InetSocketAddress address) {
+ try {
+ return new URI(scheme, null, address.getHostName(), address.getPort(),
+ null, null, null);
+ } catch (URISyntaxException ue) {
+ throw new IllegalArgumentException(ue);
+ }
+ }
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DistributedFileSystem.java Wed Nov 2 05:34:31 2011
@@ -810,7 +810,6 @@ public class DistributedFileSystem exten
) throws IOException {
Token<DelegationTokenIdentifier> result =
dfs.getDelegationToken(renewer == null ? null : new Text(renewer));
- result.setService(new Text(getCanonicalServiceName()));
return result;
}
@@ -830,7 +829,7 @@ public class DistributedFileSystem exten
@Deprecated
public Token<DelegationTokenIdentifier> getDelegationToken(Text renewer)
throws IOException {
- return dfs.getDelegationToken(renewer);
+ return getDelegationToken(renewer.toString());
}
@Override // FileSystem
@@ -847,10 +846,15 @@ public class DistributedFileSystem exten
* @param token delegation token obtained earlier
* @return the new expiration time
* @throws IOException
+ * @deprecated Use Token.renew instead.
*/
public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
throws InvalidToken, IOException {
- return dfs.renewDelegationToken(token);
+ try {
+ return token.renew(getConf());
+ } catch (InterruptedException ie) {
+ throw new RuntimeException("Caught interrupted", ie);
+ }
}
/**
@@ -858,10 +862,15 @@ public class DistributedFileSystem exten
*
* @param token delegation token
* @throws IOException
+ * @deprecated Use Token.cancel instead.
*/
public void cancelDelegationToken(Token<DelegationTokenIdentifier> token)
throws IOException {
- dfs.cancelDelegationToken(token);
+ try {
+ token.cancel(getConf());
+ } catch (InterruptedException ie) {
+ throw new RuntimeException("Caught interrupted", ie);
+ }
}
/**
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HDFSPolicyProvider.java Wed Nov 2 05:34:31 2011
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
@@ -43,12 +44,15 @@ public class HDFSPolicyProvider extends
new Service("security.inter.datanode.protocol.acl",
InterDatanodeProtocol.class),
new Service("security.namenode.protocol.acl", NamenodeProtocol.class),
- new Service("security.refresh.policy.protocol.acl",
- RefreshAuthorizationPolicyProtocol.class),
- new Service("security.refresh.user.mappings.protocol.acl",
- RefreshUserMappingsProtocol.class),
- new Service("security.get.user.mappings.protocol.acl",
- GetUserMappingsProtocol.class)
+ new Service(
+ CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_AUTHORIZATION_REFRESH_POLICY,
+ RefreshAuthorizationPolicyProtocol.class),
+ new Service(
+ CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_AUTHORIZATION_REFRESH_USER_MAPPINGS,
+ RefreshUserMappingsProtocol.class),
+ new Service(
+ CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_AUTHORIZATION_GET_USER_MAPPINGS,
+ GetUserMappingsProtocol.class)
};
@Override
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java Wed Nov 2 05:34:31 2011
@@ -21,7 +21,6 @@ package org.apache.hadoop.hdfs;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
-import java.lang.ref.WeakReference;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.URI;
@@ -32,9 +31,6 @@ import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.TimeZone;
-import java.util.concurrent.DelayQueue;
-import java.util.concurrent.Delayed;
-import java.util.concurrent.TimeUnit;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@@ -49,6 +45,8 @@ import org.apache.hadoop.fs.MD5MD5CRC32F
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenRenewer;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.tools.DelegationTokenFetcher;
@@ -60,6 +58,7 @@ import org.apache.hadoop.security.Securi
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenRenewer;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ServletUtil;
import org.xml.sax.Attributes;
@@ -78,20 +77,28 @@ import org.xml.sax.helpers.XMLReaderFact
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
-public class HftpFileSystem extends FileSystem {
+public class HftpFileSystem extends FileSystem
+ implements DelegationTokenRenewer.Renewable {
+ private static final DelegationTokenRenewer<HftpFileSystem> dtRenewer
+ = new DelegationTokenRenewer<HftpFileSystem>(HftpFileSystem.class);
+
static {
HttpURLConnection.setFollowRedirects(true);
+ dtRenewer.start();
}
+ public static final Text TOKEN_KIND = new Text("HFTP delegation");
+
private String nnHttpUrl;
- private URI hdfsURI;
+ private Text hdfsServiceName;
+ private URI hftpURI;
protected InetSocketAddress nnAddr;
protected UserGroupInformation ugi;
public static final String HFTP_TIMEZONE = "UTC";
public static final String HFTP_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ";
- private Token<DelegationTokenIdentifier> delegationToken;
- public static final String HFTP_SERVICE_NAME_KEY = "hdfs.service.host_";
+ private Token<?> delegationToken;
+ private Token<?> renewToken;
public static final SimpleDateFormat getDateFormat() {
final SimpleDateFormat df = new SimpleDateFormat(HFTP_DATE_FORMAT);
@@ -106,19 +113,23 @@ public class HftpFileSystem extends File
}
};
- private static RenewerThread renewer = new RenewerThread();
- static {
- renewer.start();
- }
-
@Override
protected int getDefaultPort() {
- return DFSConfigKeys.DFS_HTTPS_PORT_DEFAULT;
+ return getDefaultSecurePort();
+
+ //TODO: un-comment the following once HDFS-7510 is committed.
+// return getConf().getInt(DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_KEY,
+// DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT);
+ }
+
+ protected int getDefaultSecurePort() {
+ return getConf().getInt(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_KEY,
+ DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT);
}
@Override
public String getCanonicalServiceName() {
- return SecurityUtil.buildDTServiceName(hdfsURI, getDefaultPort());
+ return SecurityUtil.buildDTServiceName(hftpURI, getDefaultPort());
}
private String buildUri(String schema, String host, int port) {
@@ -127,7 +138,6 @@ public class HftpFileSystem extends File
}
- @SuppressWarnings("unchecked")
@Override
public void initialize(final URI name, final Configuration conf)
throws IOException {
@@ -144,17 +154,21 @@ public class HftpFileSystem extends File
urlPort = conf.getInt(DFSConfigKeys.DFS_HTTPS_PORT_KEY,
DFSConfigKeys.DFS_HTTPS_PORT_DEFAULT);
- nnHttpUrl =
- buildUri("https://", NetUtils.normalizeHostName(name.getHost()), urlPort);
+ String normalizedNN = NetUtils.normalizeHostName(name.getHost());
+ nnHttpUrl = buildUri("https://", normalizedNN ,urlPort);
LOG.debug("using url to get DT:" + nnHttpUrl);
+ try {
+ hftpURI = new URI(buildUri("hftp://", normalizedNN, urlPort));
+ } catch (URISyntaxException ue) {
+ throw new IOException("bad uri for hdfs", ue);
+ }
-
-
// if one uses RPC port different from the Default one,
// one should specify what is the service name for this delegation token
// otherwise it is hostname:RPC_PORT
- String key = HftpFileSystem.HFTP_SERVICE_NAME_KEY+
- SecurityUtil.buildDTServiceName(name, DFSConfigKeys.DFS_HTTPS_PORT_DEFAULT);
+ String key = DelegationTokenSelector.SERVICE_NAME_KEY
+ + SecurityUtil.buildDTServiceName(name,
+ DFSConfigKeys.DFS_HTTPS_PORT_DEFAULT);
if(LOG.isDebugEnabled()) {
LOG.debug("Trying to find DT for " + name + " using key=" + key +
"; conf=" + conf.get(key, ""));
@@ -165,9 +179,10 @@ public class HftpFileSystem extends File
nnPort = NetUtils.createSocketAddr(nnServiceName,
NameNode.DEFAULT_PORT).getPort();
}
-
try {
- hdfsURI = new URI(buildUri("hdfs://", nnAddr.getHostName(), nnPort));
+ URI hdfsURI = new URI("hdfs://" + normalizedNN + ":" + nnPort);
+ hdfsServiceName = new Text(SecurityUtil.buildDTServiceName(hdfsURI,
+ nnPort));
} catch (URISyntaxException ue) {
throw new IOException("bad uri for hdfs", ue);
}
@@ -175,30 +190,73 @@ public class HftpFileSystem extends File
if (UserGroupInformation.isSecurityEnabled()) {
//try finding a token for this namenode (esp applicable for tasks
//using hftp). If there exists one, just set the delegationField
- String canonicalName = getCanonicalServiceName();
+ String hftpServiceName = getCanonicalServiceName();
for (Token<? extends TokenIdentifier> t : ugi.getTokens()) {
- if (DelegationTokenIdentifier.HDFS_DELEGATION_KIND.equals(t.getKind()) &&
- t.getService().toString().equals(canonicalName)) {
- if(LOG.isDebugEnabled()) {
- LOG.debug("Found existing DT for " + name);
+ Text kind = t.getKind();
+ if (DelegationTokenIdentifier.HDFS_DELEGATION_KIND.equals(kind)) {
+ if (t.getService().equals(hdfsServiceName)) {
+ setDelegationToken(t);
+ break;
+ }
+ } else if (TOKEN_KIND.equals(kind)) {
+ if (hftpServiceName
+ .equals(normalizeService(t.getService().toString()))) {
+ setDelegationToken(t);
+ break;
}
- delegationToken = (Token<DelegationTokenIdentifier>) t;
- break;
}
}
//since we don't already have a token, go get one over https
if (delegationToken == null) {
- delegationToken =
- (Token<DelegationTokenIdentifier>) getDelegationToken(null);
- renewer.addTokenToRenew(this);
+ setDelegationToken(getDelegationToken(null));
+ dtRenewer.addRenewAction(this);
}
}
}
+
+ private String normalizeService(String service) {
+ int colonIndex = service.indexOf(':');
+ if (colonIndex == -1) {
+ throw new IllegalArgumentException("Invalid service for hftp token: " +
+ service);
+ }
+ String hostname =
+ NetUtils.normalizeHostName(service.substring(0, colonIndex));
+ String port = service.substring(colonIndex + 1);
+ return hostname + ":" + port;
+ }
+
+ //TODO: un-comment the following once HDFS-7510 is committed.
+// protected Token<DelegationTokenIdentifier> selectHftpDelegationToken() {
+// Text serviceName = SecurityUtil.buildTokenService(nnSecureAddr);
+// return hftpTokenSelector.selectToken(serviceName, ugi.getTokens());
+// }
+
+ protected Token<DelegationTokenIdentifier> selectHdfsDelegationToken() {
+ return DelegationTokenSelector.selectHdfsDelegationToken(
+ nnAddr, ugi, getConf());
+ }
@Override
- public synchronized Token<?> getDelegationToken(final String renewer) throws IOException {
+ public Token<?> getRenewToken() {
+ return renewToken;
+ }
+
+ @Override
+ public <T extends TokenIdentifier> void setDelegationToken(Token<T> token) {
+ renewToken = token;
+ // emulate the 203 usage of the tokens
+ // by setting the kind and service as if they were hdfs tokens
+ delegationToken = new Token<T>(token);
+ delegationToken.setKind(DelegationTokenIdentifier.HDFS_DELEGATION_KIND);
+ delegationToken.setService(hdfsServiceName);
+ }
+
+ @Override
+ public synchronized Token<?> getDelegationToken(final String renewer
+ ) throws IOException {
try {
//Renew TGT if needed
ugi.reloginFromKeytab();
@@ -221,7 +279,6 @@ public class HftpFileSystem extends File
LOG.debug("Got dt for " + getUri() + ";t.service="
+t.getService());
}
- t.setService(new Text(getCanonicalServiceName()));
return t;
}
return null;
@@ -594,157 +651,43 @@ public class HftpFileSystem extends File
return cs != null? cs: super.getContentSummary(f);
}
+ @InterfaceAudience.Private
+ public static class TokenManager extends TokenRenewer {
- /**
- * An action that will renew and replace the hftp file system's delegation
- * tokens automatically.
- */
- private static class RenewAction implements Delayed {
- // when should the renew happen
- private long timestamp;
- // a weak reference to the file system so that it can be garbage collected
- private final WeakReference<HftpFileSystem> weakFs;
-
- RenewAction(long timestamp, HftpFileSystem fs) {
- this.timestamp = timestamp;
- this.weakFs = new WeakReference<HftpFileSystem>(fs);
- }
-
- /**
- * Get the delay until this event should happen.
- */
@Override
- public long getDelay(TimeUnit unit) {
- long millisLeft = timestamp - System.currentTimeMillis();
- return unit.convert(millisLeft, TimeUnit.MILLISECONDS);
+ public boolean handleKind(Text kind) {
+ return kind.equals(TOKEN_KIND);
}
- /**
- * Compare two events in the same queue.
- */
- @Override
- public int compareTo(Delayed o) {
- if (o.getClass() != RenewAction.class) {
- throw new IllegalArgumentException("Illegal comparision to non-RenewAction");
- }
- RenewAction other = (RenewAction) o;
- return timestamp < other.timestamp ? -1 :
- (timestamp == other.timestamp ? 0 : 1);
- }
-
@Override
- public int hashCode() {
- assert false : "hashCode not designed";
- return 33;
- }
- /**
- * equals
- */
- @Override
- public boolean equals(Object o) {
- if(!( o instanceof Delayed))
- return false;
-
- return compareTo((Delayed) o) == 0;
+ public boolean isManaged(Token<?> token) throws IOException {
+ return true;
}
- /**
- * Set a new time for the renewal. Can only be called when the action
- * is not in the queue.
- * @param newTime the new time
- */
- public void setNewTime(long newTime) {
- timestamp = newTime;
- }
-
- /**
- * Renew or replace the delegation token for this file system.
- * @return
- * @throws IOException
- */
@SuppressWarnings("unchecked")
- public boolean renew() throws IOException, InterruptedException {
- final HftpFileSystem fs = weakFs.get();
- if (fs != null) {
- synchronized (fs) {
- fs.ugi.reloginFromKeytab();
- fs.ugi.doAs(new PrivilegedExceptionAction<Void>() {
-
- @Override
- public Void run() throws Exception {
- try {
- DelegationTokenFetcher.renewDelegationToken(fs.nnHttpUrl,
- fs.delegationToken);
- } catch (IOException ie) {
- try {
- fs.delegationToken =
- (Token<DelegationTokenIdentifier>) fs.getDelegationToken(null);
- } catch (IOException ie2) {
- throw new IOException("Can't renew or get new delegation token ",
- ie);
- }
- }
- return null;
- }
- });
- }
- }
- return fs != null;
- }
-
- public String toString() {
- StringBuilder result = new StringBuilder();
- HftpFileSystem fs = weakFs.get();
- if (fs == null) {
- return "evaporated token renew";
- }
- synchronized (fs) {
- result.append(fs.delegationToken);
- }
- result.append(" renew in ");
- result.append(getDelay(TimeUnit.SECONDS));
- result.append(" secs");
- return result.toString();
+ @Override
+ public long renew(Token<?> token,
+ Configuration conf) throws IOException {
+ // update the kerberos credentials, if they are coming from a keytab
+ UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
+ // use https to renew the token
+ return
+ DelegationTokenFetcher.renewDelegationToken
+ ("https://" + token.getService().toString(),
+ (Token<DelegationTokenIdentifier>) token);
}
- }
- /**
- * A daemon thread that waits for the next file system to renew.
- */
- private static class RenewerThread extends Thread {
- private DelayQueue<RenewAction> queue = new DelayQueue<RenewAction>();
- // wait for 95% of a day between renewals
- private static final int RENEW_CYCLE = (int) (0.95 * 24 * 60 * 60 * 1000);
-
- public RenewerThread() {
- super("HFTP Delegation Token Renewer");
- setDaemon(true);
- }
-
- public void addTokenToRenew(HftpFileSystem fs) {
- queue.add(new RenewAction(RENEW_CYCLE + System.currentTimeMillis(),fs));
- }
-
- public void run() {
- RenewAction action = null;
- while (true) {
- try {
- action = queue.take();
- if (action.renew()) {
- action.setNewTime(RENEW_CYCLE + System.currentTimeMillis());
- queue.add(action);
- }
- action = null;
- } catch (InterruptedException ie) {
- return;
- } catch (Exception ie) {
- if (action != null) {
- LOG.warn("Failure to renew token " + action, ie);
- } else {
- LOG.warn("Failure in renew queue", ie);
- }
- }
- }
+ @SuppressWarnings("unchecked")
+ @Override
+ public void cancel(Token<?> token,
+ Configuration conf) throws IOException {
+ // update the kerberos credentials, if they are coming from a keytab
+ UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
+ // use https to cancel the token
+ DelegationTokenFetcher.cancelDelegationToken
+ ("https://" + token.getService().toString(),
+ (Token<DelegationTokenIdentifier>) token);
}
+
}
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader.java Wed Nov 2 05:34:31 2011
@@ -33,10 +33,13 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.fs.FSInputChecker;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.PacketHeader;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientReadStatusProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumResponseProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ReadOpChecksumInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
@@ -408,11 +411,14 @@ public class RemoteBlockReader extends F
BlockOpResponseProto status = BlockOpResponseProto.parseFrom(
vintPrefixed(in));
checkSuccess(status, sock, block, file);
- DataChecksum checksum = DataChecksum.newDataChecksum( in );
+ ReadOpChecksumInfoProto checksumInfo =
+ status.getReadOpChecksumInfo();
+ DataChecksum checksum = DataTransferProtoUtil.fromProto(
+ checksumInfo.getChecksum());
//Warning when we get CHECKSUM_NULL?
// Read the first chunk offset.
- long firstChunkOffset = in.readLong();
+ long firstChunkOffset = checksumInfo.getChunkOffset();
if ( firstChunkOffset < 0 || firstChunkOffset > startOffset ||
firstChunkOffset >= (startOffset + checksum.getBytesPerChecksum())) {
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientDatanodeProtocol.java Wed Nov 2 05:34:31 2011
@@ -37,10 +37,29 @@ import org.apache.hadoop.security.token.
serverPrincipal = DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY)
@TokenInfo(BlockTokenSelector.class)
public interface ClientDatanodeProtocol extends VersionedProtocol {
- public static final Log LOG = LogFactory.getLog(ClientDatanodeProtocol.class);
-
/**
+ * Until version 9, this class ClientDatanodeProtocol served as both
+ * the client interface to the DN AND the RPC protocol used to
+ * communicate with the DN.
+ *
+ * Post version 10 (release 23 of Hadoop), the protocol is implemented in
+ * {@literal ../protocolR23Compatible/ClientDatanodeWireProtocol}
+ *
+ * This class is used by both the DFSClient and the
+ * DN server side to insulate from the protocol serialization.
+ *
+ * If you are adding/changing DN's interface then you need to
+ * change both this class and ALSO
+ * {@link org.apache.hadoop.hdfs.protocolR23Compatible.ClientDatanodeWireProtocol}.
+ * These changes need to be done in a compatible fashion as described in
+ * {@link org.apache.hadoop.hdfs.protocolR23Compatible.ClientNamenodeWireProtocol}
+ *
+ * The log of historical changes can be retrieved from svn.
* 9: Added deleteBlockPool method
+ *
+ * 9 is the last version id for which this class was used for protocol
+ * serialization. Do not update this version any further.
+ * Changes are recorded in R23 classes.
*/
public static final long versionID = 9L;
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Wed Nov 2 05:34:31 2011
@@ -65,10 +65,28 @@ import org.apache.hadoop.hdfs.security.t
public interface ClientProtocol extends VersionedProtocol {
/**
- * Compared to the previous version the following changes have been introduced:
- * (Only the latest change is reflected.
+ * Until version 69, this class ClientProtocol served as both
+ * the client interface to the NN AND the RPC protocol used to
+ * communicate with the NN.
+ *
+ * Post version 70 (release 23 of Hadoop), the protocol is implemented in
+ * {@literal ../protocolR23Compatible/ClientNamenodeWireProtocol}
+ *
+ * This class is used by both the DFSClient and the
+ * NN server side to insulate from the protocol serialization.
+ *
+ * If you are adding/changing NN's interface then you need to
+ * change both this class and ALSO
+ * {@link org.apache.hadoop.hdfs.protocolR23Compatible.ClientNamenodeWireProtocol}.
+ * These changes need to be done in a compatible fashion as described in
+ * {@link org.apache.hadoop.hdfs.protocolR23Compatible.ClientNamenodeWireProtocol}
+ *
* The log of historical changes can be retrieved from svn.
* 69: Eliminate overloaded method names.
+ *
+ * 69L is the last version id for which this class was used for protocol
+ * serialization. Do not update this version any further.
+ * Changes are recorded in R23 classes.
*/
public static final long versionID = 69L;
@@ -373,11 +391,8 @@ public interface ClientProtocol extends
* @return true if successful, or false if the old name does not exist
* or if the new name already belongs to the namespace.
*
- * @throws IOException an I/O error occurred
- *
- * @deprecated Use {@link #rename(String, String, Options.Rename...)} instead.
+ * @throws IOException an I/O error occurred
*/
- @Deprecated
public boolean rename(String src, String dst)
throws UnresolvedLinkException, IOException;
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeID.java Wed Nov 2 05:34:31 2011
@@ -24,7 +24,6 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DeprecatedUTF8;
import org.apache.hadoop.io.WritableComparable;
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java Wed Nov 2 05:34:31 2011
@@ -75,6 +75,13 @@ public class DatanodeInfo extends Datano
public String toString() {
return value;
}
+
+ public static AdminStates fromValue(final String value) {
+ for (AdminStates as : AdminStates.values()) {
+ if (as.value.equals(value)) return as;
+ }
+ return NORMAL;
+ }
}
@Nullable
@@ -110,11 +117,20 @@ public class DatanodeInfo extends Datano
this.adminState = null;
}
- protected DatanodeInfo(DatanodeID nodeID, String location, String hostName) {
+ public DatanodeInfo(DatanodeID nodeID, String location, String hostName) {
this(nodeID);
this.location = location;
this.hostName = hostName;
}
+
+ public DatanodeInfo(DatanodeID nodeID, String location, String hostName,
+ final long capacity, final long dfsUsed, final long remaining,
+ final long blockPoolUsed, final long lastUpdate, final int xceiverCount,
+ final AdminStates adminState) {
+ this(nodeID.getName(), nodeID.getStorageID(), nodeID.getInfoPort(), nodeID
+ .getIpcPort(), capacity, dfsUsed, remaining, blockPoolUsed, lastUpdate,
+ xceiverCount, location, hostName, adminState);
+ }
/** Constructor */
public DatanodeInfo(final String name, final String storageID,
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstants.java Wed Nov 2 05:34:31 2011
@@ -26,11 +26,20 @@ import org.apache.hadoop.hdfs.HdfsConfig
*
************************************/
@InterfaceAudience.Private
-public final class HdfsConstants {
+public class HdfsConstants {
/* Hidden constructor */
- private HdfsConstants() {
+ protected HdfsConstants() {
}
-
+
+ /**
+ * HDFS Protocol Names:
+ */
+ public static final String CLIENT_NAMENODE_PROTOCOL_NAME =
+ "org.apache.hadoop.hdfs.protocol.ClientProtocol";
+ public static final String CLIENT_DATANODE_PROTOCOL_NAME =
+ "org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol";
+
+
public static int MIN_BLOCKS_FOR_WRITE = 5;
// Long that indicates "leave current quota unchanged"
@@ -63,7 +72,7 @@ public final class HdfsConstants {
public static final int BYTES_IN_INTEGER = Integer.SIZE / Byte.SIZE;
// SafeMode actions
- public enum SafeModeAction {
+ public static enum SafeModeAction {
SAFEMODE_LEAVE, SAFEMODE_ENTER, SAFEMODE_GET;
}
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsFileStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsFileStatus.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsFileStatus.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsFileStatus.java Wed Nov 2 05:34:31 2011
@@ -241,6 +241,10 @@ public class HdfsFileStatus implements W
final public String getSymlink() {
return DFSUtil.bytes2String(symlink);
}
+
+ final public byte[] getSymlinkInBytes() {
+ return symlink;
+ }
//////////////////////////////////////////////////
// Writable
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsProtoUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsProtoUtil.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsProtoUtil.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsProtoUtil.java Wed Nov 2 05:34:31 2011
@@ -87,6 +87,7 @@ public abstract class HdfsProtoUtil {
.setName(dni.getName())
.setStorageID(dni.getStorageID())
.setInfoPort(dni.getInfoPort())
+ .setIpcPort(dni.getIpcPort())
.build();
}
@@ -95,7 +96,7 @@ public abstract class HdfsProtoUtil {
idProto.getName(),
idProto.getStorageID(),
idProto.getInfoPort(),
- -1); // ipc port not serialized in writables either
+ idProto.getIpcPort());
}
//// DatanodeInfo ////
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/LocatedBlock.java Wed Nov 2 05:34:31 2011
@@ -54,6 +54,11 @@ public class LocatedBlock implements Wri
public LocatedBlock() {
this(new ExtendedBlock(), new DatanodeInfo[0], 0L, false);
}
+
+
+ public LocatedBlock(ExtendedBlock eb) {
+ this(eb, new DatanodeInfo[0], 0L, false);
+ }
public LocatedBlock(String bpid, Block b, DatanodeInfo[] locs) {
this(new ExtendedBlock(bpid, b), locs, -1, false); // startOffset is unknown
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtoUtil.java Wed Nov 2 05:34:31 2011
@@ -23,10 +23,16 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BaseHeaderProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto.ChecksumType;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
+
+import com.google.common.collect.BiMap;
+import com.google.common.collect.ImmutableBiMap;
/**
@@ -35,8 +41,20 @@ import org.apache.hadoop.security.token.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
-abstract class DataTransferProtoUtil {
+public abstract class DataTransferProtoUtil {
+
+ /**
+ * Map between the internal DataChecksum identifiers and the protobuf-
+ * generated identifiers on the wire.
+ */
+ static BiMap<Integer, ChecksumProto.ChecksumType> checksumTypeMap =
+ ImmutableBiMap.<Integer, ChecksumProto.ChecksumType>builder()
+ .put(DataChecksum.CHECKSUM_CRC32, ChecksumProto.ChecksumType.CRC32)
+ .put(DataChecksum.CHECKSUM_CRC32C, ChecksumProto.ChecksumType.CRC32C)
+ .put(DataChecksum.CHECKSUM_NULL, ChecksumProto.ChecksumType.NULL)
+ .build();
+
static BlockConstructionStage fromProto(
OpWriteBlockProto.BlockConstructionStage stage) {
return BlockConstructionStage.valueOf(BlockConstructionStage.class,
@@ -49,6 +67,28 @@ abstract class DataTransferProtoUtil {
stage.name());
}
+ public static ChecksumProto toProto(DataChecksum checksum) {
+ ChecksumType type = checksumTypeMap.get(checksum.getChecksumType());
+ if (type == null) {
+ throw new IllegalArgumentException(
+ "Can't convert checksum to protobuf: " + checksum);
+ }
+
+ return ChecksumProto.newBuilder()
+ .setBytesPerChecksum(checksum.getBytesPerChecksum())
+ .setType(type)
+ .build();
+ }
+
+ public static DataChecksum fromProto(ChecksumProto proto) {
+ if (proto == null) return null;
+
+ int bytesPerChecksum = proto.getBytesPerChecksum();
+ int type = checksumTypeMap.inverse().get(proto.getType());
+
+ return DataChecksum.newDataChecksum(type, bytesPerChecksum);
+ }
+
static ClientOperationHeaderProto buildClientHeader(ExtendedBlock blk,
String client, Token<BlockTokenIdentifier> blockToken) {
ClientOperationHeaderProto header =
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/DataTransferProtocol.java Wed Nov 2 05:34:31 2011
@@ -27,6 +27,7 @@ import org.apache.hadoop.hdfs.protocol.D
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
/**
* Transfer data to/from datanode using a streaming protocol.
@@ -84,7 +85,8 @@ public interface DataTransferProtocol {
final int pipelineSize,
final long minBytesRcvd,
final long maxBytesRcvd,
- final long latestGenerationStamp) throws IOException;
+ final long latestGenerationStamp,
+ final DataChecksum requestedChecksum) throws IOException;
/**
* Transfer a block to another datanode.
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Receiver.java Wed Nov 2 05:34:31 2011
@@ -103,7 +103,8 @@ public abstract class Receiver implement
fromProto(proto.getStage()),
proto.getPipelineSize(),
proto.getMinBytesRcvd(), proto.getMaxBytesRcvd(),
- proto.getLatestGenerationStamp());
+ proto.getLatestGenerationStamp(),
+ fromProto(proto.getRequestedChecksum()));
}
/** Receive {@link Op#TRANSFER_BLOCK} */
Modified: hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java?rev=1196458&r1=1196457&r2=1196458&view=diff
==============================================================================
--- hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java (original)
+++ hadoop/common/branches/HDFS-1623/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/datatransfer/Sender.java Wed Nov 2 05:34:31 2011
@@ -29,6 +29,7 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ChecksumProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ClientOperationHeaderProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpBlockChecksumProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpCopyBlockProto;
@@ -38,6 +39,7 @@ import org.apache.hadoop.hdfs.protocol.p
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.OpWriteBlockProto;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.DataChecksum;
import com.google.protobuf.Message;
@@ -93,10 +95,14 @@ public class Sender implements DataTrans
final int pipelineSize,
final long minBytesRcvd,
final long maxBytesRcvd,
- final long latestGenerationStamp) throws IOException {
+ final long latestGenerationStamp,
+ DataChecksum requestedChecksum) throws IOException {
ClientOperationHeaderProto header = DataTransferProtoUtil.buildClientHeader(
blk, clientName, blockToken);
+ ChecksumProto checksumProto =
+ DataTransferProtoUtil.toProto(requestedChecksum);
+
OpWriteBlockProto.Builder proto = OpWriteBlockProto.newBuilder()
.setHeader(header)
.addAllTargets(toProtos(targets, 1))
@@ -104,7 +110,8 @@ public class Sender implements DataTrans
.setPipelineSize(pipelineSize)
.setMinBytesRcvd(minBytesRcvd)
.setMaxBytesRcvd(maxBytesRcvd)
- .setLatestGenerationStamp(latestGenerationStamp);
+ .setLatestGenerationStamp(latestGenerationStamp)
+ .setRequestedChecksum(checksumProto);
if (source != null) {
proto.setSource(toProto(source));