Posted to hdfs-commits@hadoop.apache.org by ar...@apache.org on 2013/10/29 23:44:35 UTC
svn commit: r1536921 [1/2] - in
/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/org/apache/hadoop/hdfs/
src/main/java/org/apache/hadoop/hdfs/server/namenode/
src/main/java/org/apache/hadoop/hdfs/tools/ src/main/java/org/apache/ha...
Author: arp
Date: Tue Oct 29 22:44:34 2013
New Revision: 1536921
URL: http://svn.apache.org/r1536921
Log:
HDFS-5436. Move HsftpFileSystem and HftpFileSystem into org.apache.hadoop.hdfs.web. (Contributed by Haohui Mai)
Added:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/ByteRangeInputStream.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/HftpFileSystem.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/HsftpFileSystem.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestByteRangeInputStream.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestHftpDelegationToken.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestHftpFileSystem.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestHftpURLTimeouts.java
Removed:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HsftpFileSystem.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteRangeInputStream.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHftpDelegationToken.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHftpFileSystem.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHftpURLTimeouts.java
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ListPathsServlet.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileStatus.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestListPathServlet.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogs.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestDelegationTokenRemoteFetcher.java
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1536921&r1=1536920&r2=1536921&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Tue Oct 29 22:44:34 2013
@@ -447,6 +447,9 @@ Release 2.3.0 - UNRELEASED
HDFS-5363. Refactor WebHdfsFileSystem: move SPNEGO-authenticated connection
creation to URLConnectionFactory. (Haohui Mai via jing9)
+ HDFS-5436. Move HsftpFileSystem and HftpFileSystem into org.apache.hadoop.hdfs.web
+ (Haohui Mai via Arpit Agarwal)
+
OPTIMIZATIONS
HDFS-5239. Allow FSNamesystem lock fairness to be configurable (daryn)
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java?rev=1536921&r1=1536920&r2=1536921&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java Tue Oct 29 22:44:34 2013
@@ -38,7 +38,7 @@ import org.apache.hadoop.security.UserGr
import org.apache.hadoop.util.ServletUtil;
/** Redirect queries about the hosted filesystem to an appropriate datanode.
- * @see org.apache.hadoop.hdfs.HftpFileSystem
+ * @see org.apache.hadoop.hdfs.web.HftpFileSystem
*/
@InterfaceAudience.Private
public class FileDataServlet extends DfsServlet {
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ListPathsServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ListPathsServlet.java?rev=1536921&r1=1536920&r2=1536921&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ListPathsServlet.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ListPathsServlet.java Tue Oct 29 22:44:34 2013
@@ -20,14 +20,13 @@ package org.apache.hadoop.hdfs.server.na
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.HftpFileSystem;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.server.common.JspHelper;
+import org.apache.hadoop.hdfs.web.HftpFileSystem;
import org.apache.hadoop.util.ServletUtil;
import org.apache.hadoop.util.VersionInfo;
-
import org.znerd.xmlenc.*;
import java.io.IOException;
@@ -39,13 +38,14 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Stack;
import java.util.regex.Pattern;
+
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
/**
* Obtain meta-information about a filesystem.
- * @see org.apache.hadoop.hdfs.HftpFileSystem
+ * @see org.apache.hadoop.hdfs.web.HftpFileSystem
*/
@InterfaceAudience.Private
public class ListPathsServlet extends DfsServlet {
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java?rev=1536921&r1=1536920&r2=1536921&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java Tue Oct 29 22:44:34 2013
@@ -41,12 +41,12 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.HftpFileSystem;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
import org.apache.hadoop.hdfs.server.namenode.CancelDelegationTokenServlet;
import org.apache.hadoop.hdfs.server.namenode.GetDelegationTokenServlet;
import org.apache.hadoop.hdfs.server.namenode.RenewDelegationTokenServlet;
+import org.apache.hadoop.hdfs.web.HftpFileSystem;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials;
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/ByteRangeInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/ByteRangeInputStream.java?rev=1536921&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/ByteRangeInputStream.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/ByteRangeInputStream.java Tue Oct 29 22:44:34 2013
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.web;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.List;
+import java.util.Map;
+import java.util.StringTokenizer;
+
+import org.apache.commons.io.input.BoundedInputStream;
+import org.apache.hadoop.fs.FSInputStream;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.net.HttpHeaders;
+
+/**
+ * To support HTTP byte streams, a new connection to an HTTP server needs to be
+ * created each time. This class hides the complexity of those multiple
+ * connections from the client. Whenever seek() is called, a new connection
+ * is made on the subsequent read(). The normal input stream functions are
+ * connected to the currently active input stream.
+ */
+public abstract class ByteRangeInputStream extends FSInputStream {
+
+ /**
+ * This class wraps a URL and provides a method to open a connection.
+ * It can be overridden to change how a connection is opened.
+ */
+ public static abstract class URLOpener {
+ protected URL url;
+
+ public URLOpener(URL u) {
+ url = u;
+ }
+
+ public void setURL(URL u) {
+ url = u;
+ }
+
+ public URL getURL() {
+ return url;
+ }
+
+ /** Connect to server with a data offset. */
+ protected abstract HttpURLConnection connect(final long offset,
+ final boolean resolved) throws IOException;
+ }
+
+ enum StreamStatus {
+ NORMAL, SEEK, CLOSED
+ }
+ protected InputStream in;
+ protected URLOpener originalURL;
+ protected URLOpener resolvedURL;
+ protected long startPos = 0;
+ protected long currentPos = 0;
+ protected Long fileLength = null;
+
+ StreamStatus status = StreamStatus.SEEK;
+
+ /**
+ * Create with the specified URLOpeners. Original url is used to open the
+ * stream for the first time. Resolved url is used in subsequent requests.
+ * @param o Original url
+ * @param r Resolved url
+ */
+ public ByteRangeInputStream(URLOpener o, URLOpener r) {
+ this.originalURL = o;
+ this.resolvedURL = r;
+ }
+
+ protected abstract URL getResolvedUrl(final HttpURLConnection connection
+ ) throws IOException;
+
+ @VisibleForTesting
+ protected InputStream getInputStream() throws IOException {
+ switch (status) {
+ case NORMAL:
+ break;
+ case SEEK:
+ if (in != null) {
+ in.close();
+ }
+ in = openInputStream();
+ status = StreamStatus.NORMAL;
+ break;
+ case CLOSED:
+ throw new IOException("Stream closed");
+ }
+ return in;
+ }
+
+ @VisibleForTesting
+ protected InputStream openInputStream() throws IOException {
+ // Use the original url if no resolved url exists, e.g. if
+ // it's the first time a request is made.
+ final boolean resolved = resolvedURL.getURL() != null;
+ final URLOpener opener = resolved? resolvedURL: originalURL;
+
+ final HttpURLConnection connection = opener.connect(startPos, resolved);
+ resolvedURL.setURL(getResolvedUrl(connection));
+
+ InputStream in = connection.getInputStream();
+ final Map<String, List<String>> headers = connection.getHeaderFields();
+ if (isChunkedTransferEncoding(headers)) {
+ // file length is not known
+ fileLength = null;
+ } else {
+ // for non-chunked transfer-encoding, get content-length
+ final String cl = connection.getHeaderField(HttpHeaders.CONTENT_LENGTH);
+ if (cl == null) {
+ throw new IOException(HttpHeaders.CONTENT_LENGTH + " is missing: "
+ + headers);
+ }
+ final long streamlength = Long.parseLong(cl);
+ fileLength = startPos + streamlength;
+
+ // Java has a bug with >2GB request streams. It won't bounds check
+ // the reads so the transfer blocks until the server times out
+ in = new BoundedInputStream(in, streamlength);
+ }
+
+ return in;
+ }
+
+ private static boolean isChunkedTransferEncoding(
+ final Map<String, List<String>> headers) {
+ return contains(headers, HttpHeaders.TRANSFER_ENCODING, "chunked")
+ || contains(headers, HttpHeaders.TE, "chunked");
+ }
+
+ /** Does the HTTP header map contain the given key, value pair? */
+ private static boolean contains(final Map<String, List<String>> headers,
+ final String key, final String value) {
+ final List<String> values = headers.get(key);
+ if (values != null) {
+ for(String v : values) {
+ for(final StringTokenizer t = new StringTokenizer(v, ",");
+ t.hasMoreTokens(); ) {
+ if (value.equalsIgnoreCase(t.nextToken())) {
+ return true;
+ }
+ }
+ }
+ }
+ return false;
+ }
+
+ private int update(final int n) throws IOException {
+ if (n != -1) {
+ currentPos += n;
+ } else if (fileLength != null && currentPos < fileLength) {
+ throw new IOException("Got EOF but currentPos = " + currentPos
+ + " < filelength = " + fileLength);
+ }
+ return n;
+ }
+
+ @Override
+ public int read() throws IOException {
+ final int b = getInputStream().read();
+ update((b == -1) ? -1 : 1);
+ return b;
+ }
+
+ @Override
+ public int read(byte b[], int off, int len) throws IOException {
+ return update(getInputStream().read(b, off, len));
+ }
+
+ /**
+ * Seek to the given offset from the start of the file.
+ * The next read() will be from that location. Can't
+ * seek past the end of the file.
+ */
+ @Override
+ public void seek(long pos) throws IOException {
+ if (pos != currentPos) {
+ startPos = pos;
+ currentPos = pos;
+ if (status != StreamStatus.CLOSED) {
+ status = StreamStatus.SEEK;
+ }
+ }
+ }
+
+ /**
+ * Return the current offset from the start of the file
+ */
+ @Override
+ public long getPos() throws IOException {
+ return currentPos;
+ }
+
+ /**
+ * Seek to a different copy of the data. Returns true if
+ * a new source is found, false otherwise.
+ */
+ @Override
+ public boolean seekToNewSource(long targetPos) throws IOException {
+ return false;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (in != null) {
+ in.close();
+ in = null;
+ }
+ status = StreamStatus.CLOSED;
+ }
+}
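The seek()/read() contract above ultimately rests on HTTP Range requests, which the RangeHeaderUrlOpener in HftpFileSystem.java (further down in this commit) implements. A minimal standalone sketch of that mechanic, mirroring the offset and response-code handling; the URL here is a placeholder and not part of this commit:

import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;

public class RangeRequestSketch {
  public static void main(String[] args) throws IOException {
    // Hypothetical server; any HTTP server honoring Range headers works.
    URL url = new URL("http://example.com/data/file.bin");
    long offset = 1024L; // analogous to startPos after a seek()

    HttpURLConnection conn = (HttpURLConnection) url.openConnection();
    conn.setRequestMethod("GET");
    if (offset != 0L) {
      // Ask the server to start the stream at the seek position.
      conn.setRequestProperty("Range", "bytes=" + offset + "-");
    }
    conn.connect();

    // A non-zero offset must yield 206 Partial Content, mirroring the
    // check in RangeHeaderUrlOpener.connect().
    final int code = conn.getResponseCode();
    if (offset != 0L && code != HttpURLConnection.HTTP_PARTIAL) {
      throw new IOException("HTTP_PARTIAL expected, received " + code);
    }
    InputStream in = conn.getInputStream();
    try {
      System.out.println("first byte at offset " + offset + ": " + in.read());
    } finally {
      in.close();
    }
  }
}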
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/HftpFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/HftpFileSystem.java?rev=1536921&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/HftpFileSystem.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/HftpFileSystem.java Tue Oct 29 22:44:34 2013
@@ -0,0 +1,762 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.web;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.ConnectException;
+import java.net.HttpURLConnection;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.security.PrivilegedExceptionAction;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.TimeZone;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.DelegationTokenRenewer;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileChecksum;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
+import org.apache.hadoop.hdfs.server.common.JspHelper;
+import org.apache.hadoop.hdfs.tools.DelegationTokenFetcher;
+import org.apache.hadoop.hdfs.web.ByteRangeInputStream.URLOpener;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenRenewer;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.ServletUtil;
+import org.xml.sax.Attributes;
+import org.xml.sax.InputSource;
+import org.xml.sax.SAXException;
+import org.xml.sax.XMLReader;
+import org.xml.sax.helpers.DefaultHandler;
+import org.xml.sax.helpers.XMLReaderFactory;
+
+/**
+ * An implementation of a protocol for accessing filesystems over HTTP.
+ * The following implementation provides a limited, read-only interface
+ * to a filesystem over HTTP.
+ * @see org.apache.hadoop.hdfs.server.namenode.ListPathsServlet
+ * @see org.apache.hadoop.hdfs.server.namenode.FileDataServlet
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class HftpFileSystem extends FileSystem
+ implements DelegationTokenRenewer.Renewable {
+ static {
+ HttpURLConnection.setFollowRedirects(true);
+ }
+
+ URLConnectionFactory connectionFactory = URLConnectionFactory.DEFAULT_CONNECTION_FACTORY;
+
+ public static final Text TOKEN_KIND = new Text("HFTP delegation");
+
+ protected UserGroupInformation ugi;
+ private URI hftpURI;
+
+ protected URI nnUri;
+
+ public static final String HFTP_TIMEZONE = "UTC";
+ public static final String HFTP_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ";
+
+ private Token<?> delegationToken;
+ private Token<?> renewToken;
+ private static final HftpDelegationTokenSelector hftpTokenSelector =
+ new HftpDelegationTokenSelector();
+
+ private DelegationTokenRenewer dtRenewer = null;
+
+ private synchronized void addRenewAction(final HftpFileSystem hftpFs) {
+ if (dtRenewer == null) {
+ dtRenewer = DelegationTokenRenewer.getInstance();
+ }
+
+ dtRenewer.addRenewAction(hftpFs);
+ }
+
+ public static final SimpleDateFormat getDateFormat() {
+ final SimpleDateFormat df = new SimpleDateFormat(HFTP_DATE_FORMAT);
+ df.setTimeZone(TimeZone.getTimeZone(HFTP_TIMEZONE));
+ return df;
+ }
+
+ protected static final ThreadLocal<SimpleDateFormat> df =
+ new ThreadLocal<SimpleDateFormat>() {
+ @Override
+ protected SimpleDateFormat initialValue() {
+ return getDateFormat();
+ }
+ };
+
+ @Override
+ protected int getDefaultPort() {
+ return getConf().getInt(DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_KEY,
+ DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT);
+ }
+
+ /**
+ * We generate the address with one of the following ports, in
+ * order of preference.
+ * 1. Port from the hftp URI e.g. hftp://namenode:4000/ will return 4000.
+ * 2. Port configured via DFS_NAMENODE_HTTP_PORT_KEY
+ * 3. DFS_NAMENODE_HTTP_PORT_DEFAULT i.e. 50070.
+ *
+ * @param uri the URI whose authority may override the configured port
+ * @return the NameNode HTTP address derived from the rules above
+ */
+ protected InetSocketAddress getNamenodeAddr(URI uri) {
+ // use authority so user supplied uri can override port
+ return NetUtils.createSocketAddr(uri.getAuthority(), getDefaultPort());
+ }
+
+ protected URI getNamenodeUri(URI uri) {
+ return DFSUtil.createUri(getUnderlyingProtocol(), getNamenodeAddr(uri));
+ }
+
+ /**
+ * See the documentation of {@link #getNamenodeAddr(URI)} for the logic
+ * behind selecting the canonical service name.
+ * @return the canonical service name built from the NameNode URI
+ */
+ @Override
+ public String getCanonicalServiceName() {
+ return SecurityUtil.buildTokenService(nnUri).toString();
+ }
+
+ @Override
+ protected URI canonicalizeUri(URI uri) {
+ return NetUtils.getCanonicalUri(uri, getDefaultPort());
+ }
+
+ /**
+ * Return the protocol scheme for the FileSystem.
+ * <p/>
+ *
+ * @return <code>hftp</code>
+ */
+ @Override
+ public String getScheme() {
+ return "hftp";
+ }
+
+ @Override
+ public void initialize(final URI name, final Configuration conf)
+ throws IOException {
+ super.initialize(name, conf);
+ setConf(conf);
+ this.ugi = UserGroupInformation.getCurrentUser();
+ this.nnUri = getNamenodeUri(name);
+ try {
+ this.hftpURI = new URI(name.getScheme(), name.getAuthority(),
+ null, null, null);
+ } catch (URISyntaxException e) {
+ throw new IllegalArgumentException(e);
+ }
+
+ if (UserGroupInformation.isSecurityEnabled()) {
+ initDelegationToken();
+ }
+ }
+
+ protected void initDelegationToken() throws IOException {
+ // look for hftp token, then try hdfs
+ Token<?> token = selectDelegationToken(ugi);
+
+ // if we don't already have a token, go get one over https
+ boolean createdToken = false;
+ if (token == null) {
+ token = getDelegationToken(null);
+ createdToken = (token != null);
+ }
+
+ // we already had a token or getDelegationToken() didn't fail.
+ if (token != null) {
+ setDelegationToken(token);
+ if (createdToken) {
+ addRenewAction(this);
+ LOG.debug("Created new DT for " + token.getService());
+ } else {
+ LOG.debug("Found existing DT for " + token.getService());
+ }
+ }
+ }
+
+ protected Token<DelegationTokenIdentifier> selectDelegationToken(
+ UserGroupInformation ugi) {
+ return hftpTokenSelector.selectToken(nnUri, ugi.getTokens(), getConf());
+ }
+
+
+ @Override
+ public Token<?> getRenewToken() {
+ return renewToken;
+ }
+
+ /**
+ * Return the underlying protocol that is used to talk to the namenode.
+ */
+ protected String getUnderlyingProtocol() {
+ return "http";
+ }
+
+ @Override
+ public synchronized <T extends TokenIdentifier> void setDelegationToken(Token<T> token) {
+ renewToken = token;
+ // emulate the 203 usage of the tokens
+ // by setting the kind and service as if they were hdfs tokens
+ delegationToken = new Token<T>(token);
+ // NOTE: the remote nn must be configured to use hdfs
+ delegationToken.setKind(DelegationTokenIdentifier.HDFS_DELEGATION_KIND);
+ // no need to change service because we aren't exactly sure what it
+ // should be. we can guess, but it might be wrong if the local conf
+ // value is incorrect. the service is a client side field, so the remote
+ // end does not care about the value
+ }
+
+ @Override
+ public synchronized Token<?> getDelegationToken(final String renewer
+ ) throws IOException {
+ try {
+ //Renew TGT if needed
+ ugi.checkTGTAndReloginFromKeytab();
+ return ugi.doAs(new PrivilegedExceptionAction<Token<?>>() {
+ @Override
+ public Token<?> run() throws IOException {
+ final String nnHttpUrl = nnUri.toString();
+ Credentials c;
+ try {
+ c = DelegationTokenFetcher.getDTfromRemote(nnHttpUrl, renewer);
+ } catch (IOException e) {
+ if (e.getCause() instanceof ConnectException) {
+ LOG.warn("Couldn't connect to " + nnHttpUrl +
+ ", assuming security is disabled");
+ return null;
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Exception getting delegation token", e);
+ }
+ throw e;
+ }
+ for (Token<? extends TokenIdentifier> t : c.getAllTokens()) {
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Got dt for " + getUri() + ";t.service="
+ +t.getService());
+ }
+ return t;
+ }
+ return null;
+ }
+ });
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public URI getUri() {
+ return hftpURI;
+ }
+
+ /**
+ * Return a URL pointing to given path on the namenode.
+ *
+ * @param path to obtain the URL for
+ * @param query string to append to the path
+ * @return namenode URL referring to the given path
+ * @throws IOException on error constructing the URL
+ */
+ protected URL getNamenodeURL(String path, String query) throws IOException {
+ final URL url = new URL(getUnderlyingProtocol(), nnUri.getHost(),
+ nnUri.getPort(), path + '?' + query);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("url=" + url);
+ }
+ return url;
+ }
+
+ /**
+ * Get encoded UGI parameter string for a URL.
+ *
+ * @return user_shortname,group1,group2...
+ */
+ private String getEncodedUgiParameter() {
+ StringBuilder ugiParameter = new StringBuilder(
+ ServletUtil.encodeQueryValue(ugi.getShortUserName()));
+ for(String g: ugi.getGroupNames()) {
+ ugiParameter.append(",");
+ ugiParameter.append(ServletUtil.encodeQueryValue(g));
+ }
+ return ugiParameter.toString();
+ }
+
+ /**
+ * Open an HTTP connection to the namenode to read file data and metadata.
+ * @param path The path component of the URL
+ * @param query The query component of the URL
+ */
+ protected HttpURLConnection openConnection(String path, String query)
+ throws IOException {
+ query = addDelegationTokenParam(query);
+ final URL url = getNamenodeURL(path, query);
+ final HttpURLConnection connection;
+ connection = (HttpURLConnection)connectionFactory.openConnection(url);
+ connection.setRequestMethod("GET");
+ connection.connect();
+ return connection;
+ }
+
+ protected String addDelegationTokenParam(String query) throws IOException {
+ String tokenString = null;
+ if (UserGroupInformation.isSecurityEnabled()) {
+ synchronized (this) {
+ if (delegationToken != null) {
+ tokenString = delegationToken.encodeToUrlString();
+ return (query + JspHelper.getDelegationTokenUrlParam(tokenString));
+ }
+ }
+ }
+ return query;
+ }
+
+ static class RangeHeaderUrlOpener extends ByteRangeInputStream.URLOpener {
+ URLConnectionFactory connectionFactory = URLConnectionFactory.DEFAULT_CONNECTION_FACTORY;
+
+ RangeHeaderUrlOpener(final URL url) {
+ super(url);
+ }
+
+ protected HttpURLConnection openConnection() throws IOException {
+ return (HttpURLConnection)connectionFactory.openConnection(url);
+ }
+
+ /** Use HTTP Range header for specifying offset. */
+ @Override
+ protected HttpURLConnection connect(final long offset,
+ final boolean resolved) throws IOException {
+ final HttpURLConnection conn = openConnection();
+ conn.setRequestMethod("GET");
+ if (offset != 0L) {
+ conn.setRequestProperty("Range", "bytes=" + offset + "-");
+ }
+ conn.connect();
+
+ //Expects HTTP_OK or HTTP_PARTIAL response codes.
+ final int code = conn.getResponseCode();
+ if (offset != 0L && code != HttpURLConnection.HTTP_PARTIAL) {
+ throw new IOException("HTTP_PARTIAL expected, received " + code);
+ } else if (offset == 0L && code != HttpURLConnection.HTTP_OK) {
+ throw new IOException("HTTP_OK expected, received " + code);
+ }
+ return conn;
+ }
+ }
+
+ static class RangeHeaderInputStream extends ByteRangeInputStream {
+ RangeHeaderInputStream(RangeHeaderUrlOpener o, RangeHeaderUrlOpener r) {
+ super(o, r);
+ }
+
+ RangeHeaderInputStream(final URL url) {
+ this(new RangeHeaderUrlOpener(url), new RangeHeaderUrlOpener(null));
+ }
+
+ @Override
+ protected URL getResolvedUrl(final HttpURLConnection connection) {
+ return connection.getURL();
+ }
+ }
+
+ @Override
+ public FSDataInputStream open(Path f, int buffersize) throws IOException {
+ f = f.makeQualified(getUri(), getWorkingDirectory());
+ String path = "/data" + ServletUtil.encodePath(f.toUri().getPath());
+ String query = addDelegationTokenParam("ugi=" + getEncodedUgiParameter());
+ URL u = getNamenodeURL(path, query);
+ return new FSDataInputStream(new RangeHeaderInputStream(u));
+ }
+
+ @Override
+ public void close() throws IOException {
+ super.close();
+ if (dtRenewer != null) {
+ dtRenewer.removeRenewAction(this); // blocks
+ }
+ }
+
+ /** Class to parse and store a listing reply from the server. */
+ class LsParser extends DefaultHandler {
+
+ ArrayList<FileStatus> fslist = new ArrayList<FileStatus>();
+
+ @Override
+ public void startElement(String ns, String localname, String qname,
+ Attributes attrs) throws SAXException {
+ if ("listing".equals(qname)) return;
+ if (!"file".equals(qname) && !"directory".equals(qname)) {
+ if (RemoteException.class.getSimpleName().equals(qname)) {
+ throw new SAXException(RemoteException.valueOf(attrs));
+ }
+ throw new SAXException("Unrecognized entry: " + qname);
+ }
+ long modif;
+ long atime = 0;
+ try {
+ final SimpleDateFormat ldf = df.get();
+ modif = ldf.parse(attrs.getValue("modified")).getTime();
+ String astr = attrs.getValue("accesstime");
+ if (astr != null) {
+ atime = ldf.parse(astr).getTime();
+ }
+ } catch (ParseException e) { throw new SAXException(e); }
+ FileStatus fs = "file".equals(qname)
+ ? new FileStatus(
+ Long.valueOf(attrs.getValue("size")).longValue(), false,
+ Short.valueOf(attrs.getValue("replication")).shortValue(),
+ Long.valueOf(attrs.getValue("blocksize")).longValue(),
+ modif, atime, FsPermission.valueOf(attrs.getValue("permission")),
+ attrs.getValue("owner"), attrs.getValue("group"),
+ HftpFileSystem.this.makeQualified(
+ new Path(getUri().toString(), attrs.getValue("path"))))
+ : new FileStatus(0L, true, 0, 0L,
+ modif, atime, FsPermission.valueOf(attrs.getValue("permission")),
+ attrs.getValue("owner"), attrs.getValue("group"),
+ HftpFileSystem.this.makeQualified(
+ new Path(getUri().toString(), attrs.getValue("path"))));
+ fslist.add(fs);
+ }
+
+ private void fetchList(String path, boolean recur) throws IOException {
+ try {
+ XMLReader xr = XMLReaderFactory.createXMLReader();
+ xr.setContentHandler(this);
+ HttpURLConnection connection = openConnection(
+ "/listPaths" + ServletUtil.encodePath(path),
+ "ugi=" + getEncodedUgiParameter() + (recur ? "&recursive=yes" : ""));
+ InputStream resp = connection.getInputStream();
+ xr.parse(new InputSource(resp));
+ } catch(SAXException e) {
+ final Exception embedded = e.getException();
+ if (embedded != null && embedded instanceof IOException) {
+ throw (IOException)embedded;
+ }
+ throw new IOException("invalid xml directory content", e);
+ }
+ }
+
+ public FileStatus getFileStatus(Path f) throws IOException {
+ fetchList(f.toUri().getPath(), false);
+ if (fslist.size() == 0) {
+ throw new FileNotFoundException("File does not exist: " + f);
+ }
+ return fslist.get(0);
+ }
+
+ public FileStatus[] listStatus(Path f, boolean recur) throws IOException {
+ fetchList(f.toUri().getPath(), recur);
+ if (fslist.size() > 0 && (fslist.size() != 1 || fslist.get(0).isDirectory())) {
+ fslist.remove(0);
+ }
+ return fslist.toArray(new FileStatus[0]);
+ }
+
+ public FileStatus[] listStatus(Path f) throws IOException {
+ return listStatus(f, false);
+ }
+ }
+
+ @Override
+ public FileStatus[] listStatus(Path f) throws IOException {
+ LsParser lsparser = new LsParser();
+ return lsparser.listStatus(f);
+ }
+
+ @Override
+ public FileStatus getFileStatus(Path f) throws IOException {
+ LsParser lsparser = new LsParser();
+ return lsparser.getFileStatus(f);
+ }
+
+ private class ChecksumParser extends DefaultHandler {
+ private FileChecksum filechecksum;
+
+ @Override
+ public void startElement(String ns, String localname, String qname,
+ Attributes attrs) throws SAXException {
+ if (!MD5MD5CRC32FileChecksum.class.getName().equals(qname)) {
+ if (RemoteException.class.getSimpleName().equals(qname)) {
+ throw new SAXException(RemoteException.valueOf(attrs));
+ }
+ throw new SAXException("Unrecognized entry: " + qname);
+ }
+
+ filechecksum = MD5MD5CRC32FileChecksum.valueOf(attrs);
+ }
+
+ private FileChecksum getFileChecksum(String f) throws IOException {
+ final HttpURLConnection connection = openConnection(
+ "/fileChecksum" + ServletUtil.encodePath(f),
+ "ugi=" + getEncodedUgiParameter());
+ try {
+ final XMLReader xr = XMLReaderFactory.createXMLReader();
+ xr.setContentHandler(this);
+ xr.parse(new InputSource(connection.getInputStream()));
+ } catch(SAXException e) {
+ final Exception embedded = e.getException();
+ if (embedded != null && embedded instanceof IOException) {
+ throw (IOException)embedded;
+ }
+ throw new IOException("invalid xml directory content", e);
+ } finally {
+ connection.disconnect();
+ }
+ return filechecksum;
+ }
+ }
+
+ @Override
+ public FileChecksum getFileChecksum(Path f) throws IOException {
+ final String s = makeQualified(f).toUri().getPath();
+ return new ChecksumParser().getFileChecksum(s);
+ }
+
+ @Override
+ public Path getWorkingDirectory() {
+ return new Path("/").makeQualified(getUri(), null);
+ }
+
+ @Override
+ public void setWorkingDirectory(Path f) { }
+
+ /** This optional operation is not yet supported. */
+ @Override
+ public FSDataOutputStream append(Path f, int bufferSize,
+ Progressable progress) throws IOException {
+ throw new IOException("Not supported");
+ }
+
+ @Override
+ public FSDataOutputStream create(Path f, FsPermission permission,
+ boolean overwrite, int bufferSize, short replication,
+ long blockSize, Progressable progress) throws IOException {
+ throw new IOException("Not supported");
+ }
+
+ @Override
+ public boolean rename(Path src, Path dst) throws IOException {
+ throw new IOException("Not supported");
+ }
+
+ @Override
+ public boolean delete(Path f, boolean recursive) throws IOException {
+ throw new IOException("Not supported");
+ }
+
+ @Override
+ public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+ throw new IOException("Not supported");
+ }
+
+ /**
+ * A parser for parsing {@link ContentSummary} xml.
+ */
+ private class ContentSummaryParser extends DefaultHandler {
+ private ContentSummary contentsummary;
+
+ @Override
+ public void startElement(String ns, String localname, String qname,
+ Attributes attrs) throws SAXException {
+ if (!ContentSummary.class.getName().equals(qname)) {
+ if (RemoteException.class.getSimpleName().equals(qname)) {
+ throw new SAXException(RemoteException.valueOf(attrs));
+ }
+ throw new SAXException("Unrecognized entry: " + qname);
+ }
+
+ contentsummary = toContentSummary(attrs);
+ }
+
+ /**
+ * Connect to the name node and get content summary.
+ * @param path The path
+ * @return The content summary for the path.
+ * @throws IOException
+ */
+ private ContentSummary getContentSummary(String path) throws IOException {
+ final HttpURLConnection connection = openConnection(
+ "/contentSummary" + ServletUtil.encodePath(path),
+ "ugi=" + getEncodedUgiParameter());
+ InputStream in = null;
+ try {
+ in = connection.getInputStream();
+
+ final XMLReader xr = XMLReaderFactory.createXMLReader();
+ xr.setContentHandler(this);
+ xr.parse(new InputSource(in));
+ } catch(FileNotFoundException fnfe) {
+ //the server may not support getContentSummary
+ return null;
+ } catch(SAXException saxe) {
+ final Exception embedded = saxe.getException();
+ if (embedded != null && embedded instanceof IOException) {
+ throw (IOException)embedded;
+ }
+ throw new IOException("Invalid xml format", saxe);
+ } finally {
+ if (in != null) {
+ in.close();
+ }
+ connection.disconnect();
+ }
+ return contentsummary;
+ }
+ }
+
+ /** Return the object represented in the attributes. */
+ private static ContentSummary toContentSummary(Attributes attrs
+ ) throws SAXException {
+ final String length = attrs.getValue("length");
+ final String fileCount = attrs.getValue("fileCount");
+ final String directoryCount = attrs.getValue("directoryCount");
+ final String quota = attrs.getValue("quota");
+ final String spaceConsumed = attrs.getValue("spaceConsumed");
+ final String spaceQuota = attrs.getValue("spaceQuota");
+
+ if (length == null
+ || fileCount == null
+ || directoryCount == null
+ || quota == null
+ || spaceConsumed == null
+ || spaceQuota == null) {
+ return null;
+ }
+
+ try {
+ return new ContentSummary(
+ Long.parseLong(length),
+ Long.parseLong(fileCount),
+ Long.parseLong(directoryCount),
+ Long.parseLong(quota),
+ Long.parseLong(spaceConsumed),
+ Long.parseLong(spaceQuota));
+ } catch(Exception e) {
+ throw new SAXException("Invalid attributes: length=" + length
+ + ", fileCount=" + fileCount
+ + ", directoryCount=" + directoryCount
+ + ", quota=" + quota
+ + ", spaceConsumed=" + spaceConsumed
+ + ", spaceQuota=" + spaceQuota, e);
+ }
+ }
+
+ @Override
+ public ContentSummary getContentSummary(Path f) throws IOException {
+ final String s = makeQualified(f).toUri().getPath();
+ final ContentSummary cs = new ContentSummaryParser().getContentSummary(s);
+ return cs != null? cs: super.getContentSummary(f);
+ }
+
+ @InterfaceAudience.Private
+ public static class TokenManager extends TokenRenewer {
+
+ @Override
+ public boolean handleKind(Text kind) {
+ return kind.equals(TOKEN_KIND);
+ }
+
+ @Override
+ public boolean isManaged(Token<?> token) throws IOException {
+ return true;
+ }
+
+ protected String getUnderlyingProtocol() {
+ return "http";
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public long renew(Token<?> token,
+ Configuration conf) throws IOException {
+ // update the kerberos credentials, if they are coming from a keytab
+ UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
+ InetSocketAddress serviceAddr = SecurityUtil.getTokenServiceAddr(token);
+ return
+ DelegationTokenFetcher.renewDelegationToken
+ (DFSUtil.createUri(getUnderlyingProtocol(), serviceAddr).toString(),
+ (Token<DelegationTokenIdentifier>) token);
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public void cancel(Token<?> token,
+ Configuration conf) throws IOException {
+ // update the kerberos credentials, if they are coming from a keytab
+ UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
+ InetSocketAddress serviceAddr = SecurityUtil.getTokenServiceAddr(token);
+ DelegationTokenFetcher.cancelDelegationToken
+ (DFSUtil.createUri(getUnderlyingProtocol(), serviceAddr).toString(),
+ (Token<DelegationTokenIdentifier>) token);
+ }
+ }
+
+ private static class HftpDelegationTokenSelector
+ extends AbstractDelegationTokenSelector<DelegationTokenIdentifier> {
+ private static final DelegationTokenSelector hdfsTokenSelector =
+ new DelegationTokenSelector();
+
+ public HftpDelegationTokenSelector() {
+ super(TOKEN_KIND);
+ }
+
+ Token<DelegationTokenIdentifier> selectToken(URI nnUri,
+ Collection<Token<?>> tokens, Configuration conf) {
+ Token<DelegationTokenIdentifier> token =
+ selectToken(SecurityUtil.buildTokenService(nnUri), tokens);
+ if (token == null) {
+ // try to get a HDFS token
+ token = hdfsTokenSelector.selectToken(nnUri, tokens, conf);
+ }
+ return token;
+ }
+ }
+}
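Since HftpFileSystem now lives in org.apache.hadoop.hdfs.web, client code that imported it from org.apache.hadoop.hdfs needs only an import change; access through the FileSystem facade is unaffected. A minimal read-only sketch, assuming a NameNode whose HTTP address is namenode.example.com:50070 and a file at /tmp/sample.txt (host, port, and path are all placeholders):

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HftpReadSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // The hftp scheme resolves to HftpFileSystem through the
    // META-INF/services registration updated later in this commit.
    FileSystem fs = FileSystem.get(
        URI.create("hftp://namenode.example.com:50070/"), conf);
    FSDataInputStream in = fs.open(new Path("/tmp/sample.txt"));
    try {
      BufferedReader r = new BufferedReader(new InputStreamReader(in));
      System.out.println(r.readLine());
    } finally {
      in.close();
    }
    // Writes are unsupported: create()/append()/rename()/delete()/mkdirs()
    // all throw IOException("Not supported").
  }
}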
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/HsftpFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/HsftpFileSystem.java?rev=1536921&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/HsftpFileSystem.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/HsftpFileSystem.java Tue Oct 29 22:44:34 2013
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.web;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URL;
+import java.security.KeyStore;
+import java.security.cert.X509Certificate;
+
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.HttpsURLConnection;
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLSession;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.TrustManagerFactory;
+import javax.net.ssl.X509TrustManager;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.util.Time;
+
+/**
+ * An implementation of a protocol for accessing filesystems over HTTPS. The
+ * following implementation provides a limited, read-only interface to a
+ * filesystem over HTTPS.
+ *
+ * @see org.apache.hadoop.hdfs.server.namenode.ListPathsServlet
+ * @see org.apache.hadoop.hdfs.server.namenode.FileDataServlet
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class HsftpFileSystem extends HftpFileSystem {
+
+ private static final long MILLISECONDS_PER_DAY = 1000 * 60 * 60 * 24;
+ private volatile int ExpWarnDays = 0;
+
+ /**
+ * Return the protocol scheme for the FileSystem.
+ * <p/>
+ *
+ * @return <code>hsftp</code>
+ */
+ @Override
+ public String getScheme() {
+ return "hsftp";
+ }
+
+ /**
+ * Return the underlying protocol that is used to talk to the namenode.
+ */
+ @Override
+ protected String getUnderlyingProtocol() {
+ return "https";
+ }
+
+ @Override
+ public void initialize(URI name, Configuration conf) throws IOException {
+ super.initialize(name, conf);
+ setupSsl(conf);
+ ExpWarnDays = conf.getInt("ssl.expiration.warn.days", 30);
+ }
+
+ /**
+ * Set up SSL resources
+ *
+ * @throws IOException
+ */
+ private static void setupSsl(Configuration conf) throws IOException {
+ Configuration sslConf = new HdfsConfiguration(false);
+ sslConf.addResource(conf.get(DFSConfigKeys.DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY,
+ DFSConfigKeys.DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_DEFAULT));
+ FileInputStream fis = null;
+ try {
+ SSLContext sc = SSLContext.getInstance("SSL");
+ KeyManager[] kms = null;
+ TrustManager[] tms = null;
+ if (sslConf.get("ssl.client.keystore.location") != null) {
+ // initialize default key manager with keystore file and pass
+ KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
+ KeyStore ks = KeyStore.getInstance(sslConf.get(
+ "ssl.client.keystore.type", "JKS"));
+ char[] ksPass = sslConf.get("ssl.client.keystore.password", "changeit")
+ .toCharArray();
+ fis = new FileInputStream(sslConf.get("ssl.client.keystore.location",
+ "keystore.jks"));
+ ks.load(fis, ksPass);
+ kmf.init(ks, sslConf.get("ssl.client.keystore.keypassword", "changeit")
+ .toCharArray());
+ kms = kmf.getKeyManagers();
+ fis.close();
+ fis = null;
+ }
+ // initialize default trust manager with truststore file and pass
+ if (sslConf.getBoolean("ssl.client.do.not.authenticate.server", false)) {
+ // bypass trust manager validation
+ tms = new DummyTrustManager[] { new DummyTrustManager() };
+ } else {
+ TrustManagerFactory tmf = TrustManagerFactory.getInstance("PKIX");
+ KeyStore ts = KeyStore.getInstance(sslConf.get(
+ "ssl.client.truststore.type", "JKS"));
+ char[] tsPass = sslConf.get("ssl.client.truststore.password",
+ "changeit").toCharArray();
+ fis = new FileInputStream(sslConf.get("ssl.client.truststore.location",
+ "truststore.jks"));
+ ts.load(fis, tsPass);
+ tmf.init(ts);
+ tms = tmf.getTrustManagers();
+ }
+ sc.init(kms, tms, new java.security.SecureRandom());
+ HttpsURLConnection.setDefaultSSLSocketFactory(sc.getSocketFactory());
+ } catch (Exception e) {
+ throw new IOException("Could not initialize SSLContext", e);
+ } finally {
+ if (fis != null) {
+ fis.close();
+ }
+ }
+ }
+
+ @Override
+ protected int getDefaultPort() {
+ return getConf().getInt(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_KEY,
+ DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT);
+ }
+
+ @Override
+ protected HttpURLConnection openConnection(String path, String query)
+ throws IOException {
+ query = addDelegationTokenParam(query);
+ final URL url = new URL(getUnderlyingProtocol(), nnUri.getHost(),
+ nnUri.getPort(), path + '?' + query);
+ HttpsURLConnection conn;
+ conn = (HttpsURLConnection)connectionFactory.openConnection(url);
+ // bypass hostname verification
+ conn.setHostnameVerifier(new DummyHostnameVerifier());
+ conn.setRequestMethod("GET");
+ conn.connect();
+
+ // check cert expiration date
+ final int warnDays = ExpWarnDays;
+ if (warnDays > 0) { // make sure we only check once
+ ExpWarnDays = 0;
+ long expTimeThreshold = warnDays * MILLISECONDS_PER_DAY + Time.now();
+ X509Certificate[] clientCerts = (X509Certificate[]) conn
+ .getLocalCertificates();
+ if (clientCerts != null) {
+ for (X509Certificate cert : clientCerts) {
+ long expTime = cert.getNotAfter().getTime();
+ if (expTime < expTimeThreshold) {
+ StringBuilder sb = new StringBuilder();
+ sb.append("\n Client certificate "
+ + cert.getSubjectX500Principal().getName());
+ int dayOffSet = (int) ((expTime - Time.now()) / MILLISECONDS_PER_DAY);
+ sb.append(" has " + dayOffSet + " days until it expires");
+ LOG.warn(sb.toString());
+ }
+ }
+ }
+ }
+ return (HttpURLConnection) conn;
+ }
+
+ /**
+ * Dummy hostname verifier that is used to bypass hostname checking
+ */
+ protected static class DummyHostnameVerifier implements HostnameVerifier {
+ @Override
+ public boolean verify(String hostname, SSLSession session) {
+ return true;
+ }
+ }
+
+ /**
+ * Dummy trustmanager that is used to trust all server certificates
+ */
+ protected static class DummyTrustManager implements X509TrustManager {
+ @Override
+ public void checkClientTrusted(X509Certificate[] chain, String authType) {
+ }
+
+ @Override
+ public void checkServerTrusted(X509Certificate[] chain, String authType) {
+ }
+
+ @Override
+ public X509Certificate[] getAcceptedIssuers() {
+ return null;
+ }
+ }
+
+}
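setupSsl() above pulls its keystore and truststore settings from the classpath resource named by dfs.client.https.keystore.resource (ssl-client.xml by default). The sketch below lists the ssl.client.* keys it consults; every path and password is a placeholder, and in a real deployment these values would live in that XML resource rather than be set in code:

import org.apache.hadoop.conf.Configuration;

// Hedged sketch of the keys HsftpFileSystem.setupSsl() reads;
// all values here are placeholders.
public class HsftpSslConfSketch {
  public static void main(String[] args) {
    Configuration sslConf = new Configuration(false);
    // Truststore used to validate the NameNode's certificate.
    sslConf.set("ssl.client.truststore.location", "/etc/hadoop/truststore.jks");
    sslConf.set("ssl.client.truststore.type", "JKS");
    sslConf.set("ssl.client.truststore.password", "changeit");
    // Only needed when presenting a client certificate.
    sslConf.set("ssl.client.keystore.location", "/etc/hadoop/keystore.jks");
    sslConf.set("ssl.client.keystore.type", "JKS");
    sslConf.set("ssl.client.keystore.password", "changeit");
    sslConf.set("ssl.client.keystore.keypassword", "changeit");
    // true installs DummyTrustManager (no server validation); test-only.
    sslConf.setBoolean("ssl.client.do.not.authenticate.server", false);
  }
}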
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java?rev=1536921&r1=1536920&r2=1536921&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java Tue Oct 29 22:44:34 2013
@@ -51,7 +51,6 @@ import org.apache.hadoop.fs.MD5MD5CRC32F
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.ByteRangeInputStream;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HAUtil;
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem?rev=1536921&r1=1536920&r2=1536921&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem Tue Oct 29 22:44:34 2013
@@ -14,6 +14,6 @@
# limitations under the License.
org.apache.hadoop.hdfs.DistributedFileSystem
-org.apache.hadoop.hdfs.HftpFileSystem
-org.apache.hadoop.hdfs.HsftpFileSystem
+org.apache.hadoop.hdfs.web.HftpFileSystem
+org.apache.hadoop.hdfs.web.HsftpFileSystem
org.apache.hadoop.hdfs.web.WebHdfsFileSystem
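FileSystem implementations are discovered through java.util.ServiceLoader and matched to URI schemes via getScheme(), so the two-line change above is what re-points hftp:// and hsftp:// at the relocated classes. A small sketch to verify the registrations on a classpath that includes hadoop-hdfs (the class name is a placeholder, not part of this commit):

import java.util.ServiceLoader;

import org.apache.hadoop.fs.FileSystem;

public class ListFileSystemProviders {
  public static void main(String[] args) {
    // Iterates every FileSystem registered via META-INF/services;
    // "hftp" and "hsftp" should now map to the ...hdfs.web classes.
    for (FileSystem fs : ServiceLoader.load(FileSystem.class)) {
      try {
        System.out.println(fs.getScheme() + " -> " + fs.getClass().getName());
      } catch (UnsupportedOperationException e) {
        // Some implementations may not override getScheme().
        System.out.println("(no scheme) -> " + fs.getClass().getName());
      }
    }
  }
}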
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer?rev=1536921&r1=1536920&r2=1536921&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer Tue Oct 29 22:44:34 2013
@@ -13,5 +13,5 @@
#
org.apache.hadoop.hdfs.DFSClient$Renewer
org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier$Renewer
-org.apache.hadoop.hdfs.HftpFileSystem$TokenManager
+org.apache.hadoop.hdfs.web.HftpFileSystem$TokenManager
org.apache.hadoop.hdfs.web.WebHdfsFileSystem$DtRenewer
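TokenRenewer registration follows the same ServiceLoader pattern: Token.renew() and Token.cancel() scan the registered renewers for one whose handleKind() accepts the token's kind, which is how tokens of kind "HFTP delegation" now dispatch to the relocated TokenManager. A small sketch of the kind matching, needing no live cluster (the class name is a placeholder; actually renewing the token would require a running, security-enabled NameNode):

import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.web.HftpFileSystem;
import org.apache.hadoop.security.token.Token;

public class HftpTokenKindSketch {
  public static void main(String[] args) {
    // An empty token carrying the HFTP kind, built only to show dispatch.
    Token<DelegationTokenIdentifier> token =
        new Token<DelegationTokenIdentifier>();
    token.setKind(HftpFileSystem.TOKEN_KIND); // the Text "HFTP delegation"
    // TokenManager.handleKind() returns true exactly for this kind.
    System.out.println(
        new HftpFileSystem.TokenManager().handleKind(token.getKind()));
  }
}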
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=1536921&r1=1536920&r2=1536921&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java Tue Oct 29 22:44:34 2013
@@ -89,6 +89,7 @@ import org.apache.hadoop.hdfs.server.nam
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.tools.DFSAdmin;
+import org.apache.hadoop.hdfs.web.HftpFileSystem;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.NetUtils;
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java?rev=1536921&r1=1536920&r2=1536921&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java Tue Oct 29 22:44:34 2013
@@ -56,6 +56,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.hadoop.fs.VolumeId;
import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.web.HftpFileSystem;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.DataChecksum;
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileStatus.java?rev=1536921&r1=1536920&r2=1536921&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileStatus.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileStatus.java Tue Oct 29 22:44:34 2013
@@ -39,6 +39,7 @@ import org.apache.hadoop.fs.permission.F
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.web.HftpFileSystem;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.log4j.Level;
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestListPathServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestListPathServlet.java?rev=1536921&r1=1536920&r2=1536921&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestListPathServlet.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestListPathServlet.java Tue Oct 29 22:44:34 2013
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.server.namenode.ListPathsServlet;
+import org.apache.hadoop.hdfs.web.HftpFileSystem;
import org.apache.hadoop.security.UserGroupInformation;
import org.junit.AfterClass;
import org.junit.Assert;
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogs.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogs.java?rev=1536921&r1=1536920&r2=1536921&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogs.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogs.java Tue Oct 29 22:44:34 2013
@@ -39,8 +39,8 @@ import org.apache.hadoop.fs.permission.F
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.HftpFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.web.HftpFileSystem;
import org.apache.hadoop.hdfs.web.WebHdfsTestUtil;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
import org.apache.hadoop.security.AccessControlException;
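
The four hunks above are the mechanical fallout of the package move: every test that referenced org.apache.hadoop.hdfs.HftpFileSystem now imports it from org.apache.hadoop.hdfs.web. Downstream code needs the same one-line change; a minimal sketch of a caller after the move (host, port, and path are illustrative):

    import java.io.InputStream;
    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.web.HftpFileSystem;  // was org.apache.hadoop.hdfs.HftpFileSystem

    public class HftpReadExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // The hftp scheme still resolves to HftpFileSystem through the
        // META-INF/services registration updated in this commit.
        FileSystem fs = FileSystem.get(
            URI.create("hftp://namenode.example.com:50070/"), conf);
        InputStream in = fs.open(new Path("/tmp/example.txt"));
        try {
          System.out.println("first byte: " + in.read());
        } finally {
          in.close();
        }
      }
    }
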
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestByteRangeInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestByteRangeInputStream.java?rev=1536921&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestByteRangeInputStream.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestByteRangeInputStream.java Tue Oct 29 22:44:34 2013
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+
+import org.apache.hadoop.hdfs.server.namenode.StreamFile;
+import org.apache.hadoop.hdfs.web.HftpFileSystem;
+import org.junit.Test;
+
+public class TestByteRangeInputStream {
+public static class MockHttpURLConnection extends HttpURLConnection {
+ public MockHttpURLConnection(URL u) {
+ super(u);
+ }
+
+ @Override
+ public boolean usingProxy(){
+ return false;
+ }
+
+ @Override
+ public void disconnect() {
+ }
+
+ @Override
+ public void connect() {
+ }
+
+ @Override
+ public InputStream getInputStream() throws IOException {
+ return new ByteArrayInputStream("asdf".getBytes());
+ }
+
+ @Override
+ public URL getURL() {
+ URL u = null;
+ try {
+ u = new URL("http://resolvedurl/");
+ } catch (Exception e) {
+ System.out.println(e.getMessage());
+ }
+ return u;
+ }
+
+ @Override
+ public int getResponseCode() {
+ if (responseCode != -1) {
+ return responseCode;
+ } else {
+ if (getRequestProperty("Range") == null) {
+ return 200;
+ } else {
+ return 206;
+ }
+ }
+ }
+
+ public void setResponseCode(int resCode) {
+ responseCode = resCode;
+ }
+
+ @Override
+ public String getHeaderField(String field) {
+ return (field.equalsIgnoreCase(StreamFile.CONTENT_LENGTH)) ? "65535" : null;
+ }
+}
+
+ @Test
+ public void testByteRange() throws IOException {
+ HftpFileSystem.RangeHeaderUrlOpener ospy = spy(
+ new HftpFileSystem.RangeHeaderUrlOpener(new URL("http://test/")));
+ doReturn(new MockHttpURLConnection(ospy.getURL())).when(ospy)
+ .openConnection();
+ HftpFileSystem.RangeHeaderUrlOpener rspy = spy(
+ new HftpFileSystem.RangeHeaderUrlOpener((URL) null));
+ doReturn(new MockHttpURLConnection(rspy.getURL())).when(rspy)
+ .openConnection();
+ ByteRangeInputStream is = new HftpFileSystem.RangeHeaderInputStream(ospy, rspy);
+
+ assertEquals("getPos wrong", 0, is.getPos());
+
+ is.read();
+
+ assertNull("Initial call made incorrectly (Range Check)", ospy
+ .openConnection().getRequestProperty("Range"));
+
+ assertEquals("getPos should be 1 after reading one byte", 1, is.getPos());
+
+ is.read();
+
+ assertEquals("getPos should be 2 after reading two bytes", 2, is.getPos());
+
+ // No additional connections should have been made (no seek)
+
+ rspy.setURL(new URL("http://resolvedurl/"));
+
+ is.seek(100);
+ is.read();
+
+ assertEquals("Seek to 100 bytes made incorrectly (Range Check)",
+ "bytes=100-", rspy.openConnection().getRequestProperty("Range"));
+
+ assertEquals("getPos should be 101 after reading one byte", 101,
+ is.getPos());
+
+ verify(rspy, times(2)).openConnection();
+
+ is.seek(101);
+ is.read();
+
+ verify(rspy, times(2)).openConnection();
+
+ // Seek to 101 should not result in another request
+
+ is.seek(2500);
+ is.read();
+
+ assertEquals("Seek to 2500 bytes made incorrectly (Range Check)",
+ "bytes=2500-", rspy.openConnection().getRequestProperty("Range"));
+
+ ((MockHttpURLConnection) rspy.openConnection()).setResponseCode(200);
+ is.seek(500);
+
+ try {
+ is.read();
+ fail("Exception should be thrown when 200 response is given "
+ + "but 206 is expected");
+ } catch (IOException e) {
+ assertEquals("Should fail because incorrect response code was sent",
+ "HTTP_PARTIAL expected, received 200", e.getMessage());
+ }
+
+ ((MockHttpURLConnection) rspy.openConnection()).setResponseCode(206);
+ is.seek(0);
+
+ try {
+ is.read();
+ fail("Exception should be thrown when 206 response is given "
+ + "but 200 is expected");
+ } catch (IOException e) {
+ assertEquals("Should fail because incorrect response code was sent",
+ "HTTP_OK expected, received 206", e.getMessage());
+ }
+ }
+
+ @Test
+ public void testPropagatedClose() throws IOException {
+ ByteRangeInputStream brs = spy(
+ new HftpFileSystem.RangeHeaderInputStream(new URL("http://test/")));
+
+ InputStream mockStream = mock(InputStream.class);
+ doReturn(mockStream).when(brs).openInputStream();
+
+ int brisOpens = 0;
+ int brisCloses = 0;
+ int isCloses = 0;
+
+ // first open, shouldn't close underlying stream
+ brs.getInputStream();
+ verify(brs, times(++brisOpens)).openInputStream();
+ verify(brs, times(brisCloses)).close();
+ verify(mockStream, times(isCloses)).close();
+
+ // stream is open, shouldn't close underlying stream
+ brs.getInputStream();
+ verify(brs, times(brisOpens)).openInputStream();
+ verify(brs, times(brisCloses)).close();
+ verify(mockStream, times(isCloses)).close();
+
+ // seek forces a reopen, should close underlying stream
+ brs.seek(1);
+ brs.getInputStream();
+ verify(brs, times(++brisOpens)).openInputStream();
+ verify(brs, times(brisCloses)).close();
+ verify(mockStream, times(++isCloses)).close();
+
+ // verify that the underlying stream isn't closed after a seek
+ // i.e. the state was correctly updated
+ brs.getInputStream();
+ verify(brs, times(brisOpens)).openInputStream();
+ verify(brs, times(brisCloses)).close();
+ verify(mockStream, times(isCloses)).close();
+
+ // seeking to same location should be a no-op
+ brs.seek(1);
+ brs.getInputStream();
+ verify(brs, times(brisOpens)).openInputStream();
+ verify(brs, times(brisCloses)).close();
+ verify(mockStream, times(isCloses)).close();
+
+ // an explicit close() should close the underlying stream
+ brs.close();
+ verify(brs, times(++brisCloses)).close();
+ verify(mockStream, times(++isCloses)).close();
+
+ // it's already closed, underlying stream should not close
+ brs.close();
+ verify(brs, times(++brisCloses)).close();
+ verify(mockStream, times(isCloses)).close();
+
+ // it's closed, don't reopen it
+ boolean errored = false;
+ try {
+ brs.getInputStream();
+ } catch (IOException e) {
+ errored = true;
+ assertEquals("Stream closed", e.getMessage());
+ } finally {
+ assertTrue("Read a closed steam", errored);
+ }
+ verify(brs, times(brisOpens)).openInputStream();
+ verify(brs, times(brisCloses)).close();
+ verify(mockStream, times(isCloses)).close();
+ }
+}
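
The test above pins down the HTTP contract that ByteRangeInputStream depends on: a read from position 0 issues a plain GET and expects 200 (HTTP_OK), while a read after a seek re-opens the connection with a Range header and expects 206 (HTTP_PARTIAL); a mismatch in either direction surfaces as an IOException. A standalone sketch of that request pattern (the URL and offset are illustrative, and this is a simplification of what RangeHeaderUrlOpener does):

    import java.io.IOException;
    import java.io.InputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;

    public class RangeGetSketch {
      // Open url positioned at offset; send a Range header only for a
      // non-zero offset, and insist on the matching status code, as the
      // test does.
      static InputStream openAt(URL url, long offset) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) url.openConnection();
        if (offset > 0L) {
          conn.setRequestProperty("Range", "bytes=" + offset + "-");
        }
        conn.connect();
        int expected = offset > 0L ? HttpURLConnection.HTTP_PARTIAL  // 206
                                   : HttpURLConnection.HTTP_OK;      // 200
        if (conn.getResponseCode() != expected) {
          throw new IOException((expected == HttpURLConnection.HTTP_PARTIAL
              ? "HTTP_PARTIAL" : "HTTP_OK")
              + " expected, received " + conn.getResponseCode());
        }
        return conn.getInputStream();
      }
    }
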
Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestHftpDelegationToken.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestHftpDelegationToken.java?rev=1536921&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestHftpDelegationToken.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestHftpDelegationToken.java Tue Oct 29 22:44:34 2013
@@ -0,0 +1,276 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.web;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.URI;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.web.HftpFileSystem;
+import org.apache.hadoop.hdfs.web.HsftpFileSystem;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.SecurityUtilTestHelper;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.junit.Test;
+
+public class TestHftpDelegationToken {
+
+ @Test
+ public void testHdfsDelegationToken() throws Exception {
+ SecurityUtilTestHelper.setTokenServiceUseIp(true);
+
+ final Configuration conf = new Configuration();
+ conf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+ UserGroupInformation.setConfiguration(conf);
+ UserGroupInformation user =
+ UserGroupInformation.createUserForTesting("oom",
+ new String[]{"memory"});
+ Token<?> token = new Token<TokenIdentifier>
+ (new byte[0], new byte[0],
+ DelegationTokenIdentifier.HDFS_DELEGATION_KIND,
+ new Text("127.0.0.1:8020"));
+ user.addToken(token);
+ Token<?> token2 = new Token<TokenIdentifier>
+ (null, null, new Text("other token"), new Text("127.0.0.1:8021"));
+ user.addToken(token2);
+ assertEquals("wrong tokens in user", 2, user.getTokens().size());
+ FileSystem fs =
+ user.doAs(new PrivilegedExceptionAction<FileSystem>() {
+ @Override
+ public FileSystem run() throws Exception {
+ return FileSystem.get(new URI("hftp://localhost:50470/"), conf);
+ }
+ });
+ assertSame("wrong kind of file system", HftpFileSystem.class,
+ fs.getClass());
+ Field renewToken = HftpFileSystem.class.getDeclaredField("renewToken");
+ renewToken.setAccessible(true);
+ assertSame("wrong token", token, renewToken.get(fs));
+ }
+
+ @Test
+ public void testSelectHftpDelegationToken() throws Exception {
+ SecurityUtilTestHelper.setTokenServiceUseIp(true);
+
+ Configuration conf = new Configuration();
+ conf.setClass("fs.hftp.impl", MyHftpFileSystem.class, FileSystem.class);
+
+ int httpPort = 80;
+ int httpsPort = 443;
+ conf.setInt(DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_KEY, httpPort);
+ conf.setInt(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_KEY, httpsPort);
+
+ // test with implicit default port
+ URI fsUri = URI.create("hftp://localhost");
+ MyHftpFileSystem fs = (MyHftpFileSystem) FileSystem.newInstance(fsUri, conf);
+ assertEquals(httpPort, fs.getCanonicalUri().getPort());
+ checkTokenSelection(fs, httpPort, conf);
+
+ // test with explicit default port
+ // Make sure it uses the port from the hftp URI.
+ fsUri = URI.create("hftp://localhost:"+httpPort);
+ fs = (MyHftpFileSystem) FileSystem.newInstance(fsUri, conf);
+ assertEquals(httpPort, fs.getCanonicalUri().getPort());
+ checkTokenSelection(fs, httpPort, conf);
+
+ // test with non-default port
+ // Make sure it uses the port from the hftp URI.
+ fsUri = URI.create("hftp://localhost:"+(httpPort+1));
+ fs = (MyHftpFileSystem) FileSystem.newInstance(fsUri, conf);
+ assertEquals(httpPort+1, fs.getCanonicalUri().getPort());
+ checkTokenSelection(fs, httpPort + 1, conf);
+
+ conf.setInt(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_KEY, 5);
+ }
+
+ @Test
+ public void testSelectHsftpDelegationToken() throws Exception {
+ SecurityUtilTestHelper.setTokenServiceUseIp(true);
+
+ Configuration conf = new Configuration();
+ conf.setClass("fs.hsftp.impl", MyHsftpFileSystem.class, FileSystem.class);
+
+ int httpPort = 80;
+ int httpsPort = 443;
+ conf.setInt(DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_KEY, httpPort);
+ conf.setInt(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_KEY, httpsPort);
+
+ // test with implicit default port
+ URI fsUri = URI.create("hsftp://localhost");
+ MyHsftpFileSystem fs = (MyHsftpFileSystem) FileSystem.newInstance(fsUri, conf);
+ assertEquals(httpsPort, fs.getCanonicalUri().getPort());
+ checkTokenSelection(fs, httpsPort, conf);
+
+ // test with explicit default port
+ fsUri = URI.create("hsftp://localhost:"+httpsPort);
+ fs = (MyHsftpFileSystem) FileSystem.newInstance(fsUri, conf);
+ assertEquals(httpsPort, fs.getCanonicalUri().getPort());
+ checkTokenSelection(fs, httpsPort, conf);
+
+ // test with non-default port
+ fsUri = URI.create("hsftp://localhost:"+(httpsPort+1));
+ fs = (MyHsftpFileSystem) FileSystem.newInstance(fsUri, conf);
+ assertEquals(httpsPort+1, fs.getCanonicalUri().getPort());
+ checkTokenSelection(fs, httpsPort+1, conf);
+
+ conf.setInt(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_KEY, 5);
+ }
+
+
+ @Test
+ public void testInsecureRemoteCluster() throws Exception {
+ final ServerSocket socket = new ServerSocket(0); // just reserve a port
+ socket.close();
+ Configuration conf = new Configuration();
+ URI fsUri = URI.create("hsftp://localhost:"+socket.getLocalPort());
+ assertNull(FileSystem.newInstance(fsUri, conf).getDelegationToken(null));
+ }
+
+ @Test
+ public void testSecureClusterError() throws Exception {
+ final ServerSocket socket = new ServerSocket(0);
+ Thread t = new Thread() {
+ @Override
+ public void run() {
+ while (true) { // fetching does a few retries
+ try {
+ Socket s = socket.accept();
+ s.getOutputStream().write(1234);
+ s.shutdownOutput();
+ } catch (Exception e) {
+ break;
+ }
+ }
+ }
+ };
+ t.start();
+
+ try {
+ Configuration conf = new Configuration();
+ URI fsUri = URI.create("hsftp://localhost:"+socket.getLocalPort());
+ Exception ex = null;
+ try {
+ FileSystem.newInstance(fsUri, conf).getDelegationToken(null);
+ } catch (Exception e) {
+ ex = e;
+ }
+ assertNotNull(ex);
+ assertNotNull(ex.getCause());
+ assertEquals("Remote host closed connection during handshake",
+ ex.getCause().getMessage());
+ } finally {
+ t.interrupt();
+ }
+ }
+
+ private void checkTokenSelection(HftpFileSystem fs,
+ int port,
+ Configuration conf) throws IOException {
+ UserGroupInformation ugi =
+ UserGroupInformation.createUserForTesting(fs.getUri().getAuthority(), new String[]{});
+
+ // use ip-based tokens
+ SecurityUtilTestHelper.setTokenServiceUseIp(true);
+
+ // insert an hdfs token into the user's credentials
+ Token<?> hdfsToken = new Token<TokenIdentifier>(
+ new byte[0], new byte[0],
+ DelegationTokenIdentifier.HDFS_DELEGATION_KIND,
+ new Text("127.0.0.1:8020"));
+ ugi.addToken(hdfsToken);
+
+ // test fallback to hdfs token
+ Token<?> token = fs.selectDelegationToken(ugi);
+ assertNotNull(token);
+ assertEquals(hdfsToken, token);
+
+ // test hftp is favored over hdfs
+ Token<?> hftpToken = new Token<TokenIdentifier>(
+ new byte[0], new byte[0],
+ HftpFileSystem.TOKEN_KIND, new Text("127.0.0.1:"+port));
+ ugi.addToken(hftpToken);
+ token = fs.selectDelegationToken(ugi);
+ assertNotNull(token);
+ assertEquals(hftpToken, token);
+
+ // switch to using host-based tokens, no token should match
+ SecurityUtilTestHelper.setTokenServiceUseIp(false);
+ token = fs.selectDelegationToken(ugi);
+ assertNull(token);
+
+ // test fallback to hdfs token
+ hdfsToken = new Token<TokenIdentifier>(
+ new byte[0], new byte[0],
+ DelegationTokenIdentifier.HDFS_DELEGATION_KIND,
+ new Text("localhost:8020"));
+ ugi.addToken(hdfsToken);
+ token = fs.selectDelegationToken(ugi);
+ assertNotNull(token);
+ assertEquals(hdfsToken, token);
+
+ // test hftp is favored over hdfs
+ hftpToken = new Token<TokenIdentifier>(
+ new byte[0], new byte[0],
+ HftpFileSystem.TOKEN_KIND, new Text("localhost:"+port));
+ ugi.addToken(hftpToken);
+ token = fs.selectDelegationToken(ugi);
+ assertNotNull(token);
+ assertEquals(hftpToken, token);
+ }
+
+ static class MyHftpFileSystem extends HftpFileSystem {
+ @Override
+ public URI getCanonicalUri() {
+ return super.getCanonicalUri();
+ }
+ @Override
+ public int getDefaultPort() {
+ return super.getDefaultPort();
+ }
+ // don't automatically get a token
+ @Override
+ protected void initDelegationToken() throws IOException {}
+ }
+
+ static class MyHsftpFileSystem extends HsftpFileSystem {
+ @Override
+ public URI getCanonicalUri() {
+ return super.getCanonicalUri();
+ }
+ @Override
+ public int getDefaultPort() {
+ return super.getDefaultPort();
+ }
+ // don't automatically get a token
+ @Override
+ protected void initDelegationToken() throws IOException {}
+ }
+}
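
checkTokenSelection above encodes the selection rule these tests enforce: a token matches only if its service string (ip:port or host:port, depending on SecurityUtil's token-service mode) lines up with the filesystem's address, and a token of HftpFileSystem.TOKEN_KIND wins over an HDFS delegation token for the same address. A hedged sketch of that matching by kind and service (pickToken is an illustrative name, not the HftpFileSystem API):

    import java.util.Collection;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.security.UserGroupInformation;
    import org.apache.hadoop.security.token.Token;
    import org.apache.hadoop.security.token.TokenIdentifier;

    public class TokenPickSketch {
      // Return the first credential whose kind and service both match,
      // or null when nothing lines up: the host-vs-ip mismatch case
      // asserted above.
      static Token<? extends TokenIdentifier> pickToken(
          UserGroupInformation ugi, Text kind, Text service) {
        Collection<Token<? extends TokenIdentifier>> tokens = ugi.getTokens();
        for (Token<? extends TokenIdentifier> t : tokens) {
          if (kind.equals(t.getKind()) && service.equals(t.getService())) {
            return t;
          }
        }
        return null;
      }
    }
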