Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2015/04/27 23:26:19 UTC
[16/50] [abbrv] hadoop git commit: HDFS-8052. Move WebHdfsFileSystem
into hadoop-hdfs-client. Contributed by Haohui Mai.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1b933b2/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
new file mode 100644
index 0000000..d28f571
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
@@ -0,0 +1,1461 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.web;
+
+import java.io.BufferedOutputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
+import java.net.URI;
+import java.net.URL;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.StringTokenizer;
+
+import javax.ws.rs.core.MediaType;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.DelegationTokenRenewer;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
+import org.apache.hadoop.fs.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.XAttrCodec;
+import org.apache.hadoop.fs.XAttrSetFlag;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclStatus;
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.hdfs.HAUtilClient;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.protocol.HdfsConstantsClient;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.web.resources.*;
+import org.apache.hadoop.hdfs.web.resources.HttpOpParam.Op;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.io.retry.RetryUtils;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager.InvalidToken;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenSelector;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.StringUtils;
+import org.codehaus.jackson.map.ObjectMapper;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/** A FileSystem for HDFS over the web. */
+public class WebHdfsFileSystem extends FileSystem
+ implements DelegationTokenRenewer.Renewable, TokenAspect.TokenManagementDelegator {
+ public static final Log LOG = LogFactory.getLog(WebHdfsFileSystem.class);
+ /** WebHdfs version. */
+ public static final int VERSION = 1;
+ /** HTTP URI: http://namenode:port/{PATH_PREFIX}/path/to/file */
+ public static final String PATH_PREFIX = "/" + WebHdfsConstants.WEBHDFS_SCHEME + "/v" + VERSION;
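+ // For illustration: with VERSION = 1 the prefix resolves to "/webhdfs/v1",
+ // so a file-status request looks like the following (host, port, and path
+ // are hypothetical; 50070 is the default namenode HTTP port):
+ //   http://namenode:50070/webhdfs/v1/user/alice/file.txt?op=GETFILESTATUS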
+
+ /** Default connection factory; may be overridden in tests to use smaller timeout values. */
+ protected URLConnectionFactory connectionFactory;
+
+ @VisibleForTesting
+ public static final String CANT_FALLBACK_TO_INSECURE_MSG =
+ "The client is configured to only allow connecting to secure cluster";
+
+ private boolean canRefreshDelegationToken;
+
+ private UserGroupInformation ugi;
+ private URI uri;
+ private Token<?> delegationToken;
+ protected Text tokenServiceName;
+ private RetryPolicy retryPolicy = null;
+ private Path workingDir;
+ private InetSocketAddress nnAddrs[];
+ private int currentNNAddrIndex;
+ private boolean disallowFallbackToInsecureCluster;
+
+ /**
+ * Return the protocol scheme for the FileSystem.
+ * <p/>
+ *
+ * @return <code>webhdfs</code>
+ */
+ @Override
+ public String getScheme() {
+ return WebHdfsConstants.WEBHDFS_SCHEME;
+ }
+
+ /**
+ * Return the underlying transport protocol (http / https).
+ */
+ protected String getTransportScheme() {
+ return "http";
+ }
+
+ protected Text getTokenKind() {
+ return WebHdfsConstants.WEBHDFS_TOKEN_KIND;
+ }
+
+ @Override
+ public synchronized void initialize(URI uri, Configuration conf
+ ) throws IOException {
+ super.initialize(uri, conf);
+ setConf(conf);
+ // Set the user pattern based on the configuration file.
+ UserParam.setUserPattern(conf.get(
+ HdfsClientConfigKeys.DFS_WEBHDFS_USER_PATTERN_KEY,
+ HdfsClientConfigKeys.DFS_WEBHDFS_USER_PATTERN_DEFAULT));
+
+ connectionFactory = URLConnectionFactory
+ .newDefaultURLConnectionFactory(conf);
+
+ ugi = UserGroupInformation.getCurrentUser();
+ this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
+ this.nnAddrs = resolveNNAddr();
+
+ boolean isHA = HAUtilClient.isClientFailoverConfigured(conf, this.uri);
+ boolean isLogicalUri = isHA && HAUtilClient.isLogicalUri(conf, this.uri);
+ // In non-HA or non-logical URI case, the code needs to call
+ // getCanonicalUri() in order to handle the case where no port is
+ // specified in the URI
+ this.tokenServiceName = isLogicalUri ?
+ HAUtilClient.buildTokenServiceForLogicalUri(uri, getScheme())
+ : SecurityUtil.buildTokenService(getCanonicalUri());
+
+ if (!isHA) {
+ this.retryPolicy =
+ RetryUtils.getDefaultRetryPolicy(
+ conf,
+ HdfsClientConfigKeys.HttpClient.RETRY_POLICY_ENABLED_KEY,
+ HdfsClientConfigKeys.HttpClient.RETRY_POLICY_ENABLED_DEFAULT,
+ HdfsClientConfigKeys.HttpClient.RETRY_POLICY_SPEC_KEY,
+ HdfsClientConfigKeys.HttpClient.RETRY_POLICY_SPEC_DEFAULT,
+ HdfsConstantsClient.SAFEMODE_EXCEPTION_CLASS_NAME);
+ } else {
+
+ int maxFailoverAttempts = conf.getInt(
+ HdfsClientConfigKeys.HttpClient.FAILOVER_MAX_ATTEMPTS_KEY,
+ HdfsClientConfigKeys.HttpClient.FAILOVER_MAX_ATTEMPTS_DEFAULT);
+ int maxRetryAttempts = conf.getInt(
+ HdfsClientConfigKeys.HttpClient.RETRY_MAX_ATTEMPTS_KEY,
+ HdfsClientConfigKeys.HttpClient.RETRY_MAX_ATTEMPTS_DEFAULT);
+ int failoverSleepBaseMillis = conf.getInt(
+ HdfsClientConfigKeys.HttpClient.FAILOVER_SLEEPTIME_BASE_KEY,
+ HdfsClientConfigKeys.HttpClient.FAILOVER_SLEEPTIME_BASE_DEFAULT);
+ int failoverSleepMaxMillis = conf.getInt(
+ HdfsClientConfigKeys.HttpClient.FAILOVER_SLEEPTIME_MAX_KEY,
+ HdfsClientConfigKeys.HttpClient.FAILOVER_SLEEPTIME_MAX_DEFAULT);
+
+ this.retryPolicy = RetryPolicies
+ .failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL,
+ maxFailoverAttempts, maxRetryAttempts, failoverSleepBaseMillis,
+ failoverSleepMaxMillis);
+ }
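+ // For illustration: a retry policy spec is a comma-separated list of
+ // pause-time/retry pairs, e.g. "10000,6,60000,10" means retry up to 6
+ // times at 10-second intervals, then up to 10 more times at 60-second
+ // intervals.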
+
+ this.workingDir = getHomeDirectory();
+ this.canRefreshDelegationToken = UserGroupInformation.isSecurityEnabled();
+ this.disallowFallbackToInsecureCluster = !conf.getBoolean(
+ CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
+ CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
+ this.delegationToken = null;
+ }
+
+ @Override
+ public URI getCanonicalUri() {
+ return super.getCanonicalUri();
+ }
+
+ TokenSelector<DelegationTokenIdentifier> tokenSelector =
+ new AbstractDelegationTokenSelector<DelegationTokenIdentifier>(getTokenKind()){};
+
+ // The first getAuthParameters() call for a non-token op will either pick
+ // up the internal token from the UGI or lazily fetch a new one.
+ protected synchronized Token<?> getDelegationToken() throws IOException {
+ if (canRefreshDelegationToken && delegationToken == null) {
+ Token<?> token = tokenSelector.selectToken(
+ new Text(getCanonicalServiceName()), ugi.getTokens());
+ // UGI tokens usually indicate a task that cannot re-fetch tokens.
+ // Even if the UGI has credentials, don't attempt to get another token,
+ // to match HDFS/RPC behavior.
+ if (token != null) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Using UGI token: " + token);
+ }
+ canRefreshDelegationToken = false;
+ } else {
+ token = getDelegationToken(null);
+ if (token != null) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Fetched new token: " + token);
+ }
+ } else { // security is disabled
+ canRefreshDelegationToken = false;
+ }
+ }
+ setDelegationToken(token);
+ }
+ return delegationToken;
+ }
+
+ @VisibleForTesting
+ synchronized boolean replaceExpiredDelegationToken() throws IOException {
+ boolean replaced = false;
+ if (canRefreshDelegationToken) {
+ Token<?> token = getDelegationToken(null);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Replaced expired token: " + token);
+ }
+ setDelegationToken(token);
+ replaced = (token != null);
+ }
+ return replaced;
+ }
+
+ @Override
+ protected int getDefaultPort() {
+ return HdfsClientConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT;
+ }
+
+ @Override
+ public URI getUri() {
+ return this.uri;
+ }
+
+ @Override
+ protected URI canonicalizeUri(URI uri) {
+ return NetUtils.getCanonicalUri(uri, getDefaultPort());
+ }
+
+ /** @return the home directory. */
+ public static String getHomeDirectoryString(final UserGroupInformation ugi) {
+ return "/user/" + ugi.getShortUserName();
+ }
+
+ @Override
+ public Path getHomeDirectory() {
+ return makeQualified(new Path(getHomeDirectoryString(ugi)));
+ }
+
+ @Override
+ public synchronized Path getWorkingDirectory() {
+ return workingDir;
+ }
+
+ @Override
+ public synchronized void setWorkingDirectory(final Path dir) {
+ String result = makeAbsolute(dir).toUri().getPath();
+ if (!DFSUtilClient.isValidName(result)) {
+ throw new IllegalArgumentException("Invalid DFS directory name " +
+ result);
+ }
+ workingDir = makeAbsolute(dir);
+ }
+
+ private Path makeAbsolute(Path f) {
+ return f.isAbsolute()? f: new Path(workingDir, f);
+ }
+
+ static Map<?, ?> jsonParse(final HttpURLConnection c, final boolean useErrorStream
+ ) throws IOException {
+ if (c.getContentLength() == 0) {
+ return null;
+ }
+ final InputStream in = useErrorStream? c.getErrorStream(): c.getInputStream();
+ if (in == null) {
+ throw new IOException("The " + (useErrorStream? "error": "input") + " stream is null.");
+ }
+ try {
+ final String contentType = c.getContentType();
+ if (contentType != null) {
+ final MediaType parsed = MediaType.valueOf(contentType);
+ if (!MediaType.APPLICATION_JSON_TYPE.isCompatible(parsed)) {
+ throw new IOException("Content-Type \"" + contentType
+ + "\" is incompatible with \"" + MediaType.APPLICATION_JSON
+ + "\" (parsed=\"" + parsed + "\")");
+ }
+ }
+ ObjectMapper mapper = new ObjectMapper();
+ return mapper.reader(Map.class).readValue(in);
+ } finally {
+ in.close();
+ }
+ }
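+ // For illustration: a successful GETFILESTATUS response parsed by
+ // jsonParse() looks like the following (abbreviated):
+ //   {"FileStatus": {"length": 24930, "type": "FILE", "owner": "alice",
+ //    "group": "supergroup", "permission": "644", "replication": 3,
+ //    "blockSize": 134217728, "modificationTime": 1320171722771}}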
+
+ private static Map<?, ?> validateResponse(final HttpOpParam.Op op,
+ final HttpURLConnection conn, boolean unwrapException) throws IOException {
+ final int code = conn.getResponseCode();
+ // The server is demanding an authentication scheme we don't support.
+ if (code == HttpURLConnection.HTTP_UNAUTHORIZED) {
+ // match hdfs/rpc exception
+ throw new AccessControlException(conn.getResponseMessage());
+ }
+ if (code != op.getExpectedHttpResponseCode()) {
+ final Map<?, ?> m;
+ try {
+ m = jsonParse(conn, true);
+ } catch(Exception e) {
+ throw new IOException("Unexpected HTTP response: code=" + code + " != "
+ + op.getExpectedHttpResponseCode() + ", " + op.toQueryString()
+ + ", message=" + conn.getResponseMessage(), e);
+ }
+
+ if (m == null) {
+ throw new IOException("Unexpected HTTP response: code=" + code + " != "
+ + op.getExpectedHttpResponseCode() + ", " + op.toQueryString()
+ + ", message=" + conn.getResponseMessage());
+ } else if (m.get(RemoteException.class.getSimpleName()) == null) {
+ return m;
+ }
+
+ IOException re = JsonUtilClient.toRemoteException(m);
+ // Extract UGI-related exceptions and unwrap InvalidToken.
+ // The NN mangles these exceptions but the DN does not; the client may
+ // need to re-fetch a token if either reports that the token has expired.
+ if (re.getMessage() != null && re.getMessage().startsWith(
+ SecurityUtil.FAILED_TO_GET_UGI_MSG_HEADER)) {
+ String[] parts = re.getMessage().split(":\\s+", 3);
+ re = new RemoteException(parts[1], parts[2]);
+ re = ((RemoteException)re).unwrapRemoteException(InvalidToken.class);
+ }
+ throw unwrapException? toIOException(re): re;
+ }
+ return null;
+ }
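+ // For illustration: an error response wraps a RemoteException object,
+ // e.g. (abbreviated, hypothetical path):
+ //   {"RemoteException": {"exception": "FileNotFoundException",
+ //    "javaClassName": "java.io.FileNotFoundException",
+ //    "message": "File does not exist: /foo/a.txt"}}
+ // validateResponse() turns this into the corresponding IOException.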
+
+ /**
+ * Convert an exception to an IOException.
+ *
+ * For a non-IOException, wrap it in an IOException.
+ * For a RemoteException, unwrap it.
+ * For an IOException that is not a RemoteException, return it as-is.
+ */
+ private static IOException toIOException(Exception e) {
+ if (!(e instanceof IOException)) {
+ return new IOException(e);
+ }
+
+ final IOException ioe = (IOException)e;
+ if (!(ioe instanceof RemoteException)) {
+ return ioe;
+ }
+
+ return ((RemoteException)ioe).unwrapRemoteException();
+ }
+
+ private synchronized InetSocketAddress getCurrentNNAddr() {
+ return nnAddrs[currentNNAddrIndex];
+ }
+
+ /**
+ * Reset the appropriate state to gracefully fail over to another namenode.
+ */
+ private synchronized void resetStateToFailOver() {
+ currentNNAddrIndex = (currentNNAddrIndex + 1) % nnAddrs.length;
+ }
+
+ /**
+ * Return a URL pointing to given path on the namenode.
+ *
+ * @param path to obtain the URL for
+ * @param query string to append to the path
+ * @return namenode URL referring to the given path
+ * @throws IOException on error constructing the URL
+ */
+ private URL getNamenodeURL(String path, String query) throws IOException {
+ InetSocketAddress nnAddr = getCurrentNNAddr();
+ final URL url = new URL(getTransportScheme(), nnAddr.getHostName(),
+ nnAddr.getPort(), path + '?' + query);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("url=" + url);
+ }
+ return url;
+ }
+
+ Param<?,?>[] getAuthParameters(final HttpOpParam.Op op) throws IOException {
+ List<Param<?,?>> authParams = Lists.newArrayList();
+ // Skip adding delegation token for token operations because these
+ // operations require authentication.
+ Token<?> token = null;
+ if (!op.getRequireAuth()) {
+ token = getDelegationToken();
+ }
+ if (token != null) {
+ authParams.add(new DelegationParam(token.encodeToUrlString()));
+ } else {
+ UserGroupInformation userUgi = ugi;
+ UserGroupInformation realUgi = userUgi.getRealUser();
+ if (realUgi != null) { // proxy user
+ authParams.add(new DoAsParam(userUgi.getShortUserName()));
+ userUgi = realUgi;
+ }
+ authParams.add(new UserParam(userUgi.getShortUserName()));
+ }
+ return authParams.toArray(new Param<?,?>[0]);
+ }
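+ // For illustration, the resulting auth query parameters differ by mode
+ // (user names and token are hypothetical):
+ //   delegation token:     delegation=<encoded-token>
+ //   simple/kerberos user: user.name=alice
+ //   proxy user:           user.name=realuser&doas=alice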
+
+ URL toUrl(final HttpOpParam.Op op, final Path fspath,
+ final Param<?,?>... parameters) throws IOException {
+ //initialize URI path and query
+ final String path = PATH_PREFIX
+ + (fspath == null? "/": makeQualified(fspath).toUri().getRawPath());
+ final String query = op.toQueryString()
+ + Param.toSortedString("&", getAuthParameters(op))
+ + Param.toSortedString("&", parameters);
+ final URL url = getNamenodeURL(path, query);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("url=" + url);
+ }
+ return url;
+ }
+
+ /**
+ * This class is for initializing an HTTP connection, connecting to the
+ * server, obtaining a response, and handling retries on failure.
+ */
+ abstract class AbstractRunner<T> {
+ abstract protected URL getUrl() throws IOException;
+
+ protected final HttpOpParam.Op op;
+ private final boolean redirected;
+ protected ExcludeDatanodesParam excludeDatanodes = new ExcludeDatanodesParam("");
+
+ private boolean checkRetry;
+
+ protected AbstractRunner(final HttpOpParam.Op op, boolean redirected) {
+ this.op = op;
+ this.redirected = redirected;
+ }
+
+ T run() throws IOException {
+ UserGroupInformation connectUgi = ugi.getRealUser();
+ if (connectUgi == null) {
+ connectUgi = ugi;
+ }
+ if (op.getRequireAuth()) {
+ connectUgi.checkTGTAndReloginFromKeytab();
+ }
+ try {
+ // the entire lifecycle of the connection must be run inside the
+ // doAs to ensure authentication is performed correctly
+ return connectUgi.doAs(
+ new PrivilegedExceptionAction<T>() {
+ @Override
+ public T run() throws IOException {
+ return runWithRetry();
+ }
+ });
+ } catch (InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+
+ /**
+ * Two-step requests redirected to a DN.
+ *
+ * Create/Append:
+ * Step 1) Submit an HTTP request with neither auto-redirect nor data.
+ * Step 2) Submit another HTTP request with data, using the URL from the
+ * Location header of the first response.
+ *
+ * The reason for the two-step create/append is to prevent clients from
+ * sending data before the redirect. This issue is addressed by the
+ * "Expect: 100-continue" header in HTTP/1.1; see RFC 2616, Section 8.2.3.
+ * Unfortunately, there are software library bugs (e.g., the Jetty 6 HTTP
+ * server and the Java 6 HTTP client) that do not correctly implement
+ * "Expect: 100-continue". The two-step create/append is a temporary
+ * workaround for these bugs.
+ *
+ * Open/Checksum:
+ * Two-step connects are also used for other operations redirected to
+ * a DN, such as open and checksum.
+ */
+ private HttpURLConnection connect(URL url) throws IOException {
+ // Redirect hostname and port
+ String redirectHost = null;
+
+ // resolve redirects for a DN operation unless already resolved
+ if (op.getRedirect() && !redirected) {
+ final HttpOpParam.Op redirectOp =
+ HttpOpParam.TemporaryRedirectOp.valueOf(op);
+ final HttpURLConnection conn = connect(redirectOp, url);
+ // application level proxy like httpfs might not issue a redirect
+ if (conn.getResponseCode() == op.getExpectedHttpResponseCode()) {
+ return conn;
+ }
+ try {
+ validateResponse(redirectOp, conn, false);
+ url = new URL(conn.getHeaderField("Location"));
+ redirectHost = url.getHost() + ":" + url.getPort();
+ } finally {
+ conn.disconnect();
+ }
+ }
+ try {
+ return connect(op, url);
+ } catch (IOException ioe) {
+ if (redirectHost != null) {
+ if (excludeDatanodes.getValue() != null) {
+ excludeDatanodes = new ExcludeDatanodesParam(redirectHost + ","
+ + excludeDatanodes.getValue());
+ } else {
+ excludeDatanodes = new ExcludeDatanodesParam(redirectHost);
+ }
+ }
+ throw ioe;
+ }
+ }
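+ // For illustration, the two-step create looks like this on the wire
+ // (hosts, ports, and path are placeholders; 50075 is the default
+ // datanode HTTP port):
+ //   PUT http://<NN>:50070/webhdfs/v1/<PATH>?op=CREATE           (no data)
+ //   <- 307 Temporary Redirect, Location: http://<DN>:50075/...
+ //   PUT http://<DN>:50075/webhdfs/v1/<PATH>?op=CREATE&...       (file data)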
+
+ private HttpURLConnection connect(final HttpOpParam.Op op, final URL url)
+ throws IOException {
+ final HttpURLConnection conn =
+ (HttpURLConnection)connectionFactory.openConnection(url);
+ final boolean doOutput = op.getDoOutput();
+ conn.setRequestMethod(op.getType().toString());
+ conn.setInstanceFollowRedirects(false);
+ switch (op.getType()) {
+ // If not sending a message body for a POST or PUT operation, we need
+ // to ensure the server/proxy knows this.
+ case POST:
+ case PUT: {
+ conn.setDoOutput(true);
+ if (!doOutput) {
+ // Explicitly setting Content-Length to 0 does not work with SPNEGO;
+ // opening and closing the stream will send "Content-Length: 0".
+ conn.getOutputStream().close();
+ } else {
+ conn.setRequestProperty("Content-Type",
+ MediaType.APPLICATION_OCTET_STREAM);
+ conn.setChunkedStreamingMode(32 << 10); //32kB-chunk
+ }
+ break;
+ }
+ default: {
+ conn.setDoOutput(doOutput);
+ break;
+ }
+ }
+ conn.connect();
+ return conn;
+ }
+
+ private T runWithRetry() throws IOException {
+ /**
+ * Do the real work.
+ *
+ * There are three cases that the code inside the loop can throw an
+ * IOException:
+ *
+ * <ul>
+ * <li>The connection has failed (e.g., ConnectException,
+ * @see FailoverOnNetworkExceptionRetry for more details)</li>
+ * <li>The namenode enters the standby state (i.e., StandbyException).</li>
+ * <li>The server returns errors for the command (i.e., RemoteException)</li>
+ * </ul>
+ *
+ * The call to shouldRetry() consults the retry policy. The policy
+ * examines the exception and swallows it if it decides to retry the work.
+ */
+ for(int retry = 0; ; retry++) {
+ checkRetry = !redirected;
+ final URL url = getUrl();
+ try {
+ final HttpURLConnection conn = connect(url);
+ // output streams will validate on close
+ if (!op.getDoOutput()) {
+ validateResponse(op, conn, false);
+ }
+ return getResponse(conn);
+ } catch (AccessControlException ace) {
+ // no retries for auth failures
+ throw ace;
+ } catch (InvalidToken it) {
+ // try to replace the expired token with a new one. the attempt
+ // to acquire a new token must be outside this operation's retry
+ // so if it fails after its own retries, this operation fails too.
+ if (op.getRequireAuth() || !replaceExpiredDelegationToken()) {
+ throw it;
+ }
+ } catch (IOException ioe) {
+ shouldRetry(ioe, retry);
+ }
+ }
+ }
+
+ private void shouldRetry(final IOException ioe, final int retry
+ ) throws IOException {
+ InetSocketAddress nnAddr = getCurrentNNAddr();
+ if (checkRetry) {
+ try {
+ final RetryPolicy.RetryAction a = retryPolicy.shouldRetry(
+ ioe, retry, 0, true);
+
+ boolean isRetry = a.action == RetryPolicy.RetryAction.RetryDecision.RETRY;
+ boolean isFailoverAndRetry =
+ a.action == RetryPolicy.RetryAction.RetryDecision.FAILOVER_AND_RETRY;
+
+ if (isRetry || isFailoverAndRetry) {
+ LOG.info("Retrying connect to namenode: " + nnAddr
+ + ". Already tried " + retry + " time(s); retry policy is "
+ + retryPolicy + ", delay " + a.delayMillis + "ms.");
+
+ if (isFailoverAndRetry) {
+ resetStateToFailOver();
+ }
+
+ Thread.sleep(a.delayMillis);
+ return;
+ }
+ } catch(Exception e) {
+ LOG.warn("Original exception is ", ioe);
+ throw toIOException(e);
+ }
+ }
+ throw toIOException(ioe);
+ }
+
+ abstract T getResponse(HttpURLConnection conn) throws IOException;
+ }
+
+ /**
+ * Abstract base class to handle path-based operations with params
+ */
+ abstract class AbstractFsPathRunner<T> extends AbstractRunner<T> {
+ private final Path fspath;
+ private final Param<?,?>[] parameters;
+
+ AbstractFsPathRunner(final HttpOpParam.Op op, final Path fspath,
+ Param<?,?>... parameters) {
+ super(op, false);
+ this.fspath = fspath;
+ this.parameters = parameters;
+ }
+
+ AbstractFsPathRunner(final HttpOpParam.Op op, Param<?,?>[] parameters,
+ final Path fspath) {
+ super(op, false);
+ this.fspath = fspath;
+ this.parameters = parameters;
+ }
+
+ @Override
+ protected URL getUrl() throws IOException {
+ if (excludeDatanodes.getValue() != null) {
+ Param<?, ?>[] tmpParam = new Param<?, ?>[parameters.length + 1];
+ System.arraycopy(parameters, 0, tmpParam, 0, parameters.length);
+ tmpParam[parameters.length] = excludeDatanodes;
+ return toUrl(op, fspath, tmpParam);
+ } else {
+ return toUrl(op, fspath, parameters);
+ }
+ }
+ }
+
+ /**
+ * Default path-based implementation expects no json response
+ */
+ class FsPathRunner extends AbstractFsPathRunner<Void> {
+ FsPathRunner(Op op, Path fspath, Param<?,?>... parameters) {
+ super(op, fspath, parameters);
+ }
+
+ @Override
+ Void getResponse(HttpURLConnection conn) throws IOException {
+ return null;
+ }
+ }
+
+ /**
+ * Handle path-based operations with a json response
+ */
+ abstract class FsPathResponseRunner<T> extends AbstractFsPathRunner<T> {
+ FsPathResponseRunner(final HttpOpParam.Op op, final Path fspath,
+ Param<?,?>... parameters) {
+ super(op, fspath, parameters);
+ }
+
+ FsPathResponseRunner(final HttpOpParam.Op op, Param<?,?>[] parameters,
+ final Path fspath) {
+ super(op, parameters, fspath);
+ }
+
+ @Override
+ final T getResponse(HttpURLConnection conn) throws IOException {
+ try {
+ final Map<?,?> json = jsonParse(conn, false);
+ if (json == null) {
+ // match exception class thrown by parser
+ throw new IllegalStateException("Missing response");
+ }
+ return decodeResponse(json);
+ } catch (IOException ioe) {
+ throw ioe;
+ } catch (Exception e) { // catch json parser errors
+ final IOException ioe =
+ new IOException("Response decoding failure: "+e.toString(), e);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(ioe);
+ }
+ throw ioe;
+ } finally {
+ conn.disconnect();
+ }
+ }
+
+ abstract T decodeResponse(Map<?,?> json) throws IOException;
+ }
+
+ /**
+ * Handle path-based operations with json boolean response
+ */
+ class FsPathBooleanRunner extends FsPathResponseRunner<Boolean> {
+ FsPathBooleanRunner(Op op, Path fspath, Param<?,?>... parameters) {
+ super(op, fspath, parameters);
+ }
+
+ @Override
+ Boolean decodeResponse(Map<?,?> json) throws IOException {
+ return (Boolean)json.get("boolean");
+ }
+ }
+
+ /**
+ * Handle create/append output streams
+ */
+ class FsPathOutputStreamRunner extends AbstractFsPathRunner<FSDataOutputStream> {
+ private final int bufferSize;
+
+ FsPathOutputStreamRunner(Op op, Path fspath, int bufferSize,
+ Param<?,?>... parameters) {
+ super(op, fspath, parameters);
+ this.bufferSize = bufferSize;
+ }
+
+ @Override
+ FSDataOutputStream getResponse(final HttpURLConnection conn)
+ throws IOException {
+ return new FSDataOutputStream(new BufferedOutputStream(
+ conn.getOutputStream(), bufferSize), statistics) {
+ @Override
+ public void close() throws IOException {
+ try {
+ super.close();
+ } finally {
+ try {
+ validateResponse(op, conn, true);
+ } finally {
+ conn.disconnect();
+ }
+ }
+ }
+ };
+ }
+ }
+
+ class FsPathConnectionRunner extends AbstractFsPathRunner<HttpURLConnection> {
+ FsPathConnectionRunner(Op op, Path fspath, Param<?,?>... parameters) {
+ super(op, fspath, parameters);
+ }
+ @Override
+ HttpURLConnection getResponse(final HttpURLConnection conn)
+ throws IOException {
+ return conn;
+ }
+ }
+
+ /**
+ * Used by open(), which tracks the resolved URL itself.
+ */
+ final class URLRunner extends AbstractRunner<HttpURLConnection> {
+ private final URL url;
+ @Override
+ protected URL getUrl() {
+ return url;
+ }
+
+ protected URLRunner(final HttpOpParam.Op op, final URL url, boolean redirected) {
+ super(op, redirected);
+ this.url = url;
+ }
+
+ @Override
+ HttpURLConnection getResponse(HttpURLConnection conn) throws IOException {
+ return conn;
+ }
+ }
+
+ private FsPermission applyUMask(FsPermission permission) {
+ if (permission == null) {
+ permission = FsPermission.getDefault();
+ }
+ return permission.applyUMask(FsPermission.getUMask(getConf()));
+ }
+
+ private HdfsFileStatus getHdfsFileStatus(Path f) throws IOException {
+ final HttpOpParam.Op op = GetOpParam.Op.GETFILESTATUS;
+ HdfsFileStatus status = new FsPathResponseRunner<HdfsFileStatus>(op, f) {
+ @Override
+ HdfsFileStatus decodeResponse(Map<?,?> json) {
+ return JsonUtilClient.toFileStatus(json, true);
+ }
+ }.run();
+ if (status == null) {
+ throw new FileNotFoundException("File does not exist: " + f);
+ }
+ return status;
+ }
+
+ @Override
+ public FileStatus getFileStatus(Path f) throws IOException {
+ statistics.incrementReadOps(1);
+ return makeQualified(getHdfsFileStatus(f), f);
+ }
+
+ private FileStatus makeQualified(HdfsFileStatus f, Path parent) {
+ return new FileStatus(f.getLen(), f.isDir(), f.getReplication(),
+ f.getBlockSize(), f.getModificationTime(), f.getAccessTime(),
+ f.getPermission(), f.getOwner(), f.getGroup(),
+ f.isSymlink() ? new Path(f.getSymlink()) : null,
+ f.getFullPath(parent).makeQualified(getUri(), getWorkingDirectory()));
+ }
+
+ @Override
+ public AclStatus getAclStatus(Path f) throws IOException {
+ final HttpOpParam.Op op = GetOpParam.Op.GETACLSTATUS;
+ AclStatus status = new FsPathResponseRunner<AclStatus>(op, f) {
+ @Override
+ AclStatus decodeResponse(Map<?,?> json) {
+ return JsonUtilClient.toAclStatus(json);
+ }
+ }.run();
+ if (status == null) {
+ throw new FileNotFoundException("File does not exist: " + f);
+ }
+ return status;
+ }
+
+ @Override
+ public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+ statistics.incrementWriteOps(1);
+ final HttpOpParam.Op op = PutOpParam.Op.MKDIRS;
+ return new FsPathBooleanRunner(op, f,
+ new PermissionParam(applyUMask(permission))
+ ).run();
+ }
+
+ /**
+ * Create a symlink pointing to the destination path.
+ */
+ public void createSymlink(Path destination, Path f, boolean createParent
+ ) throws IOException {
+ statistics.incrementWriteOps(1);
+ final HttpOpParam.Op op = PutOpParam.Op.CREATESYMLINK;
+ new FsPathRunner(op, f,
+ new DestinationParam(makeQualified(destination).toUri().getPath()),
+ new CreateParentParam(createParent)
+ ).run();
+ }
+
+ @Override
+ public boolean rename(final Path src, final Path dst) throws IOException {
+ statistics.incrementWriteOps(1);
+ final HttpOpParam.Op op = PutOpParam.Op.RENAME;
+ return new FsPathBooleanRunner(op, src,
+ new DestinationParam(makeQualified(dst).toUri().getPath())
+ ).run();
+ }
+
+ @SuppressWarnings("deprecation")
+ @Override
+ public void rename(final Path src, final Path dst,
+ final Options.Rename... options) throws IOException {
+ statistics.incrementWriteOps(1);
+ final HttpOpParam.Op op = PutOpParam.Op.RENAME;
+ new FsPathRunner(op, src,
+ new DestinationParam(makeQualified(dst).toUri().getPath()),
+ new RenameOptionSetParam(options)
+ ).run();
+ }
+
+ @Override
+ public void setXAttr(Path p, String name, byte[] value,
+ EnumSet<XAttrSetFlag> flag) throws IOException {
+ statistics.incrementWriteOps(1);
+ final HttpOpParam.Op op = PutOpParam.Op.SETXATTR;
+ if (value != null) {
+ new FsPathRunner(op, p, new XAttrNameParam(name), new XAttrValueParam(
+ XAttrCodec.encodeValue(value, XAttrCodec.HEX)),
+ new XAttrSetFlagParam(flag)).run();
+ } else {
+ new FsPathRunner(op, p, new XAttrNameParam(name),
+ new XAttrSetFlagParam(flag)).run();
+ }
+ }
+
+ @Override
+ public byte[] getXAttr(Path p, final String name) throws IOException {
+ final HttpOpParam.Op op = GetOpParam.Op.GETXATTRS;
+ return new FsPathResponseRunner<byte[]>(op, p, new XAttrNameParam(name),
+ new XAttrEncodingParam(XAttrCodec.HEX)) {
+ @Override
+ byte[] decodeResponse(Map<?, ?> json) throws IOException {
+ return JsonUtilClient.getXAttr(json, name);
+ }
+ }.run();
+ }
+
+ @Override
+ public Map<String, byte[]> getXAttrs(Path p) throws IOException {
+ final HttpOpParam.Op op = GetOpParam.Op.GETXATTRS;
+ return new FsPathResponseRunner<Map<String, byte[]>>(op, p,
+ new XAttrEncodingParam(XAttrCodec.HEX)) {
+ @Override
+ Map<String, byte[]> decodeResponse(Map<?, ?> json) throws IOException {
+ return JsonUtilClient.toXAttrs(json);
+ }
+ }.run();
+ }
+
+ @Override
+ public Map<String, byte[]> getXAttrs(Path p, final List<String> names)
+ throws IOException {
+ Preconditions.checkArgument(names != null && !names.isEmpty(),
+ "XAttr names cannot be null or empty.");
+ Param<?,?>[] parameters = new Param<?,?>[names.size() + 1];
+ for (int i = 0; i < parameters.length - 1; i++) {
+ parameters[i] = new XAttrNameParam(names.get(i));
+ }
+ parameters[parameters.length - 1] = new XAttrEncodingParam(XAttrCodec.HEX);
+
+ final HttpOpParam.Op op = GetOpParam.Op.GETXATTRS;
+ return new FsPathResponseRunner<Map<String, byte[]>>(op, parameters, p) {
+ @Override
+ Map<String, byte[]> decodeResponse(Map<?, ?> json) throws IOException {
+ return JsonUtilClient.toXAttrs(json);
+ }
+ }.run();
+ }
+
+ @Override
+ public List<String> listXAttrs(Path p) throws IOException {
+ final HttpOpParam.Op op = GetOpParam.Op.LISTXATTRS;
+ return new FsPathResponseRunner<List<String>>(op, p) {
+ @Override
+ List<String> decodeResponse(Map<?, ?> json) throws IOException {
+ return JsonUtilClient.toXAttrNames(json);
+ }
+ }.run();
+ }
+
+ @Override
+ public void removeXAttr(Path p, String name) throws IOException {
+ statistics.incrementWriteOps(1);
+ final HttpOpParam.Op op = PutOpParam.Op.REMOVEXATTR;
+ new FsPathRunner(op, p, new XAttrNameParam(name)).run();
+ }
+
+ @Override
+ public void setOwner(final Path p, final String owner, final String group
+ ) throws IOException {
+ if (owner == null && group == null) {
+ throw new IOException("owner == null && group == null");
+ }
+
+ statistics.incrementWriteOps(1);
+ final HttpOpParam.Op op = PutOpParam.Op.SETOWNER;
+ new FsPathRunner(op, p,
+ new OwnerParam(owner), new GroupParam(group)
+ ).run();
+ }
+
+ @Override
+ public void setPermission(final Path p, final FsPermission permission
+ ) throws IOException {
+ statistics.incrementWriteOps(1);
+ final HttpOpParam.Op op = PutOpParam.Op.SETPERMISSION;
+ new FsPathRunner(op, p,new PermissionParam(permission)).run();
+ }
+
+ @Override
+ public void modifyAclEntries(Path path, List<AclEntry> aclSpec)
+ throws IOException {
+ statistics.incrementWriteOps(1);
+ final HttpOpParam.Op op = PutOpParam.Op.MODIFYACLENTRIES;
+ new FsPathRunner(op, path, new AclPermissionParam(aclSpec)).run();
+ }
+
+ @Override
+ public void removeAclEntries(Path path, List<AclEntry> aclSpec)
+ throws IOException {
+ statistics.incrementWriteOps(1);
+ final HttpOpParam.Op op = PutOpParam.Op.REMOVEACLENTRIES;
+ new FsPathRunner(op, path, new AclPermissionParam(aclSpec)).run();
+ }
+
+ @Override
+ public void removeDefaultAcl(Path path) throws IOException {
+ statistics.incrementWriteOps(1);
+ final HttpOpParam.Op op = PutOpParam.Op.REMOVEDEFAULTACL;
+ new FsPathRunner(op, path).run();
+ }
+
+ @Override
+ public void removeAcl(Path path) throws IOException {
+ statistics.incrementWriteOps(1);
+ final HttpOpParam.Op op = PutOpParam.Op.REMOVEACL;
+ new FsPathRunner(op, path).run();
+ }
+
+ @Override
+ public void setAcl(final Path p, final List<AclEntry> aclSpec)
+ throws IOException {
+ statistics.incrementWriteOps(1);
+ final HttpOpParam.Op op = PutOpParam.Op.SETACL;
+ new FsPathRunner(op, p, new AclPermissionParam(aclSpec)).run();
+ }
+
+ @Override
+ public Path createSnapshot(final Path path, final String snapshotName)
+ throws IOException {
+ statistics.incrementWriteOps(1);
+ final HttpOpParam.Op op = PutOpParam.Op.CREATESNAPSHOT;
+ Path spath = new FsPathResponseRunner<Path>(op, path,
+ new SnapshotNameParam(snapshotName)) {
+ @Override
+ Path decodeResponse(Map<?,?> json) {
+ return new Path((String) json.get(Path.class.getSimpleName()));
+ }
+ }.run();
+ return spath;
+ }
+
+ @Override
+ public void deleteSnapshot(final Path path, final String snapshotName)
+ throws IOException {
+ statistics.incrementWriteOps(1);
+ final HttpOpParam.Op op = DeleteOpParam.Op.DELETESNAPSHOT;
+ new FsPathRunner(op, path, new SnapshotNameParam(snapshotName)).run();
+ }
+
+ @Override
+ public void renameSnapshot(final Path path, final String snapshotOldName,
+ final String snapshotNewName) throws IOException {
+ statistics.incrementWriteOps(1);
+ final HttpOpParam.Op op = PutOpParam.Op.RENAMESNAPSHOT;
+ new FsPathRunner(op, path, new OldSnapshotNameParam(snapshotOldName),
+ new SnapshotNameParam(snapshotNewName)).run();
+ }
+
+ @Override
+ public boolean setReplication(final Path p, final short replication
+ ) throws IOException {
+ statistics.incrementWriteOps(1);
+ final HttpOpParam.Op op = PutOpParam.Op.SETREPLICATION;
+ return new FsPathBooleanRunner(op, p,
+ new ReplicationParam(replication)
+ ).run();
+ }
+
+ @Override
+ public void setTimes(final Path p, final long mtime, final long atime
+ ) throws IOException {
+ statistics.incrementWriteOps(1);
+ final HttpOpParam.Op op = PutOpParam.Op.SETTIMES;
+ new FsPathRunner(op, p,
+ new ModificationTimeParam(mtime),
+ new AccessTimeParam(atime)
+ ).run();
+ }
+
+ @Override
+ public long getDefaultBlockSize() {
+ return getConf().getLongBytes(HdfsClientConfigKeys.DFS_BLOCK_SIZE_KEY,
+ HdfsClientConfigKeys.DFS_BLOCK_SIZE_DEFAULT);
+ }
+
+ @Override
+ public short getDefaultReplication() {
+ return (short)getConf().getInt(HdfsClientConfigKeys.DFS_REPLICATION_KEY,
+ HdfsClientConfigKeys.DFS_REPLICATION_DEFAULT);
+ }
+
+ @Override
+ public void concat(final Path trg, final Path [] srcs) throws IOException {
+ statistics.incrementWriteOps(1);
+ final HttpOpParam.Op op = PostOpParam.Op.CONCAT;
+ new FsPathRunner(op, trg, new ConcatSourcesParam(srcs)).run();
+ }
+
+ @Override
+ public FSDataOutputStream create(final Path f, final FsPermission permission,
+ final boolean overwrite, final int bufferSize, final short replication,
+ final long blockSize, final Progressable progress) throws IOException {
+ statistics.incrementWriteOps(1);
+
+ final HttpOpParam.Op op = PutOpParam.Op.CREATE;
+ return new FsPathOutputStreamRunner(op, f, bufferSize,
+ new PermissionParam(applyUMask(permission)),
+ new OverwriteParam(overwrite),
+ new BufferSizeParam(bufferSize),
+ new ReplicationParam(replication),
+ new BlockSizeParam(blockSize)
+ ).run();
+ }
+
+ @Override
+ public FSDataOutputStream append(final Path f, final int bufferSize,
+ final Progressable progress) throws IOException {
+ statistics.incrementWriteOps(1);
+
+ final HttpOpParam.Op op = PostOpParam.Op.APPEND;
+ return new FsPathOutputStreamRunner(op, f, bufferSize,
+ new BufferSizeParam(bufferSize)
+ ).run();
+ }
+
+ @Override
+ public boolean truncate(Path f, long newLength) throws IOException {
+ statistics.incrementWriteOps(1);
+
+ final HttpOpParam.Op op = PostOpParam.Op.TRUNCATE;
+ return new FsPathBooleanRunner(op, f, new NewLengthParam(newLength)).run();
+ }
+
+ @Override
+ public boolean delete(Path f, boolean recursive) throws IOException {
+ final HttpOpParam.Op op = DeleteOpParam.Op.DELETE;
+ return new FsPathBooleanRunner(op, f,
+ new RecursiveParam(recursive)
+ ).run();
+ }
+
+ @Override
+ public FSDataInputStream open(final Path f, final int buffersize
+ ) throws IOException {
+ statistics.incrementReadOps(1);
+ final HttpOpParam.Op op = GetOpParam.Op.OPEN;
+ // use a runner so the open can recover from an invalid token
+ FsPathConnectionRunner runner =
+ new FsPathConnectionRunner(op, f, new BufferSizeParam(buffersize));
+ return new FSDataInputStream(new OffsetUrlInputStream(
+ new UnresolvedUrlOpener(runner), new OffsetUrlOpener(null)));
+ }
+
+ @Override
+ public synchronized void close() throws IOException {
+ try {
+ if (canRefreshDelegationToken && delegationToken != null) {
+ cancelDelegationToken(delegationToken);
+ }
+ } catch (IOException ioe) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Token cancel failed: " + ioe);
+ }
+ } finally {
+ super.close();
+ }
+ }
+
+ // use FsPathConnectionRunner to ensure retries for InvalidTokens
+ class UnresolvedUrlOpener extends ByteRangeInputStream.URLOpener {
+ private final FsPathConnectionRunner runner;
+ UnresolvedUrlOpener(FsPathConnectionRunner runner) {
+ super(null);
+ this.runner = runner;
+ }
+
+ @Override
+ protected HttpURLConnection connect(long offset, boolean resolved)
+ throws IOException {
+ assert offset == 0;
+ HttpURLConnection conn = runner.run();
+ setURL(conn.getURL());
+ return conn;
+ }
+ }
+
+ class OffsetUrlOpener extends ByteRangeInputStream.URLOpener {
+ OffsetUrlOpener(final URL url) {
+ super(url);
+ }
+
+ /** Set up the offset URL and connect. */
+ @Override
+ protected HttpURLConnection connect(final long offset,
+ final boolean resolved) throws IOException {
+ final URL offsetUrl = offset == 0L? url
+ : new URL(url + "&" + new OffsetParam(offset));
+ return new URLRunner(GetOpParam.Op.OPEN, offsetUrl, resolved).run();
+ }
+ }
+
+ private static final String OFFSET_PARAM_PREFIX = OffsetParam.NAME + "=";
+
+ /** Remove the offset parameter, if any, from the URL. */
+ static URL removeOffsetParam(final URL url) throws MalformedURLException {
+ String query = url.getQuery();
+ if (query == null) {
+ return url;
+ }
+ final String lower = StringUtils.toLowerCase(query);
+ if (!lower.startsWith(OFFSET_PARAM_PREFIX)
+ && !lower.contains("&" + OFFSET_PARAM_PREFIX)) {
+ return url;
+ }
+
+ //rebuild query
+ StringBuilder b = null;
+ for(final StringTokenizer st = new StringTokenizer(query, "&");
+ st.hasMoreTokens();) {
+ final String token = st.nextToken();
+ if (!StringUtils.toLowerCase(token).startsWith(OFFSET_PARAM_PREFIX)) {
+ if (b == null) {
+ b = new StringBuilder("?").append(token);
+ } else {
+ b.append('&').append(token);
+ }
+ }
+ }
+ query = b == null? "": b.toString();
+
+ final String urlStr = url.toString();
+ return new URL(urlStr.substring(0, urlStr.indexOf('?')) + query);
+ }
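+ // For illustration (hypothetical URL):
+ //   in:  http://<DN>:50075/webhdfs/v1/f?op=OPEN&offset=1024
+ //   out: http://<DN>:50075/webhdfs/v1/f?op=OPEN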
+
+ static class OffsetUrlInputStream extends ByteRangeInputStream {
+ OffsetUrlInputStream(UnresolvedUrlOpener o, OffsetUrlOpener r)
+ throws IOException {
+ super(o, r);
+ }
+
+ /** Remove the offset parameter before returning the resolved URL. */
+ @Override
+ protected URL getResolvedUrl(final HttpURLConnection connection
+ ) throws MalformedURLException {
+ return removeOffsetParam(connection.getURL());
+ }
+ }
+
+ @Override
+ public FileStatus[] listStatus(final Path f) throws IOException {
+ statistics.incrementReadOps(1);
+
+ final HttpOpParam.Op op = GetOpParam.Op.LISTSTATUS;
+ return new FsPathResponseRunner<FileStatus[]>(op, f) {
+ @Override
+ FileStatus[] decodeResponse(Map<?,?> json) {
+ final Map<?, ?> rootmap = (Map<?, ?>)json.get(FileStatus.class.getSimpleName() + "es");
+ final List<?> array = JsonUtilClient.getList(rootmap,
+ FileStatus.class.getSimpleName());
+
+ //convert FileStatus
+ final FileStatus[] statuses = new FileStatus[array.size()];
+ int i = 0;
+ for (Object object : array) {
+ final Map<?, ?> m = (Map<?, ?>) object;
+ statuses[i++] = makeQualified(JsonUtilClient.toFileStatus(m, false), f);
+ }
+ return statuses;
+ }
+ }.run();
+ }
+
+ @Override
+ public Token<DelegationTokenIdentifier> getDelegationToken(
+ final String renewer) throws IOException {
+ final HttpOpParam.Op op = GetOpParam.Op.GETDELEGATIONTOKEN;
+ Token<DelegationTokenIdentifier> token =
+ new FsPathResponseRunner<Token<DelegationTokenIdentifier>>(
+ op, null, new RenewerParam(renewer)) {
+ @Override
+ Token<DelegationTokenIdentifier> decodeResponse(Map<?,?> json)
+ throws IOException {
+ return JsonUtilClient.toDelegationToken(json);
+ }
+ }.run();
+ if (token != null) {
+ token.setService(tokenServiceName);
+ } else {
+ if (disallowFallbackToInsecureCluster) {
+ throw new AccessControlException(CANT_FALLBACK_TO_INSECURE_MSG);
+ }
+ }
+ return token;
+ }
+
+ @Override
+ public synchronized Token<?> getRenewToken() {
+ return delegationToken;
+ }
+
+ @Override
+ public <T extends TokenIdentifier> void setDelegationToken(
+ final Token<T> token) {
+ synchronized (this) {
+ delegationToken = token;
+ }
+ }
+
+ @Override
+ public synchronized long renewDelegationToken(final Token<?> token
+ ) throws IOException {
+ final HttpOpParam.Op op = PutOpParam.Op.RENEWDELEGATIONTOKEN;
+ return new FsPathResponseRunner<Long>(op, null,
+ new TokenArgumentParam(token.encodeToUrlString())) {
+ @Override
+ Long decodeResponse(Map<?,?> json) throws IOException {
+ return ((Number) json.get("long")).longValue();
+ }
+ }.run();
+ }
+
+ @Override
+ public synchronized void cancelDelegationToken(final Token<?> token
+ ) throws IOException {
+ final HttpOpParam.Op op = PutOpParam.Op.CANCELDELEGATIONTOKEN;
+ new FsPathRunner(op, null,
+ new TokenArgumentParam(token.encodeToUrlString())
+ ).run();
+ }
+
+ @Override
+ public BlockLocation[] getFileBlockLocations(final FileStatus status,
+ final long offset, final long length) throws IOException {
+ if (status == null) {
+ return null;
+ }
+ return getFileBlockLocations(status.getPath(), offset, length);
+ }
+
+ @Override
+ public BlockLocation[] getFileBlockLocations(final Path p,
+ final long offset, final long length) throws IOException {
+ statistics.incrementReadOps(1);
+
+ final HttpOpParam.Op op = GetOpParam.Op.GET_BLOCK_LOCATIONS;
+ return new FsPathResponseRunner<BlockLocation[]>(op, p,
+ new OffsetParam(offset), new LengthParam(length)) {
+ @Override
+ BlockLocation[] decodeResponse(Map<?,?> json) throws IOException {
+ return DFSUtilClient.locatedBlocks2Locations(
+ JsonUtilClient.toLocatedBlocks(json));
+ }
+ }.run();
+ }
+
+ @Override
+ public void access(final Path path, final FsAction mode) throws IOException {
+ final HttpOpParam.Op op = GetOpParam.Op.CHECKACCESS;
+ new FsPathRunner(op, path, new FsActionParam(mode)).run();
+ }
+
+ @Override
+ public ContentSummary getContentSummary(final Path p) throws IOException {
+ statistics.incrementReadOps(1);
+
+ final HttpOpParam.Op op = GetOpParam.Op.GETCONTENTSUMMARY;
+ return new FsPathResponseRunner<ContentSummary>(op, p) {
+ @Override
+ ContentSummary decodeResponse(Map<?,?> json) {
+ return JsonUtilClient.toContentSummary(json);
+ }
+ }.run();
+ }
+
+ @Override
+ public MD5MD5CRC32FileChecksum getFileChecksum(final Path p
+ ) throws IOException {
+ statistics.incrementReadOps(1);
+
+ final HttpOpParam.Op op = GetOpParam.Op.GETFILECHECKSUM;
+ return new FsPathResponseRunner<MD5MD5CRC32FileChecksum>(op, p) {
+ @Override
+ MD5MD5CRC32FileChecksum decodeResponse(Map<?,?> json) throws IOException {
+ return JsonUtilClient.toMD5MD5CRC32FileChecksum(json);
+ }
+ }.run();
+ }
+
+ /**
+ * Resolve an HDFS URL into a real InetSocketAddress. It works like a DNS
+ * resolver when the URL points to a non-HA cluster. When the URL points to
+ * an HA cluster with its logical name, the resolver further resolves the
+ * logical name (i.e., the authority in the URL) into real namenode addresses.
+ */
+ private InetSocketAddress[] resolveNNAddr() throws IOException {
+ Configuration conf = getConf();
+ final String scheme = uri.getScheme();
+
+ ArrayList<InetSocketAddress> ret = new ArrayList<InetSocketAddress>();
+
+ if (!HAUtilClient.isLogicalUri(conf, uri)) {
+ InetSocketAddress addr = NetUtils.createSocketAddr(uri.getAuthority(),
+ getDefaultPort());
+ ret.add(addr);
+
+ } else {
+ Map<String, Map<String, InetSocketAddress>> addresses = DFSUtilClient
+ .getHaNnWebHdfsAddresses(conf, scheme);
+
+ // Extract the entry corresponding to the logical name.
+ Map<String, InetSocketAddress> addrs = addresses.get(uri.getHost());
+ for (InetSocketAddress addr : addrs.values()) {
+ ret.add(addr);
+ }
+ }
+
+ InetSocketAddress[] r = new InetSocketAddress[ret.size()];
+ return ret.toArray(r);
+ }
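+ // For illustration: resolving a logical URI such as webhdfs://mycluster/
+ // assumes an HA configuration along these lines (names are hypothetical):
+ //   dfs.nameservices = mycluster
+ //   dfs.ha.namenodes.mycluster = nn1,nn2
+ //   dfs.namenode.http-address.mycluster.nn1 = host1:50070
+ //   dfs.namenode.http-address.mycluster.nn2 = host2:50070
+ // resolveNNAddr() then returns the socket addresses of both namenodes.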
+
+ @Override
+ public String getCanonicalServiceName() {
+ return tokenServiceName == null ? super.getCanonicalServiceName()
+ : tokenServiceName.toString();
+ }
+
+ @VisibleForTesting
+ InetSocketAddress[] getResolvedNNAddr() {
+ return nnAddrs;
+ }
+}
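As a usage sketch (not part of this patch; the URI authority and file paths are hypothetical), the class above is reached through the standard FileSystem factory once the webhdfs:// scheme is mapped to WebHdfsFileSystem:

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileStatus;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class WebHdfsExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // The webhdfs:// scheme routes to WebHdfsFileSystem.
        FileSystem fs = FileSystem.get(
            URI.create("webhdfs://namenode:50070/"), conf);

        // getFileStatus() issues an op=GETFILESTATUS request and decodes
        // the JSON response into a FileStatus.
        FileStatus status = fs.getFileStatus(new Path("/user/alice/file.txt"));
        System.out.println("length=" + status.getLen());

        // open() uses the two-step redirect to read from a datanode.
        try (BufferedReader in = new BufferedReader(new InputStreamReader(
            fs.open(new Path("/user/alice/file.txt"))))) {
          System.out.println(in.readLine());
        }
      }
    }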
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1b933b2/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/resources/BufferSizeParam.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/resources/BufferSizeParam.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/resources/BufferSizeParam.java
new file mode 100644
index 0000000..376d7d8
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/resources/BufferSizeParam.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web.resources;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
+
+/** Buffer size parameter. */
+public class BufferSizeParam extends IntegerParam {
+ /** Parameter name. */
+ public static final String NAME = "buffersize";
+ /** Default parameter value. */
+ public static final String DEFAULT = NULL;
+
+ private static final Domain DOMAIN = new Domain(NAME);
+
+ /**
+ * Constructor.
+ * @param value the parameter value.
+ */
+ public BufferSizeParam(final Integer value) {
+ super(DOMAIN, value, 1, null);
+ }
+
+ /**
+ * Constructor.
+ * @param str a string representation of the parameter value.
+ */
+ public BufferSizeParam(final String str) {
+ this(DOMAIN.parse(str));
+ }
+
+ @Override
+ public String getName() {
+ return NAME;
+ }
+
+ /** @return the value, or the default from conf if the value is null. */
+ public int getValue(final Configuration conf) {
+ return getValue() != null? getValue()
+ : conf.getInt(
+ CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
+ CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1b933b2/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index ff114dd..8c3cfe1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -467,6 +467,8 @@ Release 2.8.0 - UNRELEASED
HDFS-8215. Refactor NamenodeFsck#check method. (Takanobu Asanuma
via szetszwo)
+ HDFS-8052. Move WebHdfsFileSystem into hadoop-hdfs-client. (wheat9)
+
OPTIMIZATIONS
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1b933b2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java
index c809017..a1cd555 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStorageLocationUtil.java
@@ -48,7 +48,6 @@ import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.token.Token;
-import org.apache.htrace.Sampler;
import org.apache.htrace.Span;
import org.apache.htrace.Trace;
import org.apache.htrace.TraceScope;
@@ -296,7 +295,7 @@ class BlockStorageLocationUtil {
List<LocatedBlock> blocks,
Map<LocatedBlock, List<VolumeId>> blockVolumeIds) throws IOException {
// Construct the final return value of VolumeBlockLocation[]
- BlockLocation[] locations = DFSUtil.locatedBlocks2Locations(blocks);
+ BlockLocation[] locations = DFSUtilClient.locatedBlocks2Locations(blocks);
List<BlockStorageLocation> volumeBlockLocs =
new ArrayList<BlockStorageLocation>(locations.length);
for (int i = 0; i < locations.length; i++) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1b933b2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
index 6721412..63145b0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java
@@ -917,7 +917,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
TraceScope scope = getPathTraceScope("getBlockLocations", src);
try {
LocatedBlocks blocks = getLocatedBlocks(src, start, length);
- BlockLocation[] locations = DFSUtil.locatedBlocks2Locations(blocks);
+ BlockLocation[] locations = DFSUtilClient.locatedBlocks2Locations(blocks);
HdfsBlockLocation[] hdfsLocations = new HdfsBlockLocation[locations.length];
for (int i = 0; i < locations.length; i++) {
hdfsLocations[i] = new HdfsBlockLocation(locations[i], blocks.get(i));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1b933b2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index 9a754cd..1e3a5b6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -164,7 +164,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
*/
public static final String DFS_WEBHDFS_AUTHENTICATION_FILTER_DEFAULT =
"org.apache.hadoop.hdfs.web.AuthFilter".toString();
- public static final String DFS_WEBHDFS_USER_PATTERN_KEY = "dfs.webhdfs.user.provider.user.pattern";
+ @Deprecated
+ public static final String DFS_WEBHDFS_USER_PATTERN_KEY =
+ HdfsClientConfigKeys.DFS_WEBHDFS_USER_PATTERN_KEY;
+ @Deprecated
public static final String DFS_WEBHDFS_USER_PATTERN_DEFAULT =
HdfsClientConfigKeys.DFS_WEBHDFS_USER_PATTERN_DEFAULT;
public static final String DFS_PERMISSIONS_ENABLED_KEY = "dfs.permissions.enabled";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1b933b2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
index 078b0bb..2ec1c80 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java
@@ -228,37 +228,7 @@ public class DFSUtil {
* names which contain a ":" or "//", or other non-canonical paths.
*/
public static boolean isValidName(String src) {
- // Path must be absolute.
- if (!src.startsWith(Path.SEPARATOR)) {
- return false;
- }
-
- // Check for ".." "." ":" "/"
- String[] components = StringUtils.split(src, '/');
- for (int i = 0; i < components.length; i++) {
- String element = components[i];
- if (element.equals(".") ||
- (element.indexOf(":") >= 0) ||
- (element.indexOf("/") >= 0)) {
- return false;
- }
- // ".." is allowed in path starting with /.reserved/.inodes
- if (element.equals("..")) {
- if (components.length > 4
- && components[1].equals(FSDirectory.DOT_RESERVED_STRING)
- && components[2].equals(FSDirectory.DOT_INODES_STRING)) {
- continue;
- }
- return false;
- }
- // The string may start or end with a /, but not have
- // "//" in the middle.
- if (element.isEmpty() && i != components.length - 1 &&
- i != 0) {
- return false;
- }
- }
- return true;
+ return DFSUtilClient.isValidName(src);
}
/**
@@ -329,7 +299,7 @@ public class DFSUtil {
* Converts a string to a byte array using UTF8 encoding.
*/
public static byte[] string2Bytes(String str) {
- return str.getBytes(Charsets.UTF_8);
+ return DFSUtilClient.string2Bytes(str);
}
/**
@@ -475,61 +445,6 @@ public class DFSUtil {
}
return result;
}
-
- /**
- * Convert a LocatedBlocks to BlockLocations[]
- * @param blocks a LocatedBlocks
- * @return an array of BlockLocations
- */
- public static BlockLocation[] locatedBlocks2Locations(LocatedBlocks blocks) {
- if (blocks == null) {
- return new BlockLocation[0];
- }
- return locatedBlocks2Locations(blocks.getLocatedBlocks());
- }
-
- /**
- * Convert a List<LocatedBlock> to BlockLocation[]
- * @param blocks A List<LocatedBlock> to be converted
- * @return converted array of BlockLocation
- */
- public static BlockLocation[] locatedBlocks2Locations(List<LocatedBlock> blocks) {
- if (blocks == null) {
- return new BlockLocation[0];
- }
- int nrBlocks = blocks.size();
- BlockLocation[] blkLocations = new BlockLocation[nrBlocks];
- if (nrBlocks == 0) {
- return blkLocations;
- }
- int idx = 0;
- for (LocatedBlock blk : blocks) {
- assert idx < nrBlocks : "Incorrect index";
- DatanodeInfo[] locations = blk.getLocations();
- String[] hosts = new String[locations.length];
- String[] xferAddrs = new String[locations.length];
- String[] racks = new String[locations.length];
- for (int hCnt = 0; hCnt < locations.length; hCnt++) {
- hosts[hCnt] = locations[hCnt].getHostName();
- xferAddrs[hCnt] = locations[hCnt].getXferAddr();
- NodeBase node = new NodeBase(xferAddrs[hCnt],
- locations[hCnt].getNetworkLocation());
- racks[hCnt] = node.toString();
- }
- DatanodeInfo[] cachedLocations = blk.getCachedLocations();
- String[] cachedHosts = new String[cachedLocations.length];
- for (int i=0; i<cachedLocations.length; i++) {
- cachedHosts[i] = cachedLocations[i].getHostName();
- }
- blkLocations[idx] = new BlockLocation(xferAddrs, hosts, cachedHosts,
- racks,
- blk.getStartOffset(),
- blk.getBlockSize(),
- blk.isCorrupt());
- idx++;
- }
- return blkLocations;
- }
/**
* Return configuration key of format key.suffix1.suffix2...suffixN
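
With the hunks above, DFSUtil.isValidName and DFSUtil.string2Bytes become thin wrappers over DFSUtilClient, so client and server share one implementation. The removed body documents the rules the shared code is expected to enforce: paths must be absolute; "." components and ":" are rejected; ".." is tolerated only under /.reserved/.inodes; and "//" may not appear in the middle. A short sketch of the expected behavior, assuming the moved method preserves those semantics (the inode number below is illustrative):

    import org.apache.hadoop.hdfs.DFSUtilClient;

    public class ValidNameDemo {
      public static void main(String[] args) {
        System.out.println(DFSUtilClient.isValidName("/user/alice/data")); // true
        System.out.println(DFSUtilClient.isValidName("user/alice"));  // false: relative
        System.out.println(DFSUtilClient.isValidName("/a/./b"));      // false: "."
        System.out.println(DFSUtilClient.isValidName("/a:b/c"));      // false: ":"
        System.out.println(DFSUtilClient.isValidName("/a//b"));       // false: "//"
        // ".." is allowed only under the reserved inodes prefix:
        System.out.println(
            DFSUtilClient.isValidName("/.reserved/.inodes/16386/..")); // true
      }
    }
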
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1b933b2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
index 0da7a4d..d826755 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/NameNodeProxies.java
@@ -404,7 +404,7 @@ public class NameNodeProxies {
HdfsClientConfigKeys.Retry.POLICY_ENABLED_DEFAULT,
HdfsClientConfigKeys.Retry.POLICY_SPEC_KEY,
HdfsClientConfigKeys.Retry.POLICY_SPEC_DEFAULT,
- SafeModeException.class);
+ SafeModeException.class.getName());
final long version = RPC.getProtocolVersion(ClientNamenodeProtocolPB.class);
ClientNamenodeProtocolPB proxy = RPC.getProtocolProxy(
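
Passing SafeModeException.class.getName() instead of the Class object lets the retry machinery identify the remote exception by its class name, which removes a compile-time dependency on the server-side exception type. A sketch of that name-based matching idea; the helper below is hypothetical, not the actual RetryUtils code:

    import org.apache.hadoop.ipc.RemoteException;

    public class NameBasedMatch {
      // Hypothetical helper: tests whether a RemoteException wraps the
      // exception class identified by name, without loading that class.
      static boolean isRemoteInstanceOf(RemoteException re, String className) {
        return className.equals(re.getClassName());
      }

      public static void main(String[] args) {
        RemoteException re = new RemoteException(
            "org.apache.hadoop.hdfs.server.namenode.SafeModeException",
            "Name node is in safe mode");
        System.out.println(isRemoteInstanceOf(re,
            "org.apache.hadoop.hdfs.server.namenode.SafeModeException"));
      }
    }
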
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1b933b2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsLocatedFileStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsLocatedFileStatus.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsLocatedFileStatus.java
index 7e602bf..23e8f57 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsLocatedFileStatus.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsLocatedFileStatus.java
@@ -25,7 +25,7 @@ import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.DFSUtilClient;
/**
* Interface that represents the over the wire information
@@ -78,6 +78,6 @@ public class HdfsLocatedFileStatus extends HdfsFileStatus {
isSymlink() ? new Path(getSymlink()) : null,
(getFullPath(path)).makeQualified(
defaultUri, null), // fully-qualify path
- DFSUtil.locatedBlocks2Locations(getBlockLocations()));
+ DFSUtilClient.locatedBlocks2Locations(getBlockLocations()));
}
}
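
HdfsLocatedFileStatus now reaches the LocatedBlocks-to-BlockLocation[] conversion through DFSUtilClient, matching the removal of locatedBlocks2Locations from DFSUtil above. A minimal sketch of the call-site migration, assuming the client-side copy keeps the signature and null-tolerant behavior shown in the removed body:

    import org.apache.hadoop.fs.BlockLocation;
    import org.apache.hadoop.hdfs.DFSUtilClient;
    import org.apache.hadoop.hdfs.protocol.LocatedBlocks;

    public class LocationsMigration {
      // Before HDFS-8052: DFSUtil.locatedBlocks2Locations(blocks).
      // After the move, the same conversion lives in DFSUtilClient.
      static BlockLocation[] toBlockLocations(LocatedBlocks blocks) {
        // Per the removed body, a null input yields an empty array rather
        // than a NullPointerException.
        return DFSUtilClient.locatedBlocks2Locations(blocks);
      }
    }
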
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1b933b2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java
index a671d21..09b6b80 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.hdfs.server.namenode.startupprogress.StartupProgress;
import org.apache.hadoop.hdfs.server.namenode.web.resources.NamenodeWebHdfsMethods;
@@ -69,8 +70,8 @@ public class NameNodeHttpServer {
private void initWebHdfs(Configuration conf) throws IOException {
// set user pattern based on configuration file
UserParam.setUserPattern(conf.get(
- DFSConfigKeys.DFS_WEBHDFS_USER_PATTERN_KEY,
- DFSConfigKeys.DFS_WEBHDFS_USER_PATTERN_DEFAULT));
+ HdfsClientConfigKeys.DFS_WEBHDFS_USER_PATTERN_KEY,
+ HdfsClientConfigKeys.DFS_WEBHDFS_USER_PATTERN_DEFAULT));
// add authentication filter for webhdfs
final String className = conf.get(
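
The NameNode HTTP server now reads the WebHDFS user-name pattern through the client-side constants. The value is a regular expression that user names in WebHDFS requests must match. A hedged sketch of configuring a non-default pattern; the pattern string below is illustrative only:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
    import org.apache.hadoop.hdfs.web.resources.UserParam;

    public class UserPatternSetup {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Illustrative pattern only: also accept '@' so user names such as
        // "alice@EXAMPLE.COM" pass validation.
        conf.set(HdfsClientConfigKeys.DFS_WEBHDFS_USER_PATTERN_KEY,
            "^[A-Za-z_][A-Za-z0-9._@-]*$");
        // Mirrors initWebHdfs() above: fall back to the client-side default
        // when no pattern is configured.
        UserParam.setUserPattern(conf.get(
            HdfsClientConfigKeys.DFS_WEBHDFS_USER_PATTERN_KEY,
            HdfsClientConfigKeys.DFS_WEBHDFS_USER_PATTERN_DEFAULT));
      }
    }
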
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1b933b2/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/ByteRangeInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/ByteRangeInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/ByteRangeInputStream.java
deleted file mode 100644
index 395c9f6..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/ByteRangeInputStream.java
+++ /dev/null
@@ -1,232 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs.web;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.util.List;
-import java.util.Map;
-import java.util.StringTokenizer;
-
-import org.apache.commons.io.input.BoundedInputStream;
-import org.apache.hadoop.fs.FSInputStream;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.net.HttpHeaders;
-
-/**
- * To support HTTP byte streams, a new connection to an HTTP server needs to be
- * created each time. This class hides the complexity of those multiple
- * connections from the client. Whenever seek() is called, a new connection
- * is made on the successive read(). The normal input stream functions are
- * connected to the currently active input stream.
- */
-public abstract class ByteRangeInputStream extends FSInputStream {
-
- /**
- * This class wraps a URL and provides method to open connection.
- * It can be overridden to change how a connection is opened.
- */
- public static abstract class URLOpener {
- protected URL url;
-
- public URLOpener(URL u) {
- url = u;
- }
-
- public void setURL(URL u) {
- url = u;
- }
-
- public URL getURL() {
- return url;
- }
-
- /** Connect to server with a data offset. */
- protected abstract HttpURLConnection connect(final long offset,
- final boolean resolved) throws IOException;
- }
-
- enum StreamStatus {
- NORMAL, SEEK, CLOSED
- }
- protected InputStream in;
- protected final URLOpener originalURL;
- protected final URLOpener resolvedURL;
- protected long startPos = 0;
- protected long currentPos = 0;
- protected Long fileLength = null;
-
- StreamStatus status = StreamStatus.SEEK;
-
- /**
- * Create with the specified URLOpeners. Original url is used to open the
- * stream for the first time. Resolved url is used in subsequent requests.
- * @param o Original url
- * @param r Resolved url
- */
- public ByteRangeInputStream(URLOpener o, URLOpener r) throws IOException {
- this.originalURL = o;
- this.resolvedURL = r;
- getInputStream();
- }
-
- protected abstract URL getResolvedUrl(final HttpURLConnection connection
- ) throws IOException;
-
- @VisibleForTesting
- protected InputStream getInputStream() throws IOException {
- switch (status) {
- case NORMAL:
- break;
- case SEEK:
- if (in != null) {
- in.close();
- }
- in = openInputStream();
- status = StreamStatus.NORMAL;
- break;
- case CLOSED:
- throw new IOException("Stream closed");
- }
- return in;
- }
-
- @VisibleForTesting
- protected InputStream openInputStream() throws IOException {
- // Use the original url if no resolved url exists, eg. if
- // it's the first time a request is made.
- final boolean resolved = resolvedURL.getURL() != null;
- final URLOpener opener = resolved? resolvedURL: originalURL;
-
- final HttpURLConnection connection = opener.connect(startPos, resolved);
- resolvedURL.setURL(getResolvedUrl(connection));
-
- InputStream in = connection.getInputStream();
- final Map<String, List<String>> headers = connection.getHeaderFields();
- if (isChunkedTransferEncoding(headers)) {
- // file length is not known
- fileLength = null;
- } else {
- // for non-chunked transfer-encoding, get content-length
- final String cl = connection.getHeaderField(HttpHeaders.CONTENT_LENGTH);
- if (cl == null) {
- throw new IOException(HttpHeaders.CONTENT_LENGTH + " is missing: "
- + headers);
- }
- final long streamlength = Long.parseLong(cl);
- fileLength = startPos + streamlength;
-
- // Java has a bug with >2GB request streams. It won't bounds check
- // the reads so the transfer blocks until the server times out
- in = new BoundedInputStream(in, streamlength);
- }
-
- return in;
- }
-
- private static boolean isChunkedTransferEncoding(
- final Map<String, List<String>> headers) {
- return contains(headers, HttpHeaders.TRANSFER_ENCODING, "chunked")
- || contains(headers, HttpHeaders.TE, "chunked");
- }
-
- /** Does the HTTP header map contain the given key, value pair? */
- private static boolean contains(final Map<String, List<String>> headers,
- final String key, final String value) {
- final List<String> values = headers.get(key);
- if (values != null) {
- for(String v : values) {
- for(final StringTokenizer t = new StringTokenizer(v, ",");
- t.hasMoreTokens(); ) {
- if (value.equalsIgnoreCase(t.nextToken())) {
- return true;
- }
- }
- }
- }
- return false;
- }
-
- private int update(final int n) throws IOException {
- if (n != -1) {
- currentPos += n;
- } else if (fileLength != null && currentPos < fileLength) {
- throw new IOException("Got EOF but currentPos = " + currentPos
- + " < filelength = " + fileLength);
- }
- return n;
- }
-
- @Override
- public int read() throws IOException {
- final int b = getInputStream().read();
- update((b == -1) ? -1 : 1);
- return b;
- }
-
- @Override
- public int read(byte b[], int off, int len) throws IOException {
- return update(getInputStream().read(b, off, len));
- }
-
- /**
- * Seek to the given offset from the start of the file.
- * The next read() will be from that location. Can't
- * seek past the end of the file.
- */
- @Override
- public void seek(long pos) throws IOException {
- if (pos != currentPos) {
- startPos = pos;
- currentPos = pos;
- if (status != StreamStatus.CLOSED) {
- status = StreamStatus.SEEK;
- }
- }
- }
-
- /**
- * Return the current offset from the start of the file
- */
- @Override
- public long getPos() throws IOException {
- return currentPos;
- }
-
- /**
- * Seeks a different copy of the data. Returns true if
- * found a new source, false otherwise.
- */
- @Override
- public boolean seekToNewSource(long targetPos) throws IOException {
- return false;
- }
-
- @Override
- public void close() throws IOException {
- if (in != null) {
- in.close();
- in = null;
- }
- status = StreamStatus.CLOSED;
- }
-}
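
The class javadoc above states the contract that survives the move to hadoop-hdfs-client: seek() only records the new offset, and the next read() opens a fresh HTTP connection there. A self-contained sketch of that reconnect-on-seek pattern against a server that honors HTTP Range requests; this illustrates the idea, not the moved class itself:

    import java.io.IOException;
    import java.io.InputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;

    public class RangeReader {
      private final URL url;
      private long pos;
      private InputStream in;   // null means "reconnect before reading"

      public RangeReader(URL url) { this.url = url; }

      public void seek(long newPos) throws IOException {
        if (newPos != pos) {
          pos = newPos;
          if (in != null) { in.close(); in = null; }  // force a new connection
        }
      }

      public int read() throws IOException {
        if (in == null) {
          HttpURLConnection conn = (HttpURLConnection) url.openConnection();
          // Ask the server for bytes starting at the current offset.
          conn.setRequestProperty("Range", "bytes=" + pos + "-");
          in = conn.getInputStream();
        }
        int b = in.read();
        if (b != -1) {
          pos++;
        }
        return b;
      }
    }
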