You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by ar...@apache.org on 2013/10/29 23:44:35 UTC

svn commit: r1536921 [1/2] - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/ src/main/java/org/apache/hadoop/hdfs/server/namenode/ src/main/java/org/apache/hadoop/hdfs/tools/ src/main/java/org/apache/ha...

Author: arp
Date: Tue Oct 29 22:44:34 2013
New Revision: 1536921

URL: http://svn.apache.org/r1536921
Log:
HDFS-5436. Move HsFtpFileSystem and HFtpFileSystem into org.apache.hdfs.web. (Contributed by Haohui Mai)

Added:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/ByteRangeInputStream.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/HftpFileSystem.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/HsftpFileSystem.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestByteRangeInputStream.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestHftpDelegationToken.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestHftpFileSystem.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestHftpURLTimeouts.java
Removed:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ByteRangeInputStream.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HftpFileSystem.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/HsftpFileSystem.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestByteRangeInputStream.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHftpDelegationToken.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHftpFileSystem.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestHftpURLTimeouts.java
Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ListPathsServlet.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileStatus.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestListPathServlet.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogs.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestDelegationTokenRemoteFetcher.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1536921&r1=1536920&r2=1536921&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Tue Oct 29 22:44:34 2013
@@ -447,6 +447,9 @@ Release 2.3.0 - UNRELEASED
     HDFS-5363. Refactor WebHdfsFileSystem: move SPENGO-authenticated connection
     creation to URLConnectionFactory. (Haohui Mai via jing9)
 
