Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2015/04/27 23:26:20 UTC
[17/50] [abbrv] hadoop git commit: HDFS-8052. Move WebHdfsFileSystem into hadoop-hdfs-client. Contributed by Haohui Mai.
HDFS-8052. Move WebHdfsFileSystem into hadoop-hdfs-client. Contributed by Haohui Mai.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/d1b933b2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/d1b933b2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/d1b933b2
Branch: refs/heads/YARN-2928
Commit: d1b933b266e42b47e4c5378d6d3193de3c46d7a2
Parents: 45ccd91
Author: Haohui Mai <wh...@apache.org>
Authored: Thu Apr 23 17:33:05 2015 -0700
Committer: Zhijie Shen <zj...@apache.org>
Committed: Mon Apr 27 14:18:48 2015 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/io/retry/RetryUtils.java | 4 +-
.../org/apache/hadoop/hdfs/DFSUtilClient.java | 109 ++
.../hdfs/client/HdfsClientConfigKeys.java | 1 +
.../hdfs/protocol/HdfsConstantsClient.java | 4 +
.../hadoop/hdfs/web/ByteRangeInputStream.java | 232 +++
.../apache/hadoop/hdfs/web/JsonUtilClient.java | 485 ++++++
.../hdfs/web/KerberosUgiAuthenticator.java | 45 +
.../hadoop/hdfs/web/SWebHdfsFileSystem.java | 44 +
.../org/apache/hadoop/hdfs/web/TokenAspect.java | 179 +++
.../hadoop/hdfs/web/URLConnectionFactory.java | 187 +++
.../hadoop/hdfs/web/WebHdfsFileSystem.java | 1461 +++++++++++++++++
.../hdfs/web/resources/BufferSizeParam.java | 60 +
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 +
.../hadoop/hdfs/BlockStorageLocationUtil.java | 3 +-
.../java/org/apache/hadoop/hdfs/DFSClient.java | 2 +-
.../org/apache/hadoop/hdfs/DFSConfigKeys.java | 5 +-
.../java/org/apache/hadoop/hdfs/DFSUtil.java | 89 +-
.../org/apache/hadoop/hdfs/NameNodeProxies.java | 2 +-
.../hdfs/protocol/HdfsLocatedFileStatus.java | 4 +-
.../server/namenode/NameNodeHttpServer.java | 5 +-
.../hadoop/hdfs/web/ByteRangeInputStream.java | 232 ---
.../apache/hadoop/hdfs/web/JsonUtilClient.java | 484 ------
.../hdfs/web/KerberosUgiAuthenticator.java | 45 -
.../hadoop/hdfs/web/SWebHdfsFileSystem.java | 44 -
.../org/apache/hadoop/hdfs/web/TokenAspect.java | 179 ---
.../hadoop/hdfs/web/URLConnectionFactory.java | 187 ---
.../hadoop/hdfs/web/WebHdfsFileSystem.java | 1463 ------------------
.../hdfs/web/resources/BufferSizeParam.java | 60 -
.../org/apache/hadoop/hdfs/TestDFSUtil.java | 4 +-
.../org/apache/hadoop/hdfs/web/TestWebHDFS.java | 4 +-
30 files changed, 2830 insertions(+), 2795 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1b933b2/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryUtils.java
index e6f4519..b2e115f 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryUtils.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryUtils.java
@@ -60,7 +60,7 @@ public class RetryUtils {
boolean defaultRetryPolicyEnabled,
String retryPolicySpecKey,
String defaultRetryPolicySpec,
- final Class<? extends Exception> remoteExceptionToRetry
+ final String remoteExceptionToRetry
) {
final RetryPolicy multipleLinearRandomRetry =
@@ -94,7 +94,7 @@ public class RetryUtils {
final RetryPolicy p;
if (e instanceof RemoteException) {
final RemoteException re = (RemoteException)e;
- p = remoteExceptionToRetry.getName().equals(re.getClassName())?
+ p = remoteExceptionToRetry.equals(re.getClassName())?
multipleLinearRandomRetry: RetryPolicies.TRY_ONCE_THEN_FAIL;
} else if (e instanceof IOException || e instanceof ServiceException) {
p = multipleLinearRandomRetry;
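This hunk matches the retried RemoteException by class name instead of by Class object, removing the need for a compile-time dependency on the server-side exception class; the HDFS client can now pass the name via HdfsConstantsClient.SAFEMODE_EXCEPTION_CLASS_NAME (added further below). A minimal caller sketch, assuming the leading parameters (a Configuration and the enable-key) keep their existing order and that the key names and retry spec shown are illustrative only:

    // Sketch: the retried exception is now named by a String, not a Class.
    RetryPolicy policy = RetryUtils.getDefaultRetryPolicy(conf,
        "dfs.client.retry.policy.enabled", false,           // illustrative key
        "dfs.client.retry.policy.spec", "10000,6,60000,10", // illustrative spec
        HdfsConstantsClient.SAFEMODE_EXCEPTION_CLASS_NAME);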
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1b933b2/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
index 84fb12c..97d3408 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSUtilClient.java
@@ -19,10 +19,17 @@ package org.apache.hadoop.hdfs;
import com.google.common.base.Joiner;
import com.google.common.collect.Maps;
+import org.apache.commons.io.Charsets;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.web.WebHdfsConstants;
import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,6 +38,7 @@ import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Collections;
+import java.util.List;
import java.util.Map;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
@@ -40,6 +48,13 @@ public class DFSUtilClient {
private static final Logger LOG = LoggerFactory.getLogger(
DFSUtilClient.class);
/**
+ * Converts a string to a byte array using UTF8 encoding.
+ */
+ public static byte[] string2Bytes(String str) {
+ return str.getBytes(Charsets.UTF_8);
+ }
+
+ /**
* Converts a byte array to a string using UTF8 encoding.
*/
public static String bytes2String(byte[] bytes) {
@@ -114,6 +129,62 @@ public class DFSUtilClient {
}
/**
+ * Convert a LocatedBlocks to BlockLocations[]
+ * @param blocks a LocatedBlocks
+ * @return an array of BlockLocations
+ */
+ public static BlockLocation[] locatedBlocks2Locations(LocatedBlocks blocks) {
+ if (blocks == null) {
+ return new BlockLocation[0];
+ }
+ return locatedBlocks2Locations(blocks.getLocatedBlocks());
+ }
+
+ /**
+ * Convert a List<LocatedBlock> to BlockLocation[]
+ * @param blocks A List<LocatedBlock> to be converted
+ * @return converted array of BlockLocation
+ */
+ public static BlockLocation[] locatedBlocks2Locations(
+ List<LocatedBlock> blocks) {
+ if (blocks == null) {
+ return new BlockLocation[0];
+ }
+ int nrBlocks = blocks.size();
+ BlockLocation[] blkLocations = new BlockLocation[nrBlocks];
+ if (nrBlocks == 0) {
+ return blkLocations;
+ }
+ int idx = 0;
+ for (LocatedBlock blk : blocks) {
+ assert idx < nrBlocks : "Incorrect index";
+ DatanodeInfo[] locations = blk.getLocations();
+ String[] hosts = new String[locations.length];
+ String[] xferAddrs = new String[locations.length];
+ String[] racks = new String[locations.length];
+ for (int hCnt = 0; hCnt < locations.length; hCnt++) {
+ hosts[hCnt] = locations[hCnt].getHostName();
+ xferAddrs[hCnt] = locations[hCnt].getXferAddr();
+ NodeBase node = new NodeBase(xferAddrs[hCnt],
+ locations[hCnt].getNetworkLocation());
+ racks[hCnt] = node.toString();
+ }
+ DatanodeInfo[] cachedLocations = blk.getCachedLocations();
+ String[] cachedHosts = new String[cachedLocations.length];
+ for (int i=0; i<cachedLocations.length; i++) {
+ cachedHosts[i] = cachedLocations[i].getHostName();
+ }
+ blkLocations[idx] = new BlockLocation(xferAddrs, hosts, cachedHosts,
+ racks,
+ blk.getStartOffset(),
+ blk.getBlockSize(),
+ blk.isCorrupt());
+ idx++;
+ }
+ return blkLocations;
+ }
+
+ /**
* Decode a specific range of bytes of the given byte array to a string
* using UTF8.
*
@@ -234,4 +305,42 @@ public class DFSUtilClient {
}
return value;
}
+
+ /**
+ * Whether the pathname is valid. Currently prohibits relative paths,
+ * names which contain a ":" or "//", or other non-canonical paths.
+ */
+ public static boolean isValidName(String src) {
+ // Path must be absolute.
+ if (!src.startsWith(Path.SEPARATOR)) {
+ return false;
+ }
+
+ // Check for ".." "." ":" "/"
+ String[] components = StringUtils.split(src, '/');
+ for (int i = 0; i < components.length; i++) {
+ String element = components[i];
+ if (element.equals(".") ||
+ (element.contains(":")) ||
+ (element.contains("/"))) {
+ return false;
+ }
+ // ".." is allowed in path starting with /.reserved/.inodes
+ if (element.equals("..")) {
+ if (components.length > 4
+ && components[1].equals(".reserved")
+ && components[2].equals(".inodes")) {
+ continue;
+ }
+ return false;
+ }
+ // The string may start or end with a /, but not have
+ // "//" in the middle.
+ if (element.isEmpty() && i != components.length - 1 &&
+ i != 0) {
+ return false;
+ }
+ }
+ return true;
+ }
}
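isValidName above prohibits relative paths and components containing ":" or "//", while special-casing ".." under /.reserved/.inodes. A few illustrative inputs traced against the code above (a sketch, not part of the commit):

    // Expected results for DFSUtilClient.isValidName (illustrative):
    DFSUtilClient.isValidName("/foo/bar");                    // true
    DFSUtilClient.isValidName("foo/bar");                     // false: relative
    DFSUtilClient.isValidName("/foo//bar");                   // false: "//" in the middle
    DFSUtilClient.isValidName("/foo:8020/bar");               // false: ":" in a component
    DFSUtilClient.isValidName("/a/../b");                     // false: ".." outside .reserved
    DFSUtilClient.isValidName("/.reserved/.inodes/16386/.."); // true: ".." allowed here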
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1b933b2/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index d11922d..86c8a87 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -26,6 +26,7 @@ public interface HdfsClientConfigKeys {
long DFS_BLOCK_SIZE_DEFAULT = 128*1024*1024;
String DFS_REPLICATION_KEY = "dfs.replication";
short DFS_REPLICATION_DEFAULT = 3;
+ String DFS_WEBHDFS_USER_PATTERN_KEY = "dfs.webhdfs.user.provider.user.pattern";
String DFS_WEBHDFS_USER_PATTERN_DEFAULT = "^[A-Za-z_][A-Za-z0-9._-]*[$]?$";
String DFS_WEBHDFS_ACL_PERMISSION_PATTERN_DEFAULT =
"^(default:)?(user|group|mask|other):[[A-Za-z_][A-Za-z0-9._-]]*:([rwx-]{3})?(,(default:)?(user|group|mask|other):[[A-Za-z_][A-Za-z0-9._-]]*:([rwx-]{3})?)*$";
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1b933b2/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstantsClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstantsClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstantsClient.java
index ab4310e..00f07e8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstantsClient.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/HdfsConstantsClient.java
@@ -38,4 +38,8 @@ public interface HdfsConstantsClient {
* URI.
*/
String HA_DT_SERVICE_PREFIX = "ha-";
+ // The name of the SafeModeException. FileSystem should retry if it sees
+ // this exception in an RPC response.
+ String SAFEMODE_EXCEPTION_CLASS_NAME = "org.apache.hadoop.hdfs.server" +
+ ".namenode.SafeModeException";
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1b933b2/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/ByteRangeInputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/ByteRangeInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/ByteRangeInputStream.java
new file mode 100644
index 0000000..395c9f6
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/ByteRangeInputStream.java
@@ -0,0 +1,232 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.web;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.List;
+import java.util.Map;
+import java.util.StringTokenizer;
+
+import org.apache.commons.io.input.BoundedInputStream;
+import org.apache.hadoop.fs.FSInputStream;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.net.HttpHeaders;
+
+/**
+ * To support HTTP byte streams, a new connection to an HTTP server needs to be
+ * created each time. This class hides the complexity of those multiple
+ * connections from the client. Whenever seek() is called, a new connection
+ * is made on the successive read(). The normal input stream functions are
+ * connected to the currently active input stream.
+ */
+public abstract class ByteRangeInputStream extends FSInputStream {
+
+ /**
+ * This class wraps a URL and provides a method to open a connection.
+ * It can be overridden to change how a connection is opened.
+ */
+ public static abstract class URLOpener {
+ protected URL url;
+
+ public URLOpener(URL u) {
+ url = u;
+ }
+
+ public void setURL(URL u) {
+ url = u;
+ }
+
+ public URL getURL() {
+ return url;
+ }
+
+ /** Connect to server with a data offset. */
+ protected abstract HttpURLConnection connect(final long offset,
+ final boolean resolved) throws IOException;
+ }
+
+ enum StreamStatus {
+ NORMAL, SEEK, CLOSED
+ }
+ protected InputStream in;
+ protected final URLOpener originalURL;
+ protected final URLOpener resolvedURL;
+ protected long startPos = 0;
+ protected long currentPos = 0;
+ protected Long fileLength = null;
+
+ StreamStatus status = StreamStatus.SEEK;
+
+ /**
+ * Create with the specified URLOpeners. Original url is used to open the
+ * stream for the first time. Resolved url is used in subsequent requests.
+ * @param o Original url
+ * @param r Resolved url
+ */
+ public ByteRangeInputStream(URLOpener o, URLOpener r) throws IOException {
+ this.originalURL = o;
+ this.resolvedURL = r;
+ getInputStream();
+ }
+
+ protected abstract URL getResolvedUrl(final HttpURLConnection connection
+ ) throws IOException;
+
+ @VisibleForTesting
+ protected InputStream getInputStream() throws IOException {
+ switch (status) {
+ case NORMAL:
+ break;
+ case SEEK:
+ if (in != null) {
+ in.close();
+ }
+ in = openInputStream();
+ status = StreamStatus.NORMAL;
+ break;
+ case CLOSED:
+ throw new IOException("Stream closed");
+ }
+ return in;
+ }
+
+ @VisibleForTesting
+ protected InputStream openInputStream() throws IOException {
+ // Use the original url if no resolved url exists, e.g. if
+ // it's the first time a request is made.
+ final boolean resolved = resolvedURL.getURL() != null;
+ final URLOpener opener = resolved? resolvedURL: originalURL;
+
+ final HttpURLConnection connection = opener.connect(startPos, resolved);
+ resolvedURL.setURL(getResolvedUrl(connection));
+
+ InputStream in = connection.getInputStream();
+ final Map<String, List<String>> headers = connection.getHeaderFields();
+ if (isChunkedTransferEncoding(headers)) {
+ // file length is not known
+ fileLength = null;
+ } else {
+ // for non-chunked transfer-encoding, get content-length
+ final String cl = connection.getHeaderField(HttpHeaders.CONTENT_LENGTH);
+ if (cl == null) {
+ throw new IOException(HttpHeaders.CONTENT_LENGTH + " is missing: "
+ + headers);
+ }
+ final long streamlength = Long.parseLong(cl);
+ fileLength = startPos + streamlength;
+
+ // Java has a bug with >2GB request streams. It won't bounds check
+ // the reads so the transfer blocks until the server times out
+ in = new BoundedInputStream(in, streamlength);
+ }
+
+ return in;
+ }
+
+ private static boolean isChunkedTransferEncoding(
+ final Map<String, List<String>> headers) {
+ return contains(headers, HttpHeaders.TRANSFER_ENCODING, "chunked")
+ || contains(headers, HttpHeaders.TE, "chunked");
+ }
+
+ /** Does the HTTP header map contain the given key, value pair? */
+ private static boolean contains(final Map<String, List<String>> headers,
+ final String key, final String value) {
+ final List<String> values = headers.get(key);
+ if (values != null) {
+ for(String v : values) {
+ for(final StringTokenizer t = new StringTokenizer(v, ",");
+ t.hasMoreTokens(); ) {
+ if (value.equalsIgnoreCase(t.nextToken())) {
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+ }
+
+ private int update(final int n) throws IOException {
+ if (n != -1) {
+ currentPos += n;
+ } else if (fileLength != null && currentPos < fileLength) {
+ throw new IOException("Got EOF but currentPos = " + currentPos
+ + " < filelength = " + fileLength);
+ }
+ return n;
+ }
+
+ @Override
+ public int read() throws IOException {
+ final int b = getInputStream().read();
+ update((b == -1) ? -1 : 1);
+ return b;
+ }
+
+ @Override
+ public int read(byte b[], int off, int len) throws IOException {
+ return update(getInputStream().read(b, off, len));
+ }
+
+ /**
+ * Seek to the given offset from the start of the file.
+ * The next read() will be from that location. Can't
+ * seek past the end of the file.
+ */
+ @Override
+ public void seek(long pos) throws IOException {
+ if (pos != currentPos) {
+ startPos = pos;
+ currentPos = pos;
+ if (status != StreamStatus.CLOSED) {
+ status = StreamStatus.SEEK;
+ }
+ }
+ }
+
+ /**
+ * Return the current offset from the start of the file
+ */
+ @Override
+ public long getPos() throws IOException {
+ return currentPos;
+ }
+
+ /**
+ * Seeks a different copy of the data. Returns true if a new
+ * source is found, false otherwise.
+ */
+ @Override
+ public boolean seekToNewSource(long targetPos) throws IOException {
+ return false;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (in != null) {
+ in.close();
+ in = null;
+ }
+ status = StreamStatus.CLOSED;
+ }
+}
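ByteRangeInputStream reconnects lazily: seek() only records the new offset and flips the status to SEEK, and the next read() opens a fresh ranged HTTP connection via openInputStream(). A sketch of the observable behavior through the public FileSystem API, assuming a Configuration in scope and an illustrative webhdfs URI and path:

    // Sketch: seek() is cheap; the following read() performs the HTTP request.
    FileSystem fs = FileSystem.get(URI.create("webhdfs://nn:50070"), conf);
    try (FSDataInputStream in = fs.open(new Path("/data/file.bin"))) {
      in.seek(1024L);                      // records offset, status = SEEK
      byte[] buf = new byte[512];
      int n = in.read(buf, 0, buf.length); // reconnects at offset 1024, then reads
    }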
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1b933b2/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/JsonUtilClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/JsonUtilClient.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/JsonUtilClient.java
new file mode 100644
index 0000000..e263a0a
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/JsonUtilClient.java
@@ -0,0 +1,485 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.FileChecksum;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.MD5MD5CRC32CastagnoliFileChecksum;
+import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
+import org.apache.hadoop.fs.MD5MD5CRC32GzipFileChecksum;
+import org.apache.hadoop.fs.XAttrCodec;
+import org.apache.hadoop.fs.permission.AclEntry;
+import org.apache.hadoop.fs.permission.AclStatus;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSUtilClient;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.FsPermissionExtension;
+import org.apache.hadoop.hdfs.protocol.HdfsConstantsClient;
+import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
+import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.util.DataChecksum;
+import org.apache.hadoop.util.StringUtils;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.ObjectReader;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+class JsonUtilClient {
+ static final DatanodeInfo[] EMPTY_DATANODE_INFO_ARRAY = {};
+
+ /** Convert a Json map to a RemoteException. */
+ static RemoteException toRemoteException(final Map<?, ?> json) {
+ final Map<?, ?> m = (Map<?, ?>)json.get(RemoteException.class.getSimpleName());
+ final String message = (String)m.get("message");
+ final String javaClassName = (String)m.get("javaClassName");
+ return new RemoteException(javaClassName, message);
+ }
+
+ /** Convert a Json map to a Token. */
+ static Token<? extends TokenIdentifier> toToken(
+ final Map<?, ?> m) throws IOException {
+ if (m == null) {
+ return null;
+ }
+
+ final Token<DelegationTokenIdentifier> token
+ = new Token<>();
+ token.decodeFromUrlString((String)m.get("urlString"));
+ return token;
+ }
+
+ /** Convert a Json map to a Token of BlockTokenIdentifier. */
+ @SuppressWarnings("unchecked")
+ static Token<BlockTokenIdentifier> toBlockToken(
+ final Map<?, ?> m) throws IOException {
+ return (Token<BlockTokenIdentifier>)toToken(m);
+ }
+
+ /** Convert a string to a FsPermission object. */
+ static FsPermission toFsPermission(
+ final String s, Boolean aclBit, Boolean encBit) {
+ FsPermission perm = new FsPermission(Short.parseShort(s, 8));
+ final boolean aBit = (aclBit != null) ? aclBit : false;
+ final boolean eBit = (encBit != null) ? encBit : false;
+ if (aBit || eBit) {
+ return new FsPermissionExtension(perm, aBit, eBit);
+ } else {
+ return perm;
+ }
+ }
+
+ /** Convert a Json map to a HdfsFileStatus object. */
+ static HdfsFileStatus toFileStatus(final Map<?, ?> json, boolean includesType) {
+ if (json == null) {
+ return null;
+ }
+
+ final Map<?, ?> m = includesType ?
+ (Map<?, ?>)json.get(FileStatus.class.getSimpleName()) : json;
+ final String localName = (String) m.get("pathSuffix");
+ final WebHdfsConstants.PathType type = WebHdfsConstants.PathType.valueOf((String) m.get("type"));
+ final byte[] symlink = type != WebHdfsConstants.PathType.SYMLINK? null
+ : DFSUtilClient.string2Bytes((String) m.get("symlink"));
+
+ final long len = ((Number) m.get("length")).longValue();
+ final String owner = (String) m.get("owner");
+ final String group = (String) m.get("group");
+ final FsPermission permission = toFsPermission((String) m.get("permission"),
+ (Boolean) m.get("aclBit"),
+ (Boolean) m.get("encBit"));
+ final long aTime = ((Number) m.get("accessTime")).longValue();
+ final long mTime = ((Number) m.get("modificationTime")).longValue();
+ final long blockSize = ((Number) m.get("blockSize")).longValue();
+ final short replication = ((Number) m.get("replication")).shortValue();
+ final long fileId = m.containsKey("fileId") ?
+ ((Number) m.get("fileId")).longValue() : HdfsConstantsClient.GRANDFATHER_INODE_ID;
+ final int childrenNum = getInt(m, "childrenNum", -1);
+ final byte storagePolicy = m.containsKey("storagePolicy") ?
+ (byte) ((Number) m.get("storagePolicy")).longValue() :
+ HdfsConstantsClient.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED;
+ return new HdfsFileStatus(len, type == WebHdfsConstants.PathType.DIRECTORY, replication,
+ blockSize, mTime, aTime, permission, owner, group,
+ symlink, DFSUtilClient.string2Bytes(localName),
+ fileId, childrenNum, null,
+ storagePolicy);
+ }
+
+ /** Convert a Json map to an ExtendedBlock object. */
+ static ExtendedBlock toExtendedBlock(final Map<?, ?> m) {
+ if (m == null) {
+ return null;
+ }
+
+ final String blockPoolId = (String)m.get("blockPoolId");
+ final long blockId = ((Number) m.get("blockId")).longValue();
+ final long numBytes = ((Number) m.get("numBytes")).longValue();
+ final long generationStamp =
+ ((Number) m.get("generationStamp")).longValue();
+ return new ExtendedBlock(blockPoolId, blockId, numBytes, generationStamp);
+ }
+
+ static int getInt(Map<?, ?> m, String key, final int defaultValue) {
+ Object value = m.get(key);
+ if (value == null) {
+ return defaultValue;
+ }
+ return ((Number) value).intValue();
+ }
+
+ static long getLong(Map<?, ?> m, String key, final long defaultValue) {
+ Object value = m.get(key);
+ if (value == null) {
+ return defaultValue;
+ }
+ return ((Number) value).longValue();
+ }
+
+ static String getString(
+ Map<?, ?> m, String key, final String defaultValue) {
+ Object value = m.get(key);
+ if (value == null) {
+ return defaultValue;
+ }
+ return (String) value;
+ }
+
+ static List<?> getList(Map<?, ?> m, String key) {
+ Object list = m.get(key);
+ if (list instanceof List<?>) {
+ return (List<?>) list;
+ } else {
+ return null;
+ }
+ }
+
+ /** Convert a Json map to a DatanodeInfo object. */
+ static DatanodeInfo toDatanodeInfo(final Map<?, ?> m)
+ throws IOException {
+ if (m == null) {
+ return null;
+ }
+
+ // ipAddr and xferPort are the critical fields for accessing data.
+ // If any one of the two is missing, an exception needs to be thrown.
+
+ // Handle the case of old servers (1.x, 0.23.x) sending 'name' instead
+ // of ipAddr and xferPort.
+ String ipAddr = getString(m, "ipAddr", null);
+ int xferPort = getInt(m, "xferPort", -1);
+ if (ipAddr == null) {
+ String name = getString(m, "name", null);
+ if (name != null) {
+ int colonIdx = name.indexOf(':');
+ if (colonIdx > 0) {
+ ipAddr = name.substring(0, colonIdx);
+ xferPort = Integer.parseInt(name.substring(colonIdx +1));
+ } else {
+ throw new IOException(
+ "Invalid value in server response: name=[" + name + "]");
+ }
+ } else {
+ throw new IOException(
+ "Missing both 'ipAddr' and 'name' in server response.");
+ }
+ // ipAddr is non-null & non-empty string at this point.
+ }
+
+ // Check the validity of xferPort.
+ if (xferPort == -1) {
+ throw new IOException(
+ "Invalid or missing 'xferPort' in server response.");
+ }
+
+ // TODO: Fix storageID
+ return new DatanodeInfo(
+ ipAddr,
+ (String)m.get("hostName"),
+ (String)m.get("storageID"),
+ xferPort,
+ ((Number) m.get("infoPort")).intValue(),
+ getInt(m, "infoSecurePort", 0),
+ ((Number) m.get("ipcPort")).intValue(),
+
+ getLong(m, "capacity", 0l),
+ getLong(m, "dfsUsed", 0l),
+ getLong(m, "remaining", 0l),
+ getLong(m, "blockPoolUsed", 0l),
+ getLong(m, "cacheCapacity", 0l),
+ getLong(m, "cacheUsed", 0l),
+ getLong(m, "lastUpdate", 0l),
+ getLong(m, "lastUpdateMonotonic", 0l),
+ getInt(m, "xceiverCount", 0),
+ getString(m, "networkLocation", ""),
+ DatanodeInfo.AdminStates.valueOf(getString(m, "adminState", "NORMAL")));
+ }
+
+ /** Convert a List of Json maps to a DatanodeInfo[]. */
+ static DatanodeInfo[] toDatanodeInfoArray(final List<?> objects)
+ throws IOException {
+ if (objects == null) {
+ return null;
+ } else if (objects.isEmpty()) {
+ return EMPTY_DATANODE_INFO_ARRAY;
+ } else {
+ final DatanodeInfo[] array = new DatanodeInfo[objects.size()];
+ int i = 0;
+ for (Object object : objects) {
+ array[i++] = toDatanodeInfo((Map<?, ?>) object);
+ }
+ return array;
+ }
+ }
+
+ /** Convert a Json map to LocatedBlock. */
+ static LocatedBlock toLocatedBlock(final Map<?, ?> m) throws IOException {
+ if (m == null) {
+ return null;
+ }
+
+ final ExtendedBlock b = toExtendedBlock((Map<?, ?>)m.get("block"));
+ final DatanodeInfo[] locations = toDatanodeInfoArray(
+ getList(m, "locations"));
+ final long startOffset = ((Number) m.get("startOffset")).longValue();
+ final boolean isCorrupt = (Boolean)m.get("isCorrupt");
+ final DatanodeInfo[] cachedLocations = toDatanodeInfoArray(
+ getList(m, "cachedLocations"));
+
+ final LocatedBlock locatedblock = new LocatedBlock(b, locations,
+ null, null, startOffset, isCorrupt, cachedLocations);
+ locatedblock.setBlockToken(toBlockToken((Map<?, ?>)m.get("blockToken")));
+ return locatedblock;
+ }
+
+ /** Convert a List of Object to a List of LocatedBlock. */
+ static List<LocatedBlock> toLocatedBlockList(
+ final List<?> objects) throws IOException {
+ if (objects == null) {
+ return null;
+ } else if (objects.isEmpty()) {
+ return Collections.emptyList();
+ } else {
+ final List<LocatedBlock> list = new ArrayList<>(objects.size());
+ for (Object object : objects) {
+ list.add(toLocatedBlock((Map<?, ?>) object));
+ }
+ return list;
+ }
+ }
+
+ /** Convert a Json map to a ContentSummary. */
+ static ContentSummary toContentSummary(final Map<?, ?> json) {
+ if (json == null) {
+ return null;
+ }
+
+ final Map<?, ?> m = (Map<?, ?>)json.get(ContentSummary.class.getSimpleName());
+ final long length = ((Number) m.get("length")).longValue();
+ final long fileCount = ((Number) m.get("fileCount")).longValue();
+ final long directoryCount = ((Number) m.get("directoryCount")).longValue();
+ final long quota = ((Number) m.get("quota")).longValue();
+ final long spaceConsumed = ((Number) m.get("spaceConsumed")).longValue();
+ final long spaceQuota = ((Number) m.get("spaceQuota")).longValue();
+
+ return new ContentSummary.Builder().length(length).fileCount(fileCount).
+ directoryCount(directoryCount).quota(quota).spaceConsumed(spaceConsumed).
+ spaceQuota(spaceQuota).build();
+ }
+
+ /** Convert a Json map to an MD5MD5CRC32FileChecksum. */
+ static MD5MD5CRC32FileChecksum toMD5MD5CRC32FileChecksum(
+ final Map<?, ?> json) throws IOException {
+ if (json == null) {
+ return null;
+ }
+
+ final Map<?, ?> m = (Map<?, ?>)json.get(FileChecksum.class.getSimpleName());
+ final String algorithm = (String)m.get("algorithm");
+ final int length = ((Number) m.get("length")).intValue();
+ final byte[] bytes = StringUtils.hexStringToByte((String) m.get("bytes"));
+
+ final DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
+ final DataChecksum.Type crcType =
+ MD5MD5CRC32FileChecksum.getCrcTypeFromAlgorithmName(algorithm);
+ final MD5MD5CRC32FileChecksum checksum;
+
+ // Recreate what DFSClient would have returned.
+ switch(crcType) {
+ case CRC32:
+ checksum = new MD5MD5CRC32GzipFileChecksum();
+ break;
+ case CRC32C:
+ checksum = new MD5MD5CRC32CastagnoliFileChecksum();
+ break;
+ default:
+ throw new IOException("Unknown algorithm: " + algorithm);
+ }
+ checksum.readFields(in);
+
+ //check algorithm name
+ if (!checksum.getAlgorithmName().equals(algorithm)) {
+ throw new IOException("Algorithm not matched. Expected " + algorithm
+ + ", Received " + checksum.getAlgorithmName());
+ }
+ //check length
+ if (length != checksum.getLength()) {
+ throw new IOException("Length not matched: length=" + length
+ + ", checksum.getLength()=" + checksum.getLength());
+ }
+
+ return checksum;
+ }
+
+ /** Convert a Json map to an AclStatus object. */
+ static AclStatus toAclStatus(final Map<?, ?> json) {
+ if (json == null) {
+ return null;
+ }
+
+ final Map<?, ?> m = (Map<?, ?>) json.get(AclStatus.class.getSimpleName());
+
+ AclStatus.Builder aclStatusBuilder = new AclStatus.Builder();
+ aclStatusBuilder.owner((String) m.get("owner"));
+ aclStatusBuilder.group((String) m.get("group"));
+ aclStatusBuilder.stickyBit((Boolean) m.get("stickyBit"));
+ String permString = (String) m.get("permission");
+ if (permString != null) {
+ final FsPermission permission = toFsPermission(permString,
+ (Boolean) m.get("aclBit"), (Boolean) m.get("encBit"));
+ aclStatusBuilder.setPermission(permission);
+ }
+ final List<?> entries = (List<?>) m.get("entries");
+
+ List<AclEntry> aclEntryList = new ArrayList<>();
+ for (Object entry : entries) {
+ AclEntry aclEntry = AclEntry.parseAclEntry((String) entry, true);
+ aclEntryList.add(aclEntry);
+ }
+ aclStatusBuilder.addEntries(aclEntryList);
+ return aclStatusBuilder.build();
+ }
+
+ static byte[] getXAttr(final Map<?, ?> json, final String name)
+ throws IOException {
+ if (json == null) {
+ return null;
+ }
+
+ Map<String, byte[]> xAttrs = toXAttrs(json);
+ if (xAttrs != null) {
+ return xAttrs.get(name);
+ }
+
+ return null;
+ }
+
+ static Map<String, byte[]> toXAttrs(final Map<?, ?> json)
+ throws IOException {
+ if (json == null) {
+ return null;
+ }
+ return toXAttrMap(getList(json, "XAttrs"));
+ }
+
+ static List<String> toXAttrNames(final Map<?, ?> json)
+ throws IOException {
+ if (json == null) {
+ return null;
+ }
+
+ final String namesInJson = (String) json.get("XAttrNames");
+ ObjectReader reader = new ObjectMapper().reader(List.class);
+ final List<Object> xattrs = reader.readValue(namesInJson);
+ final List<String> names =
+ Lists.newArrayListWithCapacity(json.keySet().size());
+
+ for (Object xattr : xattrs) {
+ names.add((String) xattr);
+ }
+ return names;
+ }
+
+ static Map<String, byte[]> toXAttrMap(final List<?> objects)
+ throws IOException {
+ if (objects == null) {
+ return null;
+ } else if (objects.isEmpty()) {
+ return Maps.newHashMap();
+ } else {
+ final Map<String, byte[]> xAttrs = Maps.newHashMap();
+ for (Object object : objects) {
+ Map<?, ?> m = (Map<?, ?>) object;
+ String name = (String) m.get("name");
+ String value = (String) m.get("value");
+ xAttrs.put(name, decodeXAttrValue(value));
+ }
+ return xAttrs;
+ }
+ }
+
+ static byte[] decodeXAttrValue(String value) throws IOException {
+ if (value != null) {
+ return XAttrCodec.decodeValue(value);
+ } else {
+ return new byte[0];
+ }
+ }
+
+ /** Convert a Json map to a Token of DelegationTokenIdentifier. */
+ @SuppressWarnings("unchecked")
+ static Token<DelegationTokenIdentifier> toDelegationToken(
+ final Map<?, ?> json) throws IOException {
+ final Map<?, ?> m = (Map<?, ?>)json.get(Token.class.getSimpleName());
+ return (Token<DelegationTokenIdentifier>) toToken(m);
+ }
+
+ /** Convert a Json map to LocatedBlocks. */
+ static LocatedBlocks toLocatedBlocks(
+ final Map<?, ?> json) throws IOException {
+ if (json == null) {
+ return null;
+ }
+
+ final Map<?, ?> m = (Map<?, ?>)json.get(LocatedBlocks.class.getSimpleName());
+ final long fileLength = ((Number) m.get("fileLength")).longValue();
+ final boolean isUnderConstruction = (Boolean)m.get("isUnderConstruction");
+ final List<LocatedBlock> locatedBlocks = toLocatedBlockList(
+ getList(m, "locatedBlocks"));
+ final LocatedBlock lastLocatedBlock = toLocatedBlock(
+ (Map<?, ?>) m.get("lastLocatedBlock"));
+ final boolean isLastBlockComplete = (Boolean)m.get("isLastBlockComplete");
+ return new LocatedBlocks(fileLength, isUnderConstruction, locatedBlocks,
+ lastLocatedBlock, isLastBlockComplete, null);
+ }
+
+}
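JsonUtilClient walks the untyped Maps that Jackson produces from WebHDFS JSON responses. A round-trip sketch for a GETFILESTATUS payload (the JSON shape follows the WebHDFS REST API documentation; package-private access from org.apache.hadoop.hdfs.web is assumed for illustration):

    // Sketch: parsing a WebHDFS GETFILESTATUS response into an HdfsFileStatus.
    String json = "{\"FileStatus\":{\"pathSuffix\":\"f\",\"type\":\"FILE\","
        + "\"length\":24930,\"owner\":\"hdfs\",\"group\":\"supergroup\","
        + "\"permission\":\"644\",\"accessTime\":1,\"modificationTime\":2,"
        + "\"blockSize\":134217728,\"replication\":3}}";
    Map<?, ?> m = new ObjectMapper().readValue(json, Map.class);
    HdfsFileStatus status = JsonUtilClient.toFileStatus(m, true);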
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1b933b2/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/KerberosUgiAuthenticator.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/KerberosUgiAuthenticator.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/KerberosUgiAuthenticator.java
new file mode 100644
index 0000000..b8ea951
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/KerberosUgiAuthenticator.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web;
+
+import java.io.IOException;
+
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.client.Authenticator;
+import org.apache.hadoop.security.authentication.client.KerberosAuthenticator;
+import org.apache.hadoop.security.authentication.client.PseudoAuthenticator;
+
+/**
+ * Use UserGroupInformation as a fallback authenticator
+ * if the server does not use Kerberos SPNEGO HTTP authentication.
+ */
+public class KerberosUgiAuthenticator extends KerberosAuthenticator {
+ @Override
+ protected Authenticator getFallBackAuthenticator() {
+ return new PseudoAuthenticator() {
+ @Override
+ protected String getUserName() {
+ try {
+ return UserGroupInformation.getLoginUser().getUserName();
+ } catch (IOException e) {
+ throw new SecurityException("Failed to obtain current username", e);
+ }
+ }
+ };
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1b933b2/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/SWebHdfsFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/SWebHdfsFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/SWebHdfsFileSystem.java
new file mode 100644
index 0000000..e0c22ce
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/SWebHdfsFileSystem.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web;
+
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
+import org.apache.hadoop.io.Text;
+
+public class SWebHdfsFileSystem extends WebHdfsFileSystem {
+
+ @Override
+ public String getScheme() {
+ return WebHdfsConstants.SWEBHDFS_SCHEME;
+ }
+
+ @Override
+ protected String getTransportScheme() {
+ return "https";
+ }
+
+ @Override
+ protected Text getTokenKind() {
+ return WebHdfsConstants.SWEBHDFS_TOKEN_KIND;
+ }
+
+ @Override
+ protected int getDefaultPort() {
+ return HdfsClientConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1b933b2/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/TokenAspect.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/TokenAspect.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/TokenAspect.java
new file mode 100644
index 0000000..bc3eb4b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/TokenAspect.java
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.URI;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.DelegationTokenRenewer;
+import org.apache.hadoop.fs.DelegationTokenRenewer.Renewable;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.HAUtilClient;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenRenewer;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * This class implements the aspects that relate to delegation tokens for all
+ * HTTP-based file systems.
+ */
+final class TokenAspect<T extends FileSystem & Renewable> {
+ @InterfaceAudience.Private
+ public static class TokenManager extends TokenRenewer {
+
+ @Override
+ public void cancel(Token<?> token, Configuration conf) throws IOException {
+ getInstance(token, conf).cancelDelegationToken(token);
+ }
+
+ @Override
+ public boolean handleKind(Text kind) {
+ return kind.equals(WebHdfsConstants.WEBHDFS_TOKEN_KIND)
+ || kind.equals(WebHdfsConstants.SWEBHDFS_TOKEN_KIND);
+ }
+
+ @Override
+ public boolean isManaged(Token<?> token) throws IOException {
+ return true;
+ }
+
+ @Override
+ public long renew(Token<?> token, Configuration conf) throws IOException {
+ return getInstance(token, conf).renewDelegationToken(token);
+ }
+
+ private TokenManagementDelegator getInstance(Token<?> token,
+ Configuration conf)
+ throws IOException {
+ final URI uri;
+ final String scheme = getSchemeByKind(token.getKind());
+ if (HAUtilClient.isTokenForLogicalUri(token)) {
+ uri = HAUtilClient.getServiceUriFromToken(scheme, token);
+ } else {
+ final InetSocketAddress address = SecurityUtil.getTokenServiceAddr
+ (token);
+ uri = URI.create(scheme + "://" + NetUtils.getHostPortString(address));
+ }
+ return (TokenManagementDelegator) FileSystem.get(uri, conf);
+ }
+
+ private static String getSchemeByKind(Text kind) {
+ if (kind.equals(WebHdfsConstants.WEBHDFS_TOKEN_KIND)) {
+ return WebHdfsConstants.WEBHDFS_SCHEME;
+ } else if (kind.equals(WebHdfsConstants.SWEBHDFS_TOKEN_KIND)) {
+ return WebHdfsConstants.SWEBHDFS_SCHEME;
+ } else {
+ throw new IllegalArgumentException("Unsupported scheme");
+ }
+ }
+ }
+
+ private static class DTSelecorByKind extends
+ AbstractDelegationTokenSelector<DelegationTokenIdentifier> {
+ public DTSelecorByKind(final Text kind) {
+ super(kind);
+ }
+ }
+
+ /**
+ * Callbacks for token management
+ */
+ interface TokenManagementDelegator {
+ void cancelDelegationToken(final Token<?> token) throws IOException;
+ long renewDelegationToken(final Token<?> token) throws IOException;
+ }
+
+ private DelegationTokenRenewer.RenewAction<?> action;
+ private DelegationTokenRenewer dtRenewer = null;
+ private final DTSelecorByKind dtSelector;
+ private final T fs;
+ private boolean hasInitedToken;
+ private final Log LOG;
+ private final Text serviceName;
+
+ TokenAspect(T fs, final Text serviceName, final Text kind) {
+ this.LOG = LogFactory.getLog(fs.getClass());
+ this.fs = fs;
+ this.dtSelector = new DTSelecorByKind(kind);
+ this.serviceName = serviceName;
+ }
+
+ synchronized void ensureTokenInitialized() throws IOException {
+ // we haven't inited yet, or we used to have a token but it expired
+ if (!hasInitedToken || (action != null && !action.isValid())) {
+ //since we don't already have a token, go get one
+ Token<?> token = fs.getDelegationToken(null);
+ // security might be disabled
+ if (token != null) {
+ fs.setDelegationToken(token);
+ addRenewAction(fs);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Created new DT for " + token.getService());
+ }
+ }
+ hasInitedToken = true;
+ }
+ }
+
+ public synchronized void reset() {
+ hasInitedToken = false;
+ }
+
+ synchronized void initDelegationToken(UserGroupInformation ugi) {
+ Token<?> token = selectDelegationToken(ugi);
+ if (token != null) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Found existing DT for " + token.getService());
+ }
+ fs.setDelegationToken(token);
+ hasInitedToken = true;
+ }
+ }
+
+ synchronized void removeRenewAction() throws IOException {
+ if (dtRenewer != null) {
+ dtRenewer.removeRenewAction(fs);
+ }
+ }
+
+ @VisibleForTesting
+ Token<DelegationTokenIdentifier> selectDelegationToken(
+ UserGroupInformation ugi) {
+ return dtSelector.selectToken(serviceName, ugi.getTokens());
+ }
+
+ private synchronized void addRenewAction(final T webhdfs) {
+ if (dtRenewer == null) {
+ dtRenewer = DelegationTokenRenewer.getInstance();
+ }
+
+ action = dtRenewer.addRenewAction(webhdfs);
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/d1b933b2/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/URLConnectionFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/URLConnectionFactory.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/URLConnectionFactory.java
new file mode 100644
index 0000000..e330adf
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/web/URLConnectionFactory.java
@@ -0,0 +1,187 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.web;
+
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.net.URLConnection;
+import java.security.GeneralSecurityException;
+
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.HttpsURLConnection;
+import javax.net.ssl.SSLSocketFactory;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
+import org.apache.hadoop.security.authentication.client.AuthenticationException;
+import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
+import org.apache.hadoop.security.ssl.SSLFactory;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Utilities for handling URLs
+ */
+@InterfaceAudience.LimitedPrivate({ "HDFS" })
+@InterfaceStability.Unstable
+public class URLConnectionFactory {
+ private static final Log LOG = LogFactory.getLog(URLConnectionFactory.class);
+
+ /**
+ * Timeout for socket connects and reads
+ */
+ public final static int DEFAULT_SOCKET_TIMEOUT = 1 * 60 * 1000; // 1 minute
+ private final ConnectionConfigurator connConfigurator;
+
+ private static final ConnectionConfigurator DEFAULT_TIMEOUT_CONN_CONFIGURATOR = new ConnectionConfigurator() {
+ @Override
+ public HttpURLConnection configure(HttpURLConnection conn)
+ throws IOException {
+ URLConnectionFactory.setTimeouts(conn, DEFAULT_SOCKET_TIMEOUT);
+ return conn;
+ }
+ };
+
+ /**
+ * The URLConnectionFactory that sets the default timeout and only trusts
+ * Java's default SSL certificates.
+ */
+ public static final URLConnectionFactory DEFAULT_SYSTEM_CONNECTION_FACTORY = new URLConnectionFactory(
+ DEFAULT_TIMEOUT_CONN_CONFIGURATOR);
+
+ /**
+ * Construct a new URLConnectionFactory based on the configuration. It will
+ * try to load the SSL configuration when one is specified.
+ */
+ public static URLConnectionFactory newDefaultURLConnectionFactory(Configuration conf) {
+ ConnectionConfigurator conn = null;
+ try {
+ conn = newSslConnConfigurator(DEFAULT_SOCKET_TIMEOUT, conf);
+ } catch (Exception e) {
+ LOG.debug(
+ "Cannot load customized ssl related configuration. Fallback to system-generic settings.",
+ e);
+ conn = DEFAULT_TIMEOUT_CONN_CONFIGURATOR;
+ }
+ return new URLConnectionFactory(conn);
+ }
+
+ @VisibleForTesting
+ URLConnectionFactory(ConnectionConfigurator connConfigurator) {
+ this.connConfigurator = connConfigurator;
+ }
+
+ /**
+ * Create a new ConnectionConfigurator for SSL connections
+ */
+ private static ConnectionConfigurator newSslConnConfigurator(final int timeout,
+ Configuration conf) throws IOException, GeneralSecurityException {
+ final SSLFactory factory;
+ final SSLSocketFactory sf;
+ final HostnameVerifier hv;
+
+ factory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
+ factory.init();
+ sf = factory.createSSLSocketFactory();
+ hv = factory.getHostnameVerifier();
+
+ return new ConnectionConfigurator() {
+ @Override
+ public HttpURLConnection configure(HttpURLConnection conn)
+ throws IOException {
+ if (conn instanceof HttpsURLConnection) {
+ HttpsURLConnection c = (HttpsURLConnection) conn;
+ c.setSSLSocketFactory(sf);
+ c.setHostnameVerifier(hv);
+ }
+ URLConnectionFactory.setTimeouts(conn, timeout);
+ return conn;
+ }
+ };
+ }
+
+ /**
+ * Opens a url with read and connect timeouts
+ *
+ * @param url
+ * to open
+ * @return URLConnection
+ * @throws IOException
+ */
+ public URLConnection openConnection(URL url) throws IOException {
+ try {
+ return openConnection(url, false);
+ } catch (AuthenticationException e) {
+ // Unreachable
+ return null;
+ }
+ }
+
+ /**
+ * Opens a url with read and connect timeouts
+ *
+ * @param url
+ * URL to open
+ * @param isSpnego
+ * whether the url should be authenticated via SPNEGO
+ * @return URLConnection
+ * @throws IOException
+ * @throws AuthenticationException
+ */
+ public URLConnection openConnection(URL url, boolean isSpnego)
+ throws IOException, AuthenticationException {
+ if (isSpnego) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("open AuthenticatedURL connection" + url);
+ }
+ UserGroupInformation.getCurrentUser().checkTGTAndReloginFromKeytab();
+ final AuthenticatedURL.Token authToken = new AuthenticatedURL.Token();
+ return new AuthenticatedURL(new KerberosUgiAuthenticator(),
+ connConfigurator).openConnection(url, authToken);
+ } else {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("open URL connection");
+ }
+ URLConnection connection = url.openConnection();
+ if (connection instanceof HttpURLConnection) {
+ connConfigurator.configure((HttpURLConnection) connection);
+ }
+ return connection;
+ }
+ }
+
+ /**
+ * Sets timeout parameters on the given URLConnection.
+ *
+ * @param connection
+ * URLConnection to set
+ * @param socketTimeout
+ * the connection and read timeout of the connection.
+ */
+ private static void setTimeouts(URLConnection connection, int socketTimeout) {
+ connection.setConnectTimeout(socketTimeout);
+ connection.setReadTimeout(socketTimeout);
+ }
+}
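Typical client-side use of the factory above: build one from the configuration (falling back to the timeout-only configurator if SSL setup fails) and open every connection through it so each gets the one-minute connect/read timeouts. A sketch, assuming a Configuration in scope and an illustrative NameNode URL:

    // Sketch: opening a WebHDFS URL through the factory.
    URLConnectionFactory factory =
        URLConnectionFactory.newDefaultURLConnectionFactory(conf);
    URL url = new URL("http://nn:50070/webhdfs/v1/tmp?op=GETFILESTATUS");
    HttpURLConnection conn = (HttpURLConnection) factory.openConnection(url);
    try {
      int code = conn.getResponseCode(); // connect/read timeouts already set
    } finally {
      conn.disconnect();
    }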