+    HDFS-5436. Move HsftpFileSystem and HftpFileSystem into
+    org.apache.hadoop.hdfs.web. (Haohui Mai via Arpit Agarwal)
+
   OPTIMIZATIONS
 
     HDFS-5239.  Allow FSNamesystem lock fairness to be configurable (daryn)

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java?rev=1536921&r1=1536920&r2=1536921&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileDataServlet.java Tue Oct 29 22:44:34 2013
@@ -38,7 +38,7 @@ import org.apache.hadoop.security.UserGr
 import org.apache.hadoop.util.ServletUtil;
 
 /** Redirect queries about the hosted filesystem to an appropriate datanode.
- * @see org.apache.hadoop.hdfs.HftpFileSystem
+ * @see org.apache.hadoop.hdfs.web.HftpFileSystem
  */
 @InterfaceAudience.Private
 public class FileDataServlet extends DfsServlet {

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ListPathsServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ListPathsServlet.java?rev=1536921&r1=1536920&r2=1536921&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ListPathsServlet.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ListPathsServlet.java Tue Oct 29 22:44:34 2013
@@ -20,14 +20,13 @@ package org.apache.hadoop.hdfs.server.na
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.HftpFileSystem;
 import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.DirectoryListing;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
+import org.apache.hadoop.hdfs.web.HftpFileSystem;
 import org.apache.hadoop.util.ServletUtil;
 import org.apache.hadoop.util.VersionInfo;
-
 import org.znerd.xmlenc.*;
 
 import java.io.IOException;
@@ -39,13 +38,14 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Stack;
 import java.util.regex.Pattern;
+
 import javax.servlet.ServletException;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
 /**
  * Obtain meta-information about a filesystem.
- * @see org.apache.hadoop.hdfs.HftpFileSystem
+ * @see org.apache.hadoop.hdfs.web.HftpFileSystem
  */
 @InterfaceAudience.Private
 public class ListPathsServlet extends DfsServlet {

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java?rev=1536921&r1=1536920&r2=1536921&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java Tue Oct 29 22:44:34 2013
@@ -41,12 +41,12 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.HftpFileSystem;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
 import org.apache.hadoop.hdfs.server.namenode.CancelDelegationTokenServlet;
 import org.apache.hadoop.hdfs.server.namenode.GetDelegationTokenServlet;
 import org.apache.hadoop.hdfs.server.namenode.RenewDelegationTokenServlet;
+import org.apache.hadoop.hdfs.web.HftpFileSystem;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.Credentials;

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/ByteRangeInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/ByteRangeInputStream.java?rev=1536921&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/ByteRangeInputStream.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/ByteRangeInputStream.java Tue Oct 29 22:44:34 2013
@@ -0,0 +1,231 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.web;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.List;
+import java.util.Map;
+import java.util.StringTokenizer;
+
+import org.apache.commons.io.input.BoundedInputStream;
+import org.apache.hadoop.fs.FSInputStream;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.net.HttpHeaders;
+
+/**
+ * To support HTTP byte streams, a new connection to an HTTP server needs to be
+ * created each time. This class hides the complexity of those multiple
+ * connections from the client. Whenever seek() is called, a new connection
+ * is made on the successive read(). The normal input stream functions are
+ * connected to the currently active input stream.
+ */
+public abstract class ByteRangeInputStream extends FSInputStream {
+
+  /**
+   * This class wraps a URL and provides a method to open a connection to it.
+   * Subclasses implement {@link #connect(long, boolean)} to define how the
+   * connection is opened.
+   */
+  public static abstract class URLOpener {
+    protected URL url;
+
+    public URLOpener(URL u) {
+      url = u;
+    }
+
+    public void setURL(URL u) {
+      url = u;
+    }
+
+    public URL getURL() {
+      return url;
+    }
+
+    /**
+     * Connect to server with a data offset.
+     * @param offset the data offset to connect with
+     * @param resolved supplied by the caller; the enclosing stream passes
+     *        true when its resolved opener already holds a URL
+     * @return the opened connection
+     * @throws IOException as raised by the implementation
+     */
+    protected abstract HttpURLConnection connect(final long offset,
+        final boolean resolved) throws IOException;
+  }
+
+  enum StreamStatus {
+    NORMAL, SEEK, CLOSED
+  }
+  protected InputStream in;
+  protected URLOpener originalURL;
+  protected URLOpener resolvedURL;
+  protected long startPos = 0;
+  protected long currentPos = 0;
+  protected Long fileLength = null;
+
+  StreamStatus status = StreamStatus.SEEK;
+
+  /**
+   * Create with the specified URLOpeners. Original url is used to open the
+   * stream for the first time. Resolved url is used in subsequent requests.
+   * @param o Original url
+   * @param r Resolved url; the opener itself must be non-null because its
+   *          URL is read and updated on every open, though the URL it holds
+   *          may initially be null
+   */
+  public ByteRangeInputStream(URLOpener o, URLOpener r) {
+    this.originalURL = o;
+    this.resolvedURL = r;
+  }
+
+  protected abstract URL getResolvedUrl(final HttpURLConnection connection
+      ) throws IOException;
+
+  /**
+   * Return the stream that reads should use, reconnecting first when a
+   * seek is pending.
+   * @throws IOException if the stream is closed or reconnecting fails
+   */
+  @VisibleForTesting
+  protected InputStream getInputStream() throws IOException {
+    switch (status) {
+      case CLOSED:
+        throw new IOException("Stream closed");
+      case SEEK:
+        // discard the stream of the previous position before reopening
+        if (in != null) {
+          in.close();
+        }
+        in = openInputStream();
+        status = StreamStatus.NORMAL;
+        return in;
+      case NORMAL:
+      default:
+        return in;
+    }
+  }
+
+  /**
+   * Open a connection at the current start position and return the stream
+   * to read from. Also records the resolved URL of the connection and the
+   * file length, which is unknown (null) for chunked responses.
+   *
+   * @return the connection's stream, bounded to Content-Length when known
+   * @throws IOException if connecting fails, or if a non-chunked response
+   *         has a missing or malformed Content-Length header
+   */
+  @VisibleForTesting
+  protected InputStream openInputStream() throws IOException {
+    // Use the original url if no resolved url exists, eg. if
+    // it's the first time a request is made.
+    final boolean resolved = resolvedURL.getURL() != null;
+    final URLOpener opener = resolved? resolvedURL: originalURL;
+
+    final HttpURLConnection connection = opener.connect(startPos, resolved);
+    resolvedURL.setURL(getResolvedUrl(connection));
+
+    // named so that it does not shadow the 'in' field of this stream
+    final InputStream stream = connection.getInputStream();
+    final Map<String, List<String>> headers = connection.getHeaderFields();
+    if (isChunkedTransferEncoding(headers)) {
+      // file length is not known
+      fileLength = null;
+      return stream;
+    }
+
+    // for non-chunked transfer-encoding, get content-length
+    final String cl = connection.getHeaderField(HttpHeaders.CONTENT_LENGTH);
+    if (cl == null) {
+      throw new IOException(HttpHeaders.CONTENT_LENGTH + " is missing: "
+          + headers);
+    }
+    final long streamlength;
+    try {
+      streamlength = Long.parseLong(cl);
+    } catch (NumberFormatException e) {
+      // surface a malformed header as an I/O failure, not a runtime error
+      throw new IOException("Invalid " + HttpHeaders.CONTENT_LENGTH + ": "
+          + cl, e);
+    }
+    fileLength = startPos + streamlength;
+
+    // Java has a bug with >2GB request streams.  It won't bounds check
+    // the reads so the transfer blocks until the server times out
+    return new BoundedInputStream(stream, streamlength);
+  }
+
+  private static boolean isChunkedTransferEncoding(
+      final Map<String, List<String>> headers) {
+    return contains(headers, HttpHeaders.TRANSFER_ENCODING, "chunked")
+        || contains(headers, HttpHeaders.TE, "chunked");
+  }
+
+  /**
+   * Does the HTTP header map contain the given key, value pair?
+   * Each header value is treated as a comma-separated list. Elements are
+   * compared case-insensitively after trimming surrounding whitespace, so
+   * that "gzip, chunked" is recognized as containing "chunked".
+   *
+   * @param headers header fields keyed by header name
+   * @param key the header name to look up
+   * @param value the list element to search for
+   * @return true if any value of the header contains the element
+   */
+  private static boolean contains(final Map<String, List<String>> headers,
+      final String key, final String value) {
+    final List<String> values = headers.get(key);
+    if (values == null) {
+      return false;
+    }
+    for (String v : values) {
+      for (final StringTokenizer t = new StringTokenizer(v, ",");
+          t.hasMoreTokens(); ) {
+        // list elements may be surrounded by optional whitespace
+        if (value.equalsIgnoreCase(t.nextToken().trim())) {
+          return true;
+        }
+      }
+    }
+    return false;
+  }
+
+  private int update(final int n) throws IOException {
+    if (n != -1) {
+      currentPos += n;
+    } else if (fileLength != null && currentPos < fileLength) {
+      throw new IOException("Got EOF but currentPos = " + currentPos
+          + " < filelength = " + fileLength);
+    }
+    return n;
+  }
+
+  /** Read one byte, advancing the position unless end of stream is hit. */
+  @Override
+  public int read() throws IOException {
+    final int value = getInputStream().read();
+    final int consumed = (value == -1) ? -1 : 1;
+    update(consumed);
+    return value;
+  }
+
+  /** Bulk read; the position advances by the number of bytes returned. */
+  @Override
+  public int read(byte b[], int off, int len) throws IOException {
+    final int count = getInputStream().read(b, off, len);
+    return update(count);
+  }
+
+  /**
+   * Record the given offset from the start of the file as the new position.
+   * No connection is made here; the stream is only flagged so that the next
+   * read() reconnects at that location. The offset is not validated.
+   */
+  @Override
+  public void seek(long pos) throws IOException {
+    if (pos == currentPos) {
+      // already there; keep using the stream that is open
+      return;
+    }
+    startPos = pos;
+    currentPos = pos;
+    if (status != StreamStatus.CLOSED) {
+      status = StreamStatus.SEEK;
+    }
+  }
+
+  /**
+   * Return the current offset from the start of the file.
+   * @return the position, as advanced by reads and reset by seek
+   */
+  @Override
+  public long getPos() throws IOException {
+    return currentPos;
+  }
+
+  /**
+   * Seeks a different copy of the data.  Returns true if
+   * found a new source, false otherwise.
+   * This implementation never switches to another source and always
+   * returns false.
+   */
+  @Override
+  public boolean seekToNewSource(long targetPos) throws IOException {
+    return false;
+  }
+
+  /**
+   * Close the underlying stream, if any, and mark this stream closed.
+   * The stream is marked closed even when closing the underlying stream
+   * fails, so a failed close cannot leave a dead stream in use.
+   *
+   * @throws IOException if closing the underlying stream fails
+   */
+  @Override
+  public void close() throws IOException {
+    try {
+      if (in != null) {
+        in.close();
+      }
+    } finally {
+      in = null;
+      status = StreamStatus.CLOSED;
+    }
+  }
+}

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/HftpFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/HftpFileSystem.java?rev=1536921&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/HftpFileSystem.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/HftpFileSystem.java Tue Oct 29 22:44:34 2013
@@ -0,0 +1,762 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.web;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.ConnectException;
+import java.net.HttpURLConnection;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.security.PrivilegedExceptionAction;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.TimeZone;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.ContentSummary;
+import org.apache.hadoop.fs.DelegationTokenRenewer;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileChecksum;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
+import org.apache.hadoop.hdfs.server.common.JspHelper;
+import org.apache.hadoop.hdfs.tools.DelegationTokenFetcher;
+import org.apache.hadoop.hdfs.web.ByteRangeInputStream.URLOpener;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.RemoteException;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.Credentials;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.apache.hadoop.security.token.TokenRenewer;
+import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector;
+import org.apache.hadoop.util.Progressable;
+import org.apache.hadoop.util.ServletUtil;
+import org.xml.sax.Attributes;
+import org.xml.sax.InputSource;
+import org.xml.sax.SAXException;
+import org.xml.sax.XMLReader;
+import org.xml.sax.helpers.DefaultHandler;
+import org.xml.sax.helpers.XMLReaderFactory;
+
+/**
+ * An implementation of a protocol for accessing filesystems over HTTP.
+ * The following implementation provides a limited, read-only interface
+ * to a filesystem over HTTP.
+ * @see org.apache.hadoop.hdfs.server.namenode.ListPathsServlet
+ * @see org.apache.hadoop.hdfs.server.namenode.FileDataServlet
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class HftpFileSystem extends FileSystem
+    implements DelegationTokenRenewer.Renewable {
+  static {
+    HttpURLConnection.setFollowRedirects(true);
+  }
+
+  URLConnectionFactory connectionFactory = URLConnectionFactory.DEFAULT_CONNECTION_FACTORY;
+
+  public static final Text TOKEN_KIND = new Text("HFTP delegation");
+
+  protected UserGroupInformation ugi;
+  private URI hftpURI;
+
+  protected URI nnUri;
+
+  public static final String HFTP_TIMEZONE = "UTC";
+  public static final String HFTP_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ssZ";
+
+  private Token<?> delegationToken;
+  private Token<?> renewToken;
+  private static final HftpDelegationTokenSelector hftpTokenSelector =
+      new HftpDelegationTokenSelector();
+
+  private DelegationTokenRenewer dtRenewer = null;
+
+  /** Register the given filesystem with the lazily obtained token renewer. */
+  private synchronized void addRenewAction(final HftpFileSystem hftpFs) {
+    DelegationTokenRenewer renewer = dtRenewer;
+    if (renewer == null) {
+      renewer = DelegationTokenRenewer.getInstance();
+      dtRenewer = renewer;
+    }
+    renewer.addRenewAction(hftpFs);
+  }
+
+  /**
+   * Build a new formatter for {@link #HFTP_DATE_FORMAT} whose time zone is
+   * {@link #HFTP_TIMEZONE}. A fresh instance is returned on every call.
+   */
+  public static final SimpleDateFormat getDateFormat() {
+    final SimpleDateFormat format = new SimpleDateFormat(HFTP_DATE_FORMAT);
+    final TimeZone zone = TimeZone.getTimeZone(HFTP_TIMEZONE);
+    format.setTimeZone(zone);
+    return format;
+  }
+
+  protected static final ThreadLocal<SimpleDateFormat> df =
+    new ThreadLocal<SimpleDateFormat>() {
+    @Override
+    protected SimpleDateFormat initialValue() {
+      return getDateFormat();
+    }
+  };
+
+  @Override
+  protected int getDefaultPort() {
+    return getConf().getInt(DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_KEY,
+        DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT);
+  }
+
+  /**
+   *  We generate the address with one of the following ports, in
+   *  order of preference.
+   *  1. Port from the hftp URI e.g. hftp://namenode:4000/ will return 4000.
+   *  2. Port configured via DFS_NAMENODE_HTTP_PORT_KEY
+   *  3. DFS_NAMENODE_HTTP_PORT_DEFAULT i.e. 50070.
+   *
+   * @param uri the URI whose authority names the namenode
+   * @return the socket address built from the authority and the default port
+   */
+  protected InetSocketAddress getNamenodeAddr(URI uri) {
+    // use authority so user supplied uri can override port
+    return NetUtils.createSocketAddr(uri.getAuthority(), getDefaultPort());
+  }
+
+  protected URI getNamenodeUri(URI uri) {
+    return DFSUtil.createUri(getUnderlyingProtocol(), getNamenodeAddr(uri));
+  }
+
+  /**
+   * See the documentation of {@link #getNamenodeAddr(URI)} for the logic
+   * behind selecting the canonical service name.
+   * @return the token service string built from the namenode URI
+   */
+  @Override
+  public String getCanonicalServiceName() {
+    return SecurityUtil.buildTokenService(nnUri).toString();
+  }
+
+  @Override
+  protected URI canonicalizeUri(URI uri) {
+    return NetUtils.getCanonicalUri(uri, getDefaultPort());
+  }
+
+  /**
+   * Return the protocol scheme for the FileSystem.
+   *
+   * @return <code>hftp</code>
+   */
+  @Override
+  public String getScheme() {
+    return "hftp";
+  }
+
+  /**
+   * Initialize from the given URI: record the current user, derive the
+   * namenode URI and the scheme://authority URI of this filesystem, and set
+   * up the delegation token when security is enabled.
+   * @throws IllegalArgumentException if scheme and authority of the given
+   *         URI do not form a valid URI
+   */
+  @Override
+  public void initialize(final URI name, final Configuration conf)
+      throws IOException {
+    super.initialize(name, conf);
+    setConf(conf);
+    ugi = UserGroupInformation.getCurrentUser();
+    nnUri = getNamenodeUri(name);
+
+    final String scheme = name.getScheme();
+    final String authority = name.getAuthority();
+    try {
+      hftpURI = new URI(scheme, authority, null, null, null);
+    } catch (URISyntaxException e) {
+      throw new IllegalArgumentException(e);
+    }
+
+    if (UserGroupInformation.isSecurityEnabled()) {
+      initDelegationToken();
+    }
+  }
+
+  protected void initDelegationToken() throws IOException {
+    // look for hftp token, then try hdfs
+    Token<?> token = selectDelegationToken(ugi);
+
+    // if we don't already have a token, go get one over https
+    boolean createdToken = false;
+    if (token == null) {
+      token = getDelegationToken(null);
+      createdToken = (token != null);
+    }
+
+    // we already had a token or getDelegationToken() didn't fail.
+    if (token != null) {
+      setDelegationToken(token);
+      if (createdToken) {
+        addRenewAction(this);
+        LOG.debug("Created new DT for " + token.getService());
+      } else {
+        LOG.debug("Found existing DT for " + token.getService());
+      }
+    }
+  }
+
+  protected Token<DelegationTokenIdentifier> selectDelegationToken(
+      UserGroupInformation ugi) {
+    return hftpTokenSelector.selectToken(nnUri, ugi.getTokens(), getConf());
+  }
+
+
+  /** Return the token most recently passed to setDelegationToken. */
+  @Override
+  public Token<?> getRenewToken() {
+    return this.renewToken;
+  }
+
+  /**
+   * Return the underlying protocol that is used to talk to the namenode.
+   * @return <code>http</code>
+   */
+  protected String getUnderlyingProtocol() {
+    return "http";
+  }
+
+  /**
+   * Remember the given token for renewal and keep a copy of it, re-labelled
+   * with the HDFS delegation token kind, for use in requests.
+   */
+  @Override
+  public synchronized <T extends TokenIdentifier> void setDelegationToken(Token<T> token) {
+    renewToken = token;
+    // emulate the 203 usage of the tokens: the copy carries the hdfs kind.
+    // NOTE: the remote nn must be configured to use hdfs
+    final Token<T> hdfsKindCopy = new Token<T>(token);
+    hdfsKindCopy.setKind(DelegationTokenIdentifier.HDFS_DELEGATION_KIND);
+    // the service is left untouched: it is a client side field the remote
+    // end does not care about, and a guess based on the local conf could
+    // be wrong
+    delegationToken = hdfsKindCopy;
+  }
+
+  /**
+   * Fetch a delegation token from the namenode as this filesystem's user.
+   *
+   * @param renewer the renewer to request for the token
+   * @return the first token returned by the namenode, or null when none was
+   *         returned or the namenode could not be connected to
+   * @throws IOException if fetching the token fails for any other reason
+   */
+  @Override
+  public synchronized Token<?> getDelegationToken(final String renewer
+                                                  ) throws IOException {
+    try {
+      //Renew TGT if needed
+      ugi.checkTGTAndReloginFromKeytab();
+      return ugi.doAs(new PrivilegedExceptionAction<Token<?>>() {
+        @Override
+        public Token<?> run() throws IOException {
+          final String nnHttpUrl = nnUri.toString();
+          Credentials c;
+          try {
+            c = DelegationTokenFetcher.getDTfromRemote(nnHttpUrl, renewer);
+          } catch (IOException e) {
+            if (e.getCause() instanceof ConnectException) {
+              LOG.warn("Couldn't connect to " + nnHttpUrl +
+                  ", assuming security is disabled");
+              return null;
+            }
+            // no level guard needed: nothing is built for this message
+            LOG.debug("Exception getting delegation token", e);
+            throw e;
+          }
+          for (Token<? extends TokenIdentifier> t : c.getAllTokens()) {
+            if(LOG.isDebugEnabled()) {
+              LOG.debug("Got dt for " + getUri() + ";t.service="
+                  +t.getService());
+            }
+            return t;
+          }
+          return null;
+        }
+      });
+    } catch (InterruptedException e) {
+      // restore the interrupt status so callers up the stack can observe it
+      Thread.currentThread().interrupt();
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Return the scheme://authority URI recorded by initialize. */
+  @Override
+  public URI getUri() {
+    return this.hftpURI;
+  }
+
+  /**
+   * Return a URL pointing to given path on the namenode.
+   *
+   * @param path to obtain the URL for
+   * @param query string to append to the path; it is always appended after
+   *        a '?' separator
+   * @return namenode URL referring to the given path
+   * @throws IOException on error constructing the URL
+   */
+  protected URL getNamenodeURL(String path, String query) throws IOException {
+    final URL url = new URL(getUnderlyingProtocol(), nnUri.getHost(),
+          nnUri.getPort(), path + '?' + query);
+    if (LOG.isTraceEnabled()) {
+      LOG.trace("url=" + url);
+    }
+    return url;
+  }
+
+  /**
+   * Get encoded UGI parameter string for a URL.
+   *
+   * @return user_shortname,group1,group2...
+   */
+  private String getEncodedUgiParameter() {
+    StringBuilder ugiParamenter = new StringBuilder(
+        ServletUtil.encodeQueryValue(ugi.getShortUserName()));
+    for(String g: ugi.getGroupNames()) {
+      ugiParamenter.append(",");
+      ugiParamenter.append(ServletUtil.encodeQueryValue(g));
+    }
+    return ugiParamenter.toString();
+  }
+
+  /**
+   * Open an HTTP connection to the namenode to read file data and metadata.
+   * The query is first passed through addDelegationTokenParam, and connect()
+   * has already been called on the returned connection with method GET.
+   * @param path The path component of the URL
+   * @param query The query component of the URL
+   * @return the connected HTTP connection
+   * @throws IOException if the URL cannot be built or connecting fails
+   */
+  protected HttpURLConnection openConnection(String path, String query)
+      throws IOException {
+    query = addDelegationTokenParam(query);
+    final URL url = getNamenodeURL(path, query);
+    final HttpURLConnection connection;
+    connection = (HttpURLConnection)connectionFactory.openConnection(url);
+    connection.setRequestMethod("GET");
+    connection.connect();
+    return connection;
+  }
+
+  protected String addDelegationTokenParam(String query) throws IOException {
+    String tokenString = null;
+    if (UserGroupInformation.isSecurityEnabled()) {
+      synchronized (this) {
+        if (delegationToken != null) {
+          tokenString = delegationToken.encodeToUrlString();
+          return (query + JspHelper.getDelegationTokenUrlParam(tokenString));
+        }
+      }
+    }
+    return query;
+  }
+
+  /** URL opener that positions the read with an HTTP Range header. */
+  static class RangeHeaderUrlOpener extends ByteRangeInputStream.URLOpener {
+    URLConnectionFactory connectionFactory = URLConnectionFactory.DEFAULT_CONNECTION_FACTORY;
+
+    RangeHeaderUrlOpener(final URL url) {
+      super(url);
+    }
+
+    /** Obtain a connection to the stored URL from the connection factory. */
+    protected HttpURLConnection openConnection() throws IOException {
+      return (HttpURLConnection)connectionFactory.openConnection(url);
+    }
+
+    /**
+     * Connect with GET, sending a Range header when the offset is non-zero.
+     * @throws IOException if the response code is not HTTP_PARTIAL for a
+     *         ranged request, or not HTTP_OK for a request from offset zero
+     */
+    @Override
+    protected HttpURLConnection connect(final long offset,
+        final boolean resolved) throws IOException {
+      final boolean ranged = offset != 0L;
+      final HttpURLConnection conn = openConnection();
+      conn.setRequestMethod("GET");
+      if (ranged) {
+        conn.setRequestProperty("Range", "bytes=" + offset + "-");
+      }
+      conn.connect();
+
+      // a ranged request must yield HTTP_PARTIAL, a full one HTTP_OK
+      final int code = conn.getResponseCode();
+      if (ranged) {
+        if (code != HttpURLConnection.HTTP_PARTIAL) {
+          throw new IOException("HTTP_PARTIAL expected, received " + code);
+        }
+      } else if (code != HttpURLConnection.HTTP_OK) {
+        throw new IOException("HTTP_OK expected, received " + code);
+      }
+      return conn;
+    }
+  }
+
+  /** Byte-range stream whose connections are made by Range header openers. */
+  static class RangeHeaderInputStream extends ByteRangeInputStream {
+    RangeHeaderInputStream(RangeHeaderUrlOpener o, RangeHeaderUrlOpener r) {
+      super(o, r);
+    }
+
+    /** Start from the given URL with no resolved URL known yet. */
+    RangeHeaderInputStream(final URL url) {
+      this(new RangeHeaderUrlOpener(url), new RangeHeaderUrlOpener(null));
+    }
+
+    /** The resolved URL is the URL reported by the connection itself. */
+    @Override
+    protected URL getResolvedUrl(final HttpURLConnection connection) {
+      final URL resolved = connection.getURL();
+      return resolved;
+    }
+  }
+
+  /**
+   * Open the file for reading through the namenode's /data path.
+   * The buffersize argument is not used by this implementation.
+   */
+  @Override
+  public FSDataInputStream open(Path f, int buffersize) throws IOException {
+    final Path qualified = f.makeQualified(getUri(), getWorkingDirectory());
+    final String encodedPath =
+        ServletUtil.encodePath(qualified.toUri().getPath());
+    final String query =
+        addDelegationTokenParam("ugi=" + getEncodedUgiParameter());
+    final URL dataUrl = getNamenodeURL("/data" + encodedPath, query);
+    return new FSDataInputStream(new RangeHeaderInputStream(dataUrl));
+  }
+
+  /**
+   * Close the file system and remove this instance's renew action from
+   * <code>dtRenewer</code>, if one is set. The removal runs in a finally
+   * block so that a failing <code>super.close()</code> does not leave this
+   * instance registered with the renewer.
+   */
+  @Override
+  public void close() throws IOException {
+    try {
+      super.close();
+    } finally {
+      if (dtRenewer != null) {
+        dtRenewer.removeRenewAction(this); // blocks
+      }
+    }
+  }
+
+  /** Class to parse and store a listing reply from the server. */
+  class LsParser extends DefaultHandler {
+
+    /** Entries collected so far, in the order the parser encountered them. */
+    ArrayList<FileStatus> fslist = new ArrayList<FileStatus>();
+
+    /**
+     * Convert one "file" or "directory" element into a {@link FileStatus}
+     * and append it to {@link #fslist}. The "listing" element is ignored, a
+     * RemoteException element is rethrown wrapped in a SAXException, and any
+     * other element is rejected as unrecognized.
+     */
+    @Override
+    public void startElement(String ns, String localname, String qname,
+                Attributes attrs) throws SAXException {
+      if ("listing".equals(qname)) return;
+      if (!"file".equals(qname) && !"directory".equals(qname)) {
+        if (RemoteException.class.getSimpleName().equals(qname)) {
+          throw new SAXException(RemoteException.valueOf(attrs));
+        }
+        throw new SAXException("Unrecognized entry: " + qname);
+      }
+      long modif;
+      long atime = 0;
+      try {
+        final SimpleDateFormat ldf = df.get();
+        modif = ldf.parse(attrs.getValue("modified")).getTime();
+        // "accesstime" is optional; atime stays 0 when it is absent.
+        String astr = attrs.getValue("accesstime");
+        if (astr != null) {
+          atime = ldf.parse(astr).getTime();
+        }
+      } catch (ParseException e) { throw new SAXException(e); }
+      FileStatus fs = "file".equals(qname)
+        ? new FileStatus(
+              Long.valueOf(attrs.getValue("size")).longValue(), false,
+              Short.valueOf(attrs.getValue("replication")).shortValue(),
+              Long.valueOf(attrs.getValue("blocksize")).longValue(),
+              modif, atime, FsPermission.valueOf(attrs.getValue("permission")),
+              attrs.getValue("owner"), attrs.getValue("group"),
+              HftpFileSystem.this.makeQualified(
+                  new Path(getUri().toString(), attrs.getValue("path"))))
+        : new FileStatus(0L, true, 0, 0L,
+              modif, atime, FsPermission.valueOf(attrs.getValue("permission")),
+              attrs.getValue("owner"), attrs.getValue("group"),
+              HftpFileSystem.this.makeQualified(
+                  new Path(getUri().toString(), attrs.getValue("path"))));
+      fslist.add(fs);
+    }
+
+    /**
+     * Request "/listPaths" for <code>path</code> and parse the reply into
+     * {@link #fslist}. The response stream is closed and the connection is
+     * disconnected before returning, whether or not parsing succeeded.
+     *
+     * @param path  path component to list
+     * @param recur whether to add "recursive=yes" to the query
+     * @throws IOException on connection failure, on an IOException embedded
+     *         in a SAXException, or when the reply is not valid XML
+     */
+    private void fetchList(String path, boolean recur) throws IOException {
+      HttpURLConnection connection = null;
+      InputStream resp = null;
+      try {
+        XMLReader xr = XMLReaderFactory.createXMLReader();
+        xr.setContentHandler(this);
+        connection = openConnection(
+            "/listPaths" + ServletUtil.encodePath(path),
+            "ugi=" + getEncodedUgiParameter() + (recur ? "&recursive=yes" : ""));
+        resp = connection.getInputStream();
+        xr.parse(new InputSource(resp));
+      } catch(SAXException e) {
+        final Exception embedded = e.getException();
+        if (embedded != null && embedded instanceof IOException) {
+          throw (IOException)embedded;
+        }
+        throw new IOException("invalid xml directory content", e);
+      } finally {
+        // Release the stream first, then the connection, even if close fails.
+        try {
+          if (resp != null) {
+            resp.close();
+          }
+        } finally {
+          if (connection != null) {
+            connection.disconnect();
+          }
+        }
+      }
+    }
+
+    /**
+     * Return the first entry of a non-recursive listing of <code>f</code>.
+     *
+     * @throws FileNotFoundException if the listing contains no entries
+     */
+    public FileStatus getFileStatus(Path f) throws IOException {
+      fetchList(f.toUri().getPath(), false);
+      if (fslist.size() == 0) {
+        throw new FileNotFoundException("File does not exist: " + f);
+      }
+      return fslist.get(0);
+    }
+
+    /**
+     * List <code>f</code>. The first entry is dropped unless the listing is
+     * a single non-directory entry (presumably the first entry describes
+     * <code>f</code> itself when it is a directory; confirm against the
+     * servlet).
+     */
+    public FileStatus[] listStatus(Path f, boolean recur) throws IOException {
+      fetchList(f.toUri().getPath(), recur);
+      if (fslist.size() > 0 && (fslist.size() != 1 || fslist.get(0).isDirectory())) {
+        fslist.remove(0);
+      }
+      return fslist.toArray(new FileStatus[0]);
+    }
+
+    /** Non-recursive variant of {@link #listStatus(Path, boolean)}. */
+    public FileStatus[] listStatus(Path f) throws IOException {
+      return listStatus(f, false);
+    }
+  }
+
+  /** List <code>f</code> non-recursively using a fresh {@link LsParser}. */
+  @Override
+  public FileStatus[] listStatus(Path f) throws IOException {
+    return new LsParser().listStatus(f);
+  }
+
+  /** Look up the status of <code>f</code> using a fresh {@link LsParser}. */
+  @Override
+  public FileStatus getFileStatus(Path f) throws IOException {
+    return new LsParser().getFileStatus(f);
+  }
+
+  /** SAX handler that reads a file checksum out of the "/fileChecksum" reply. */
+  private class ChecksumParser extends DefaultHandler {
+    private FileChecksum filechecksum;
+
+    /**
+     * Accept the MD5MD5CRC32FileChecksum element; rethrow a RemoteException
+     * element wrapped in a SAXException; reject anything else.
+     */
+    @Override
+    public void startElement(String ns, String localname, String qname,
+                Attributes attrs) throws SAXException {
+      if (MD5MD5CRC32FileChecksum.class.getName().equals(qname)) {
+        filechecksum = MD5MD5CRC32FileChecksum.valueOf(attrs);
+        return;
+      }
+      if (RemoteException.class.getSimpleName().equals(qname)) {
+        throw new SAXException(RemoteException.valueOf(attrs));
+      }
+      throw new SAXException("Unrecognized entry: " + qname);
+    }
+
+    /**
+     * Request the checksum of <code>f</code> and parse the reply.
+     * The connection is always disconnected before returning.
+     */
+    private FileChecksum getFileChecksum(String f) throws IOException {
+      final String servletPath = "/fileChecksum" + ServletUtil.encodePath(f);
+      final String ugiQuery = "ugi=" + getEncodedUgiParameter();
+      final HttpURLConnection connection = openConnection(servletPath, ugiQuery);
+      try {
+        final XMLReader reader = XMLReaderFactory.createXMLReader();
+        reader.setContentHandler(this);
+        reader.parse(new InputSource(connection.getInputStream()));
+      } catch(SAXException saxe) {
+        // Surface an IOException carried inside the SAXException unchanged.
+        final Exception cause = saxe.getException();
+        if (cause instanceof IOException) {
+          throw (IOException)cause;
+        }
+        throw new IOException("invalid xml directory content", saxe);
+      } finally {
+        connection.disconnect();
+      }
+      return filechecksum;
+    }
+  }
+
+  /** Fetch the checksum of <code>f</code> through a fresh {@link ChecksumParser}. */
+  @Override
+  public FileChecksum getFileChecksum(Path f) throws IOException {
+    final Path qualified = makeQualified(f);
+    final String pathPart = qualified.toUri().getPath();
+    return new ChecksumParser().getFileChecksum(pathPart);
+  }
+
+  /** The working directory is always "/" qualified against {@link #getUri()}. */
+  @Override
+  public Path getWorkingDirectory() {
+    final Path root = new Path("/");
+    return root.makeQualified(getUri(), null);
+  }
+
+  @Override
+  public void setWorkingDirectory(Path f) { }
+
+  /**
+   * This optional operation is not yet supported.
+   *
+   * @throws IOException always, with the message "Not supported"
+   */
+  @Override
+  public FSDataOutputStream append(Path f, int bufferSize,
+      Progressable progress) throws IOException {
+    throw new IOException("Not supported");
+  }
+
+  @Override
+  public FSDataOutputStream create(Path f, FsPermission permission,
+      boolean overwrite, int bufferSize, short replication,
+      long blockSize, Progressable progress) throws IOException {
+    throw new IOException("Not supported"); // every call fails; all arguments are ignored
+  }
+
+  @Override
+  public boolean rename(Path src, Path dst) throws IOException {
+    throw new IOException("Not supported"); // every call fails; no value is ever returned
+  }
+
+  @Override
+  public boolean delete(Path f, boolean recursive) throws IOException {
+    throw new IOException("Not supported"); // every call fails; no value is ever returned
+  }
+
+  @Override
+  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+    throw new IOException("Not supported"); // every call fails; no value is ever returned
+  }
+
+  /**
+   * A parser for parsing {@link ContentSummary} xml.
+   */
+  private class ContentSummaryParser extends DefaultHandler {
+    private ContentSummary contentsummary;
+
+    /**
+     * Accept the ContentSummary element; rethrow a RemoteException element
+     * wrapped in a SAXException; reject anything else.
+     */
+    @Override
+    public void startElement(String ns, String localname, String qname,
+                Attributes attrs) throws SAXException {
+      if (ContentSummary.class.getName().equals(qname)) {
+        contentsummary = toContentSummary(attrs);
+        return;
+      }
+      if (RemoteException.class.getSimpleName().equals(qname)) {
+        throw new SAXException(RemoteException.valueOf(attrs));
+      }
+      throw new SAXException("Unrecognized entry: " + qname);
+    }
+
+    /**
+     * Connect to the name node and get content summary.
+     * @param path The path
+     * @return The content summary for the path, or null when reading the
+     *         reply raises a FileNotFoundException.
+     * @throws IOException
+     */
+    private ContentSummary getContentSummary(String path) throws IOException {
+      final String servletPath =
+          "/contentSummary" + ServletUtil.encodePath(path);
+      final String ugiQuery = "ugi=" + getEncodedUgiParameter();
+      final HttpURLConnection connection = openConnection(servletPath, ugiQuery);
+      InputStream in = null;
+      try {
+        in = connection.getInputStream();
+
+        final XMLReader reader = XMLReaderFactory.createXMLReader();
+        reader.setContentHandler(this);
+        reader.parse(new InputSource(in));
+      } catch(FileNotFoundException fnfe) {
+        //the server may not support getContentSummary
+        return null;
+      } catch(SAXException saxe) {
+        // Surface an IOException carried inside the SAXException unchanged.
+        final Exception cause = saxe.getException();
+        if (cause instanceof IOException) {
+          throw (IOException)cause;
+        }
+        throw new IOException("Invalid xml format", saxe);
+      } finally {
+        if (in != null) {
+          in.close();
+        }
+        connection.disconnect();
+      }
+      return contentsummary;
+    }
+  }
+
+  /**
+   * Return the object represented in the attributes.
+   *
+   * @param attrs attributes expected to carry "length", "fileCount",
+   *              "directoryCount", "quota", "spaceConsumed" and "spaceQuota"
+   * @return the parsed summary, or null if any of those attributes is absent
+   * @throws SAXException if an attribute value is not a valid long
+   */
+  private static ContentSummary toContentSummary(Attributes attrs
+      ) throws SAXException {
+    final String length = attrs.getValue("length");
+    final String fileCount = attrs.getValue("fileCount");
+    final String directoryCount = attrs.getValue("directoryCount");
+    final String quota = attrs.getValue("quota");
+    final String spaceConsumed = attrs.getValue("spaceConsumed");
+    final String spaceQuota = attrs.getValue("spaceQuota");
+
+    if (length == null
+        || fileCount == null
+        || directoryCount == null
+        || quota == null
+        || spaceConsumed == null
+        || spaceQuota == null) {
+      return null;
+    }
+
+    try {
+      return new ContentSummary(
+          Long.parseLong(length),
+          Long.parseLong(fileCount),
+          Long.parseLong(directoryCount),
+          Long.parseLong(quota),
+          Long.parseLong(spaceConsumed),
+          Long.parseLong(spaceQuota));
+    } catch(NumberFormatException e) {
+      // Only the Long.parseLong calls can fail here; report every raw value.
+      throw new SAXException("Invalid attributes: length=" + length
+          + ", fileCount=" + fileCount
+          + ", directoryCount=" + directoryCount
+          + ", quota=" + quota
+          + ", spaceConsumed=" + spaceConsumed
+          + ", spaceQuota=" + spaceQuota, e);
+    }
+  }
+
+  /**
+   * Ask the server for the content summary of <code>f</code>; when the
+   * parser yields null, defer to the superclass implementation.
+   */
+  @Override
+  public ContentSummary getContentSummary(Path f) throws IOException {
+    final String pathPart = makeQualified(f).toUri().getPath();
+    final ContentSummary fromServer =
+        new ContentSummaryParser().getContentSummary(pathPart);
+    if (fromServer != null) {
+      return fromServer;
+    }
+    return super.getContentSummary(f);
+  }
+
+  /** Token renewer that renews and cancels tokens of kind TOKEN_KIND. */
+  @InterfaceAudience.Private
+  public static class TokenManager extends TokenRenewer {
+
+    @Override
+    public boolean handleKind(Text kind) {
+      return kind.equals(TOKEN_KIND);
+    }
+
+    @Override
+    public boolean isManaged(Token<?> token) throws IOException {
+      return true;
+    }
+
+    /** Scheme used to build the URL that renew and cancel requests go to. */
+    protected String getUnderlyingProtocol() {
+      return "http";
+    }
+
+    /** Build the service URL string from the address recorded in the token. */
+    private String serviceUrl(Token<?> token) {
+      final InetSocketAddress serviceAddr =
+          SecurityUtil.getTokenServiceAddr(token);
+      return DFSUtil.createUri(getUnderlyingProtocol(), serviceAddr).toString();
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public long renew(Token<?> token,
+                      Configuration conf) throws IOException {
+      // update the kerberos credentials, if they are coming from a keytab
+      UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
+      final String nnUrl = serviceUrl(token);
+      return DelegationTokenFetcher.renewDelegationToken(
+          nnUrl, (Token<DelegationTokenIdentifier>) token);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public void cancel(Token<?> token,
+                       Configuration conf) throws IOException {
+      // update the kerberos credentials, if they are coming from a keytab
+      UserGroupInformation.getLoginUser().checkTGTAndReloginFromKeytab();
+      final String nnUrl = serviceUrl(token);
+      DelegationTokenFetcher.cancelDelegationToken(
+          nnUrl, (Token<DelegationTokenIdentifier>) token);
+    }
+  }
+
+  /** Selects a token of kind TOKEN_KIND, falling back to the HDFS selector. */
+  private static class HftpDelegationTokenSelector
+  extends AbstractDelegationTokenSelector<DelegationTokenIdentifier> {
+    private static final DelegationTokenSelector hdfsTokenSelector =
+        new DelegationTokenSelector();
+
+    public HftpDelegationTokenSelector() {
+      super(TOKEN_KIND);
+    }
+
+    /**
+     * Select the token for the service built from <code>nnUri</code>; when
+     * none matches, return whatever the HDFS token selector finds.
+     */
+    Token<DelegationTokenIdentifier> selectToken(URI nnUri,
+        Collection<Token<?>> tokens, Configuration conf) {
+      final Token<DelegationTokenIdentifier> hftpToken =
+          selectToken(SecurityUtil.buildTokenService(nnUri), tokens);
+      if (hftpToken != null) {
+        return hftpToken;
+      }
+      // try to get a HDFS token
+      return hdfsTokenSelector.selectToken(nnUri, tokens, conf);
+    }
+  }
+}

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/HsftpFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/HsftpFileSystem.java?rev=1536921&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/HsftpFileSystem.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/HsftpFileSystem.java Tue Oct 29 22:44:34 2013
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.web;
+
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.net.HttpURLConnection;
+import java.net.InetSocketAddress;
+import java.net.URI;
+import java.net.URL;
+import java.security.KeyStore;
+import java.security.cert.X509Certificate;
+
+import javax.net.ssl.HostnameVerifier;
+import javax.net.ssl.HttpsURLConnection;
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLSession;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.TrustManagerFactory;
+import javax.net.ssl.X509TrustManager;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.util.Time;
+
+/**
+ * An implementation of a protocol for accessing filesystems over HTTPS. The
+ * following implementation provides a limited, read-only interface to a
+ * filesystem over HTTPS.
+ *
+ * @see org.apache.hadoop.hdfs.server.namenode.ListPathsServlet
+ * @see org.apache.hadoop.hdfs.server.namenode.FileDataServlet
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Evolving
+public class HsftpFileSystem extends HftpFileSystem {
+
+  private static final long MM_SECONDS_PER_DAY = 1000 * 60 * 60 * 24;
+  private volatile int ExpWarnDays = 0;
+
+  /**
+   * Return the protocol scheme for the FileSystem.
+   * <p/>
+   *
+   * @return <code>hsftp</code>
+   */
+  @Override
+  public String getScheme() {
+    return "hsftp";
+  }
+
+  /**
+   * Return the underlying protocol that is used to talk to the namenode.
+   */
+  @Override
+  protected String getUnderlyingProtocol() {
+    return "https";
+  }
+
+  @Override
+  public void initialize(URI name, Configuration conf) throws IOException {
+    super.initialize(name, conf);
+    setupSsl(conf);
+    ExpWarnDays = conf.getInt("ssl.expiration.warn.days", 30);
+  }
+
+  /**
+   * Set up SSL resources
+   *
+   * @throws IOException
+   */
+  private static void setupSsl(Configuration conf) throws IOException {
+    Configuration sslConf = new HdfsConfiguration(false);
+    sslConf.addResource(conf.get(DFSConfigKeys.DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_KEY,
+                             DFSConfigKeys.DFS_CLIENT_HTTPS_KEYSTORE_RESOURCE_DEFAULT));
+    FileInputStream fis = null;
+    try {
+      SSLContext sc = SSLContext.getInstance("SSL");
+      KeyManager[] kms = null;
+      TrustManager[] tms = null;
+      if (sslConf.get("ssl.client.keystore.location") != null) {
+        // initialize default key manager with keystore file and pass
+        KeyManagerFactory kmf = KeyManagerFactory.getInstance("SunX509");
+        KeyStore ks = KeyStore.getInstance(sslConf.get(
+            "ssl.client.keystore.type", "JKS"));
+        char[] ksPass = sslConf.get("ssl.client.keystore.password", "changeit")
+            .toCharArray();
+        fis = new FileInputStream(sslConf.get("ssl.client.keystore.location",
+            "keystore.jks"));
+        ks.load(fis, ksPass);
+        kmf.init(ks, sslConf.get("ssl.client.keystore.keypassword", "changeit")
+            .toCharArray());
+        kms = kmf.getKeyManagers();
+        fis.close();
+        fis = null;
+      }
+      // initialize default trust manager with truststore file and pass
+      if (sslConf.getBoolean("ssl.client.do.not.authenticate.server", false)) {
+        // by pass trustmanager validation
+        tms = new DummyTrustManager[] { new DummyTrustManager() };
+      } else {
+        TrustManagerFactory tmf = TrustManagerFactory.getInstance("PKIX");
+        KeyStore ts = KeyStore.getInstance(sslConf.get(
+            "ssl.client.truststore.type", "JKS"));
+        char[] tsPass = sslConf.get("ssl.client.truststore.password",
+            "changeit").toCharArray();
+        fis = new FileInputStream(sslConf.get("ssl.client.truststore.location",
+            "truststore.jks"));
+        ts.load(fis, tsPass);
+        tmf.init(ts);
+        tms = tmf.getTrustManagers();
+      }
+      sc.init(kms, tms, new java.security.SecureRandom());
+      HttpsURLConnection.setDefaultSSLSocketFactory(sc.getSocketFactory());
+    } catch (Exception e) {
+      throw new IOException("Could not initialize SSLContext", e);
+    } finally {
+      if (fis != null) {
+        fis.close();
+      }
+    }
+  }
+
+  @Override
+  protected int getDefaultPort() {
+    return getConf().getInt(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_KEY,
+                            DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT);
+  }
+
+  @Override
+  protected HttpURLConnection openConnection(String path, String query)
+      throws IOException {
+    query = addDelegationTokenParam(query);
+    final URL url = new URL(getUnderlyingProtocol(), nnUri.getHost(),
+        nnUri.getPort(), path + '?' + query);
+    HttpsURLConnection conn;
+    conn = (HttpsURLConnection)connectionFactory.openConnection(url);
+    // bypass hostname verification
+    conn.setHostnameVerifier(new DummyHostnameVerifier());
+    conn.setRequestMethod("GET");
+    conn.connect();
+
+    // check cert expiration date
+    final int warnDays = ExpWarnDays;
+    if (warnDays > 0) { // make sure only check once
+      ExpWarnDays = 0;
+      long expTimeThreshold = warnDays * MM_SECONDS_PER_DAY + Time.now();
+      X509Certificate[] clientCerts = (X509Certificate[]) conn
+          .getLocalCertificates();
+      if (clientCerts != null) {
+        for (X509Certificate cert : clientCerts) {
+          long expTime = cert.getNotAfter().getTime();
+          if (expTime < expTimeThreshold) {
+            StringBuilder sb = new StringBuilder();
+            sb.append("\n Client certificate "
+                + cert.getSubjectX500Principal().getName());
+            int dayOffSet = (int) ((expTime - Time.now()) / MM_SECONDS_PER_DAY);
+            sb.append(" have " + dayOffSet + " days to expire");
+            LOG.warn(sb.toString());
+          }
+        }
+      }
+    }
+    return (HttpURLConnection) conn;
+  }
+
+  /**
+   * Dummy hostname verifier that is used to bypass hostname checking
+   */
+  protected static class DummyHostnameVerifier implements HostnameVerifier {
+    @Override
+    public boolean verify(String hostname, SSLSession session) {
+      return true;
+    }
+  }
+
+  /**
+   * Dummy trustmanager that is used to trust all server certificates
+   */
+  protected static class DummyTrustManager implements X509TrustManager {
+    @Override
+    public void checkClientTrusted(X509Certificate[] chain, String authType) {
+    }
+
+    @Override
+    public void checkServerTrusted(X509Certificate[] chain, String authType) {
+    }
+
+    @Override
+    public X509Certificate[] getAcceptedIssuers() {
+      return null;
+    }
+  }
+
+}

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java?rev=1536921&r1=1536920&r2=1536921&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java Tue Oct 29 22:44:34 2013
@@ -51,7 +51,6 @@ import org.apache.hadoop.fs.MD5MD5CRC32F
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.ByteRangeInputStream;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HAUtil;

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem?rev=1536921&r1=1536920&r2=1536921&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem Tue Oct 29 22:44:34 2013
@@ -14,6 +14,6 @@
 # limitations under the License.
 
 org.apache.hadoop.hdfs.DistributedFileSystem
-org.apache.hadoop.hdfs.HftpFileSystem
-org.apache.hadoop.hdfs.HsftpFileSystem
+org.apache.hadoop.hdfs.web.HftpFileSystem
+org.apache.hadoop.hdfs.web.HsftpFileSystem
 org.apache.hadoop.hdfs.web.WebHdfsFileSystem

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer?rev=1536921&r1=1536920&r2=1536921&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/META-INF/services/org.apache.hadoop.security.token.TokenRenewer Tue Oct 29 22:44:34 2013
@@ -13,5 +13,5 @@
 #
 org.apache.hadoop.hdfs.DFSClient$Renewer
 org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier$Renewer
-org.apache.hadoop.hdfs.HftpFileSystem$TokenManager
+org.apache.hadoop.hdfs.web.HftpFileSystem$TokenManager
 org.apache.hadoop.hdfs.web.WebHdfsFileSystem$DtRenewer

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java?rev=1536921&r1=1536920&r2=1536921&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java Tue Oct 29 22:44:34 2013
@@ -89,6 +89,7 @@ import org.apache.hadoop.hdfs.server.nam
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.tools.DFSAdmin;
+import org.apache.hadoop.hdfs.web.HftpFileSystem;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.net.NetUtils;

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java?rev=1536921&r1=1536920&r2=1536921&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDistributedFileSystem.java Tue Oct 29 22:44:34 2013
@@ -56,6 +56,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.fs.VolumeId;
 import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hdfs.web.HftpFileSystem;
 import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.DataChecksum;

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileStatus.java?rev=1536921&r1=1536920&r2=1536921&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileStatus.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileStatus.java Tue Oct 29 22:44:34 2013
@@ -39,6 +39,7 @@ import org.apache.hadoop.fs.permission.F
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.web.HftpFileSystem;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.log4j.Level;

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestListPathServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestListPathServlet.java?rev=1536921&r1=1536920&r2=1536921&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestListPathServlet.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestListPathServlet.java Tue Oct 29 22:44:34 2013
@@ -29,6 +29,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.server.namenode.ListPathsServlet;
+import org.apache.hadoop.hdfs.web.HftpFileSystem;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.junit.AfterClass;
 import org.junit.Assert;

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogs.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogs.java?rev=1536921&r1=1536920&r2=1536921&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogs.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestAuditLogs.java Tue Oct 29 22:44:34 2013
@@ -39,8 +39,8 @@ import org.apache.hadoop.fs.permission.F
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.HftpFileSystem;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.web.HftpFileSystem;
 import org.apache.hadoop.hdfs.web.WebHdfsTestUtil;
 import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
 import org.apache.hadoop.security.AccessControlException;

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestByteRangeInputStream.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestByteRangeInputStream.java?rev=1536921&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestByteRangeInputStream.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestByteRangeInputStream.java Tue Oct 29 22:44:34 2013
@@ -0,0 +1,245 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.web;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.HttpURLConnection;
+import java.net.URL;
+
+import org.apache.hadoop.hdfs.server.namenode.StreamFile;
+import org.apache.hadoop.hdfs.web.HftpFileSystem;
+import org.junit.Test;
+
+public class TestByteRangeInputStream {
+public static class MockHttpURLConnection extends HttpURLConnection { // canned connection for tests
+  public MockHttpURLConnection(URL u) {
+    super(u);
+  }
+  // Never reports a proxy.
+  @Override
+  public boolean usingProxy(){
+    return false;
+  }
+  // disconnect() and connect() are deliberate no-ops.
+  @Override
+  public void disconnect() {
+  }
+
+  @Override
+  public void connect() {
+  }
+  // Every call returns a fresh stream over the bytes of "asdf" (platform default charset).
+  @Override
+  public InputStream getInputStream() throws IOException {
+    return new ByteArrayInputStream("asdf".getBytes());
+  }
+  // Always reports http://resolvedurl/, whatever URL was passed to the constructor.
+  @Override
+  public URL getURL() {
+    URL u = null;
+    try {
+      u = new URL("http://resolvedurl/");
+    } catch (Exception e) {
+      System.out.println(e.getMessage()); // failure is only printed; null is returned below
+    }
+    return u;
+  }
+  // A code forced via setResponseCode() wins; otherwise 200 without a "Range" property, 206 with one.
+  @Override
+  public int getResponseCode() {
+    if (responseCode != -1) {
+      return responseCode;
+    } else {
+      if (getRequestProperty("Range") == null) {
+        return 200;
+      } else {
+        return 206;
+      }
+    }
+  }
+  // Test hook: overwrites the inherited responseCode field (-1 is treated as "not set" above).
+  public void setResponseCode(int resCode) {
+    responseCode = resCode;
+  }
+  // Only StreamFile.CONTENT_LENGTH (matched case-insensitively) is answered, with "65535".
+  @Override
+  public String getHeaderField(String field) {
+    return (field.equalsIgnoreCase(StreamFile.CONTENT_LENGTH)) ? "65535" : null;
+  }
+}
+
+  @Test
+  public void testByteRange() throws IOException {
+    HftpFileSystem.RangeHeaderUrlOpener ospy = spy(
+        new HftpFileSystem.RangeHeaderUrlOpener(new URL("http://test/")));
+    doReturn(new MockHttpURLConnection(ospy.getURL())).when(ospy)
+        .openConnection();
+    HftpFileSystem.RangeHeaderUrlOpener rspy = spy(
+        new HftpFileSystem.RangeHeaderUrlOpener((URL) null));
+    doReturn(new MockHttpURLConnection(rspy.getURL())).when(rspy)
+        .openConnection();
+    ByteRangeInputStream is = new HftpFileSystem.RangeHeaderInputStream(ospy, rspy);
+    // Nothing has been read yet, so the position must still be 0.
+    assertEquals("getPos wrong", 0, is.getPos());
+
+    is.read();
+    // openConnection() is stubbed with a single mock instance, so this inspects that shared connection.
+    assertNull("Initial call made incorrectly (Range Check)", ospy
+        .openConnection().getRequestProperty("Range"));
+
+    assertEquals("getPos should be 1 after reading one byte", 1, is.getPos());
+
+    is.read();
+
+    assertEquals("getPos should be 2 after reading two bytes", 2, is.getPos());
+
+    // No additional connections should have been made (no seek)
+    // NOTE(review): no verify() backs the statement above; the next line only sets rspy's URL.
+    rspy.setURL(new URL("http://resolvedurl/"));
+
+    is.seek(100);
+    is.read();
+
+    assertEquals("Seek to 100 bytes made incorrectly (Range Check)",
+        "bytes=100-", rspy.openConnection().getRequestProperty("Range"));
+
+    assertEquals("getPos should be 101 after reading one byte", 101,
+        is.getPos());
+    // NOTE(review): presumably one call by the stream after the seek plus the one in the assertion above.
+    verify(rspy, times(2)).openConnection();
+
+    is.seek(101);
+    is.read();
+
+    verify(rspy, times(2)).openConnection();
+
+    // Seek to 101 should not result in another request
+
+    is.seek(2500);
+    is.read();
+
+    assertEquals("Seek to 2500 bytes made incorrectly (Range Check)",
+        "bytes=2500-", rspy.openConnection().getRequestProperty("Range"));
+    // Force the shared mock connection to answer 200 even though a Range property is set.
+    ((MockHttpURLConnection) rspy.openConnection()).setResponseCode(200);
+    is.seek(500);
+
+    try {
+      is.read();
+      fail("Exception should be thrown when 200 response is given "
+           + "but 206 is expected");
+    } catch (IOException e) {
+      assertEquals("Should fail because incorrect response code was sent",
+                   "HTTP_PARTIAL expected, received 200", e.getMessage());
+    }
+    // Now force 206 and seek back to offset 0; the read is expected to reject the 206.
+    ((MockHttpURLConnection) rspy.openConnection()).setResponseCode(206);
+    is.seek(0);
+
+    try {
+      is.read();
+      fail("Exception should be thrown when 206 response is given "
+           + "but 200 is expected");
+    } catch (IOException e) {
+      assertEquals("Should fail because incorrect response code was sent",
+                   "HTTP_OK expected, received 206", e.getMessage());
+    }
+  }
+
+  @Test
+  public void testPropagatedClose() throws IOException {
+    ByteRangeInputStream brs = spy(
+        new HftpFileSystem.RangeHeaderInputStream(new URL("http://test/")));
+    // Stub openInputStream() on the spy so it hands back a mock stream.
+    InputStream mockStream = mock(InputStream.class);
+    doReturn(mockStream).when(brs).openInputStream();
+    // Running totals of expected calls; each verify(..., times(n)) checks the cumulative count.
+    int brisOpens = 0;
+    int brisCloses = 0;
+    int isCloses = 0;
+
+    // first open, shouldn't close underlying stream
+    brs.getInputStream();
+    verify(brs, times(++brisOpens)).openInputStream();
+    verify(brs, times(brisCloses)).close();
+    verify(mockStream, times(isCloses)).close();
+
+    // stream is open, shouldn't close underlying stream
+    brs.getInputStream();
+    verify(brs, times(brisOpens)).openInputStream();
+    verify(brs, times(brisCloses)).close();
+    verify(mockStream, times(isCloses)).close();
+
+    // seek forces a reopen, should close underlying stream
+    brs.seek(1);
+    brs.getInputStream();
+    verify(brs, times(++brisOpens)).openInputStream();
+    verify(brs, times(brisCloses)).close();
+    verify(mockStream, times(++isCloses)).close();
+
+    // verify that the underlying stream isn't closed after a seek
+    // ie. the state was correctly updated
+    brs.getInputStream();
+    verify(brs, times(brisOpens)).openInputStream();
+    verify(brs, times(brisCloses)).close();
+    verify(mockStream, times(isCloses)).close();
+
+    // seeking to same location should be a no-op
+    brs.seek(1);
+    brs.getInputStream();
+    verify(brs, times(brisOpens)).openInputStream();
+    verify(brs, times(brisCloses)).close();
+    verify(mockStream, times(isCloses)).close();
+
+    // close should of course close
+    brs.close();
+    verify(brs, times(++brisCloses)).close();
+    verify(mockStream, times(++isCloses)).close();
+
+    // it's already closed, underlying stream should not close
+    brs.close();
+    verify(brs, times(++brisCloses)).close();
+    verify(mockStream, times(isCloses)).close();
+
+    // it's closed, don't reopen it
+    boolean errored = false;
+    try {
+      brs.getInputStream();
+    } catch (IOException e) {
+      errored = true;
+      assertEquals("Stream closed", e.getMessage());
+    } finally {
+      assertTrue("Read a closed steam", errored); // NOTE(review): "steam" is presumably a typo for "stream"
+    }
+    verify(brs, times(brisOpens)).openInputStream();
+    verify(brs, times(brisCloses)).close();
+    verify(mockStream, times(isCloses)).close();
+  }
+}

Added: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestHftpDelegationToken.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestHftpDelegationToken.java?rev=1536921&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestHftpDelegationToken.java (added)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/web/TestHftpDelegationToken.java Tue Oct 29 22:44:34 2013
@@ -0,0 +1,276 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.web;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.net.URI;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.web.HftpFileSystem;
+import org.apache.hadoop.hdfs.web.HsftpFileSystem;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.security.SecurityUtilTestHelper;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.security.token.TokenIdentifier;
+import org.junit.Test;
+
+public class TestHftpDelegationToken {
+
+  @Test
+  public void testHdfsDelegationToken() throws Exception {
+    SecurityUtilTestHelper.setTokenServiceUseIp(true);
+    // Set the authentication mode to "kerberos" in the configuration handed to UserGroupInformation.
+    final Configuration conf = new Configuration();
+    conf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+    UserGroupInformation.setConfiguration(conf);
+    UserGroupInformation user =
+      UserGroupInformation.createUserForTesting("oom",
+                                                new String[]{"memory"});
+    Token<?> token = new Token<TokenIdentifier>
+      (new byte[0], new byte[0],
+       DelegationTokenIdentifier.HDFS_DELEGATION_KIND,
+       new Text("127.0.0.1:8020"));
+    user.addToken(token);
+    Token<?> token2 = new Token<TokenIdentifier>
+      (null, null, new Text("other token"), new Text("127.0.0.1:8021"));
+    user.addToken(token2);
+    assertEquals("wrong tokens in user", 2, user.getTokens().size());
+    FileSystem fs =
+      user.doAs(new PrivilegedExceptionAction<FileSystem>() {
+	  @Override
+    public FileSystem run() throws Exception {
+            return FileSystem.get(new URI("hftp://localhost:50470/"), conf);
+	  }
+	});
+    assertSame("wrong kind of file system", HftpFileSystem.class,
+                 fs.getClass());
+    Field renewToken = HftpFileSystem.class.getDeclaredField("renewToken");
+    renewToken.setAccessible(true); // bypass access checks so the field can be read reflectively
+    assertSame("wrong token", token, renewToken.get(fs)); // the HDFS-kind token, not "other token"
+  }
+
+  @Test
+  public void testSelectHftpDelegationToken() throws Exception {
+    SecurityUtilTestHelper.setTokenServiceUseIp(true);
+    // Register the test subclass for hftp so newInstance() below can be cast to MyHftpFileSystem.
+    Configuration conf = new Configuration();
+    conf.setClass("fs.hftp.impl", MyHftpFileSystem.class, FileSystem.class);
+
+    int httpPort = 80;
+    int httpsPort = 443;
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_KEY, httpPort);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_KEY, httpsPort);
+
+    // test with implicit default port
+    URI fsUri = URI.create("hftp://localhost");
+    MyHftpFileSystem fs = (MyHftpFileSystem) FileSystem.newInstance(fsUri, conf);
+    assertEquals(httpPort, fs.getCanonicalUri().getPort());
+    checkTokenSelection(fs, httpPort, conf);
+
+    // test with explicit default port
+    // Make sure it uses the port from the hftp URI.
+    fsUri = URI.create("hftp://localhost:"+httpPort);
+    fs = (MyHftpFileSystem) FileSystem.newInstance(fsUri, conf);
+    assertEquals(httpPort, fs.getCanonicalUri().getPort());
+    checkTokenSelection(fs, httpPort, conf);
+
+    // test with non-default port
+    // Make sure it uses the port from the hftp URI.
+    fsUri = URI.create("hftp://localhost:"+(httpPort+1));
+    fs = (MyHftpFileSystem) FileSystem.newInstance(fsUri, conf);
+    assertEquals(httpPort+1, fs.getCanonicalUri().getPort());
+    checkTokenSelection(fs, httpPort + 1, conf);
+    // NOTE(review): conf is not read again in this method after the next call; it looks like a leftover.
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_KEY, 5);
+  }
+
+  @Test
+  public void testSelectHsftpDelegationToken() throws Exception {
+    SecurityUtilTestHelper.setTokenServiceUseIp(true);
+    // Register the test subclass for hsftp so newInstance() below can be cast to MyHsftpFileSystem.
+    Configuration conf = new Configuration();
+    conf.setClass("fs.hsftp.impl", MyHsftpFileSystem.class, FileSystem.class);
+
+    int httpPort = 80;
+    int httpsPort = 443;
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_KEY, httpPort);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_KEY, httpsPort);
+
+    // test with implicit default port
+    URI fsUri = URI.create("hsftp://localhost");
+    MyHsftpFileSystem fs = (MyHsftpFileSystem) FileSystem.newInstance(fsUri, conf);
+    assertEquals(httpsPort, fs.getCanonicalUri().getPort());
+    checkTokenSelection(fs, httpsPort, conf);
+
+    // test with explicit default port
+    fsUri = URI.create("hsftp://localhost:"+httpsPort);
+    fs = (MyHsftpFileSystem) FileSystem.newInstance(fsUri, conf);
+    assertEquals(httpsPort, fs.getCanonicalUri().getPort());
+    checkTokenSelection(fs, httpsPort, conf);
+
+    // test with non-default port
+    fsUri = URI.create("hsftp://localhost:"+(httpsPort+1));
+    fs = (MyHsftpFileSystem) FileSystem.newInstance(fsUri, conf);
+    assertEquals(httpsPort+1, fs.getCanonicalUri().getPort());
+    checkTokenSelection(fs, httpsPort+1, conf);
+    // NOTE(review): conf is not read again in this method after the next call; it looks like a leftover.
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_KEY, 5);
+  }
+
+
+  @Test
+  public void testInsecureRemoteCluster()  throws Exception {
+    final ServerSocket socket = new ServerSocket(0); // bind an ephemeral port just to learn its number
+    socket.close(); // released immediately; the URI below still uses the recorded port
+    Configuration conf = new Configuration(); // NOTE(review): the FileSystem created below is never closed
+    URI fsUri = URI.create("hsftp://localhost:"+socket.getLocalPort());
+    assertNull(FileSystem.newInstance(fsUri, conf).getDelegationToken(null));
+  }
+
+  @Test
+  public void testSecureClusterError()  throws Exception {
+    final ServerSocket socket = new ServerSocket(0); // listen on an ephemeral port
+    Thread t = new Thread() {
+      @Override
+      public void run() {
+        while (true) { // fetching does a few retries
+          try {
+            Socket s = socket.accept();
+            s.getOutputStream().write(1234); // write(int) sends only the low-order byte
+            s.shutdownOutput();
+          } catch (Exception e) {
+            break; // any failure ends the responder loop
+          }
+        }
+      }
+    };
+    t.start();
+    // NOTE(review): neither the ServerSocket nor the accepted sockets are closed in this method.
+    try {
+      Configuration conf = new Configuration();
+      URI fsUri = URI.create("hsftp://localhost:"+socket.getLocalPort());
+      Exception ex = null;
+      try {
+        FileSystem.newInstance(fsUri, conf).getDelegationToken(null);
+      } catch (Exception e) {
+        ex = e;
+      }
+      assertNotNull(ex);
+      assertNotNull(ex.getCause());
+      assertEquals("Remote host closed connection during handshake",
+                   ex.getCause().getMessage());
+    } finally {
+      t.interrupt(); // NOTE(review): confirm this unblocks accept(); closing the socket may be needed
+    }
+  }
+
+  private void checkTokenSelection(HftpFileSystem fs,
+                                   int port,
+                                   Configuration conf) throws IOException {
+    UserGroupInformation ugi =
+        UserGroupInformation.createUserForTesting(fs.getUri().getAuthority(), new String[]{});
+    // Runs the same fallback/preference checks twice: first with IP-based, then host-based services.
+    // use ip-based tokens
+    SecurityUtilTestHelper.setTokenServiceUseIp(true);
+
+    // add an hdfs-kind token whose service is the IP form (127.0.0.1:8020)
+    Token<?> hdfsToken = new Token<TokenIdentifier>(
+        new byte[0], new byte[0],
+        DelegationTokenIdentifier.HDFS_DELEGATION_KIND,
+        new Text("127.0.0.1:8020"));
+    ugi.addToken(hdfsToken);
+
+    // test fallback to hdfs token
+    Token<?> token = fs.selectDelegationToken(ugi);
+    assertNotNull(token);
+    assertEquals(hdfsToken, token);
+
+    // test hftp is favored over hdfs
+    Token<?> hftpToken = new Token<TokenIdentifier>(
+        new byte[0], new byte[0],
+        HftpFileSystem.TOKEN_KIND, new Text("127.0.0.1:"+port));
+    ugi.addToken(hftpToken);
+    token = fs.selectDelegationToken(ugi);
+    assertNotNull(token);
+    assertEquals(hftpToken, token);
+
+    // switch to using host-based tokens, no token should match
+    SecurityUtilTestHelper.setTokenServiceUseIp(false);
+    token = fs.selectDelegationToken(ugi);
+    assertNull(token);
+
+    // test fallback to hdfs token
+    hdfsToken = new Token<TokenIdentifier>(
+        new byte[0], new byte[0],
+        DelegationTokenIdentifier.HDFS_DELEGATION_KIND,
+        new Text("localhost:8020"));
+    ugi.addToken(hdfsToken);
+    token = fs.selectDelegationToken(ugi);
+    assertNotNull(token);
+    assertEquals(hdfsToken, token);
+
+    // test hftp is favored over hdfs
+    hftpToken = new Token<TokenIdentifier>(
+        new byte[0], new byte[0],
+        HftpFileSystem.TOKEN_KIND, new Text("localhost:"+port));
+    ugi.addToken(hftpToken);
+    token = fs.selectDelegationToken(ugi);
+    assertNotNull(token);
+    assertEquals(hftpToken, token);
+  }
+
+  static class MyHftpFileSystem extends HftpFileSystem { // test-only subclass of HftpFileSystem
+    @Override // pure delegation; presumably overridden only to widen access for the tests - confirm
+    public URI getCanonicalUri() {
+      return super.getCanonicalUri();
+    }
+    @Override // pure delegation to the superclass
+    public int getDefaultPort() {
+      return super.getDefaultPort();
+    }
+    // no-op override: don't automatically get a token
+    @Override
+    protected void initDelegationToken() throws IOException {}
+  }
+
+  static class MyHsftpFileSystem extends HsftpFileSystem { // test-only subclass of HsftpFileSystem
+    @Override // pure delegation; presumably overridden only to widen access for the tests - confirm
+    public URI getCanonicalUri() {
+      return super.getCanonicalUri();
+    }
+    @Override // pure delegation to the superclass
+    public int getDefaultPort() {
+      return super.getDefaultPort();
+    }
+    // no-op override: don't automatically get a token
+    @Override
+    protected void initDelegationToken() throws IOException {}
+  }
+}