Posted to common-commits@hadoop.apache.org by wh...@apache.org on 2015/04/24 02:33:50 UTC

[2/4] hadoop git commit: HDFS-8052. Move WebHdfsFileSystem into hadoop-hdfs-client. Contributed by Haohui Mai.
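
For context on what moves here, a minimal sketch of the client-side usage this relocation serves: obtaining a WebHdfsFileSystem through the generic FileSystem API. The host and port below are placeholders.

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class WebHdfsListSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Placeholder NameNode HTTP address; 50070 is the 2.x default HTTP port.
    FileSystem fs = FileSystem.get(
        URI.create("webhdfs://namenode.example.com:50070/"), conf);
    for (FileStatus stat : fs.listStatus(new Path("/user"))) {
      System.out.println(stat.getPath() + "\t" + stat.getLen());
    }
    fs.close();
  }
}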

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcf89ddc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtilClient.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtilClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtilClient.java
deleted file mode 100644
index 0056078..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtilClient.java
+++ /dev/null
@@ -1,484 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.web;
-
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.hadoop.fs.ContentSummary;
-import org.apache.hadoop.fs.FileChecksum;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.MD5MD5CRC32CastagnoliFileChecksum;
-import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
-import org.apache.hadoop.fs.MD5MD5CRC32GzipFileChecksum;
-import org.apache.hadoop.fs.XAttrCodec;
-import org.apache.hadoop.fs.permission.AclEntry;
-import org.apache.hadoop.fs.permission.AclStatus;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.FsPermissionExtension;
-import org.apache.hadoop.hdfs.protocol.HdfsConstantsClient;
-import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
-import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.util.DataChecksum;
-import org.apache.hadoop.util.StringUtils;
-import org.codehaus.jackson.map.ObjectMapper;
-import org.codehaus.jackson.map.ObjectReader;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-
-class JsonUtilClient {
-  static final DatanodeInfo[] EMPTY_DATANODE_INFO_ARRAY = {};
-
-  /** Convert a Json map to a RemoteException. */
-  static RemoteException toRemoteException(final Map<?, ?> json) {
-    final Map<?, ?> m = (Map<?, ?>)json.get(RemoteException.class.getSimpleName());
-    final String message = (String)m.get("message");
-    final String javaClassName = (String)m.get("javaClassName");
-    return new RemoteException(javaClassName, message);
-  }
-
-  /** Convert a Json map to a Token. */
-  static Token<? extends TokenIdentifier> toToken(
-      final Map<?, ?> m) throws IOException {
-    if (m == null) {
-      return null;
-    }
-
-    final Token<DelegationTokenIdentifier> token
-        = new Token<>();
-    token.decodeFromUrlString((String)m.get("urlString"));
-    return token;
-  }
-
-  /** Convert a Json map to a Token of BlockTokenIdentifier. */
-  @SuppressWarnings("unchecked")
-  static Token<BlockTokenIdentifier> toBlockToken(
-      final Map<?, ?> m) throws IOException {
-    return (Token<BlockTokenIdentifier>)toToken(m);
-  }
-
-  /** Convert a string to a FsPermission object. */
-  static FsPermission toFsPermission(
-      final String s, Boolean aclBit, Boolean encBit) {
-    FsPermission perm = new FsPermission(Short.parseShort(s, 8));
-    final boolean aBit = (aclBit != null) ? aclBit : false;
-    final boolean eBit = (encBit != null) ? encBit : false;
-    if (aBit || eBit) {
-      return new FsPermissionExtension(perm, aBit, eBit);
-    } else {
-      return perm;
-    }
-  }
-
-  /** Convert a Json map to a HdfsFileStatus object. */
-  static HdfsFileStatus toFileStatus(final Map<?, ?> json, boolean includesType) {
-    if (json == null) {
-      return null;
-    }
-
-    final Map<?, ?> m = includesType ?
-        (Map<?, ?>)json.get(FileStatus.class.getSimpleName()) : json;
-    final String localName = (String) m.get("pathSuffix");
-    final WebHdfsConstants.PathType type = WebHdfsConstants.PathType.valueOf((String) m.get("type"));
-    final byte[] symlink = type != WebHdfsConstants.PathType.SYMLINK? null
-        : DFSUtil.string2Bytes((String) m.get("symlink"));
-
-    final long len = ((Number) m.get("length")).longValue();
-    final String owner = (String) m.get("owner");
-    final String group = (String) m.get("group");
-    final FsPermission permission = toFsPermission((String) m.get("permission"),
-                                                   (Boolean) m.get("aclBit"),
-                                                   (Boolean) m.get("encBit"));
-    final long aTime = ((Number) m.get("accessTime")).longValue();
-    final long mTime = ((Number) m.get("modificationTime")).longValue();
-    final long blockSize = ((Number) m.get("blockSize")).longValue();
-    final short replication = ((Number) m.get("replication")).shortValue();
-    final long fileId = m.containsKey("fileId") ?
-        ((Number) m.get("fileId")).longValue() : HdfsConstantsClient.GRANDFATHER_INODE_ID;
-    final int childrenNum = getInt(m, "childrenNum", -1);
-    final byte storagePolicy = m.containsKey("storagePolicy") ?
-        (byte) ((Number) m.get("storagePolicy")).longValue() :
-        HdfsConstantsClient.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED;
-    return new HdfsFileStatus(len, type == WebHdfsConstants.PathType.DIRECTORY, replication,
-        blockSize, mTime, aTime, permission, owner, group,
-        symlink, DFSUtil.string2Bytes(localName), fileId, childrenNum, null,
-        storagePolicy);
-  }
-
-  /** Convert a Json map to an ExtendedBlock object. */
-  static ExtendedBlock toExtendedBlock(final Map<?, ?> m) {
-    if (m == null) {
-      return null;
-    }
-
-    final String blockPoolId = (String)m.get("blockPoolId");
-    final long blockId = ((Number) m.get("blockId")).longValue();
-    final long numBytes = ((Number) m.get("numBytes")).longValue();
-    final long generationStamp =
-        ((Number) m.get("generationStamp")).longValue();
-    return new ExtendedBlock(blockPoolId, blockId, numBytes, generationStamp);
-  }
-
-  static int getInt(Map<?, ?> m, String key, final int defaultValue) {
-    Object value = m.get(key);
-    if (value == null) {
-      return defaultValue;
-    }
-    return ((Number) value).intValue();
-  }
-
-  static long getLong(Map<?, ?> m, String key, final long defaultValue) {
-    Object value = m.get(key);
-    if (value == null) {
-      return defaultValue;
-    }
-    return ((Number) value).longValue();
-  }
-
-  static String getString(
-      Map<?, ?> m, String key, final String defaultValue) {
-    Object value = m.get(key);
-    if (value == null) {
-      return defaultValue;
-    }
-    return (String) value;
-  }
-
-  static List<?> getList(Map<?, ?> m, String key) {
-    Object list = m.get(key);
-    if (list instanceof List<?>) {
-      return (List<?>) list;
-    } else {
-      return null;
-    }
-  }
-
-  /** Convert a Json map to an DatanodeInfo object. */
-  static DatanodeInfo toDatanodeInfo(final Map<?, ?> m)
-    throws IOException {
-    if (m == null) {
-      return null;
-    }
-
-    // ipAddr and xferPort are the critical fields for accessing data.
-    // If any one of the two is missing, an exception needs to be thrown.
-
-    // Handle the case of old servers (1.x, 0.23.x) sending 'name' instead
-    //  of ipAddr and xferPort.
-    String ipAddr = getString(m, "ipAddr", null);
-    int xferPort = getInt(m, "xferPort", -1);
-    if (ipAddr == null) {
-      String name = getString(m, "name", null);
-      if (name != null) {
-        int colonIdx = name.indexOf(':');
-        if (colonIdx > 0) {
-          ipAddr = name.substring(0, colonIdx);
-          xferPort = Integer.parseInt(name.substring(colonIdx +1));
-        } else {
-          throw new IOException(
-              "Invalid value in server response: name=[" + name + "]");
-        }
-      } else {
-        throw new IOException(
-            "Missing both 'ipAddr' and 'name' in server response.");
-      }
-      // ipAddr is non-null & non-empty string at this point.
-    }
-
-    // Check the validity of xferPort.
-    if (xferPort == -1) {
-      throw new IOException(
-          "Invalid or missing 'xferPort' in server response.");
-    }
-
-    // TODO: Fix storageID
-    return new DatanodeInfo(
-        ipAddr,
-        (String)m.get("hostName"),
-        (String)m.get("storageID"),
-        xferPort,
-        ((Number) m.get("infoPort")).intValue(),
-        getInt(m, "infoSecurePort", 0),
-        ((Number) m.get("ipcPort")).intValue(),
-
-        getLong(m, "capacity", 0l),
-        getLong(m, "dfsUsed", 0l),
-        getLong(m, "remaining", 0l),
-        getLong(m, "blockPoolUsed", 0l),
-        getLong(m, "cacheCapacity", 0l),
-        getLong(m, "cacheUsed", 0l),
-        getLong(m, "lastUpdate", 0l),
-        getLong(m, "lastUpdateMonotonic", 0l),
-        getInt(m, "xceiverCount", 0),
-        getString(m, "networkLocation", ""),
-        DatanodeInfo.AdminStates.valueOf(getString(m, "adminState", "NORMAL")));
-  }
-
-  /** Convert an Object[] to a DatanodeInfo[]. */
-  static DatanodeInfo[] toDatanodeInfoArray(final List<?> objects)
-      throws IOException {
-    if (objects == null) {
-      return null;
-    } else if (objects.isEmpty()) {
-      return EMPTY_DATANODE_INFO_ARRAY;
-    } else {
-      final DatanodeInfo[] array = new DatanodeInfo[objects.size()];
-      int i = 0;
-      for (Object object : objects) {
-        array[i++] = toDatanodeInfo((Map<?, ?>) object);
-      }
-      return array;
-    }
-  }
-
-  /** Convert a Json map to LocatedBlock. */
-  static LocatedBlock toLocatedBlock(final Map<?, ?> m) throws IOException {
-    if (m == null) {
-      return null;
-    }
-
-    final ExtendedBlock b = toExtendedBlock((Map<?, ?>)m.get("block"));
-    final DatanodeInfo[] locations = toDatanodeInfoArray(
-        getList(m, "locations"));
-    final long startOffset = ((Number) m.get("startOffset")).longValue();
-    final boolean isCorrupt = (Boolean)m.get("isCorrupt");
-    final DatanodeInfo[] cachedLocations = toDatanodeInfoArray(
-        getList(m, "cachedLocations"));
-
-    final LocatedBlock locatedblock = new LocatedBlock(b, locations,
-        null, null, startOffset, isCorrupt, cachedLocations);
-    locatedblock.setBlockToken(toBlockToken((Map<?, ?>)m.get("blockToken")));
-    return locatedblock;
-  }
-
-  /** Convert an List of Object to a List of LocatedBlock. */
-  static List<LocatedBlock> toLocatedBlockList(
-      final List<?> objects) throws IOException {
-    if (objects == null) {
-      return null;
-    } else if (objects.isEmpty()) {
-      return Collections.emptyList();
-    } else {
-      final List<LocatedBlock> list = new ArrayList<>(objects.size());
-      for (Object object : objects) {
-        list.add(toLocatedBlock((Map<?, ?>) object));
-      }
-      return list;
-    }
-  }
-
-  /** Convert a Json map to a ContentSummary. */
-  static ContentSummary toContentSummary(final Map<?, ?> json) {
-    if (json == null) {
-      return null;
-    }
-
-    final Map<?, ?> m = (Map<?, ?>)json.get(ContentSummary.class.getSimpleName());
-    final long length = ((Number) m.get("length")).longValue();
-    final long fileCount = ((Number) m.get("fileCount")).longValue();
-    final long directoryCount = ((Number) m.get("directoryCount")).longValue();
-    final long quota = ((Number) m.get("quota")).longValue();
-    final long spaceConsumed = ((Number) m.get("spaceConsumed")).longValue();
-    final long spaceQuota = ((Number) m.get("spaceQuota")).longValue();
-
-    return new ContentSummary.Builder().length(length).fileCount(fileCount).
-        directoryCount(directoryCount).quota(quota).spaceConsumed(spaceConsumed).
-        spaceQuota(spaceQuota).build();
-  }
-
-  /** Convert a Json map to a MD5MD5CRC32FileChecksum. */
-  static MD5MD5CRC32FileChecksum toMD5MD5CRC32FileChecksum(
-      final Map<?, ?> json) throws IOException {
-    if (json == null) {
-      return null;
-    }
-
-    final Map<?, ?> m = (Map<?, ?>)json.get(FileChecksum.class.getSimpleName());
-    final String algorithm = (String)m.get("algorithm");
-    final int length = ((Number) m.get("length")).intValue();
-    final byte[] bytes = StringUtils.hexStringToByte((String) m.get("bytes"));
-
-    final DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
-    final DataChecksum.Type crcType =
-        MD5MD5CRC32FileChecksum.getCrcTypeFromAlgorithmName(algorithm);
-    final MD5MD5CRC32FileChecksum checksum;
-
-    // Recreate what DFSClient would have returned.
-    switch(crcType) {
-      case CRC32:
-        checksum = new MD5MD5CRC32GzipFileChecksum();
-        break;
-      case CRC32C:
-        checksum = new MD5MD5CRC32CastagnoliFileChecksum();
-        break;
-      default:
-        throw new IOException("Unknown algorithm: " + algorithm);
-    }
-    checksum.readFields(in);
-
-    //check algorithm name
-    if (!checksum.getAlgorithmName().equals(algorithm)) {
-      throw new IOException("Algorithm not matched. Expected " + algorithm
-          + ", Received " + checksum.getAlgorithmName());
-    }
-    //check length
-    if (length != checksum.getLength()) {
-      throw new IOException("Length not matched: length=" + length
-          + ", checksum.getLength()=" + checksum.getLength());
-    }
-
-    return checksum;
-  }
-
-  /** Convert a Json map to a AclStatus object. */
-  static AclStatus toAclStatus(final Map<?, ?> json) {
-    if (json == null) {
-      return null;
-    }
-
-    final Map<?, ?> m = (Map<?, ?>) json.get(AclStatus.class.getSimpleName());
-
-    AclStatus.Builder aclStatusBuilder = new AclStatus.Builder();
-    aclStatusBuilder.owner((String) m.get("owner"));
-    aclStatusBuilder.group((String) m.get("group"));
-    aclStatusBuilder.stickyBit((Boolean) m.get("stickyBit"));
-    String permString = (String) m.get("permission");
-    if (permString != null) {
-      final FsPermission permission = toFsPermission(permString,
-          (Boolean) m.get("aclBit"), (Boolean) m.get("encBit"));
-      aclStatusBuilder.setPermission(permission);
-    }
-    final List<?> entries = (List<?>) m.get("entries");
-
-    List<AclEntry> aclEntryList = new ArrayList<>();
-    for (Object entry : entries) {
-      AclEntry aclEntry = AclEntry.parseAclEntry((String) entry, true);
-      aclEntryList.add(aclEntry);
-    }
-    aclStatusBuilder.addEntries(aclEntryList);
-    return aclStatusBuilder.build();
-  }
-
-  static byte[] getXAttr(final Map<?, ?> json, final String name)
-      throws IOException {
-    if (json == null) {
-      return null;
-    }
-
-    Map<String, byte[]> xAttrs = toXAttrs(json);
-    if (xAttrs != null) {
-      return xAttrs.get(name);
-    }
-
-    return null;
-  }
-
-  static Map<String, byte[]> toXAttrs(final Map<?, ?> json)
-      throws IOException {
-    if (json == null) {
-      return null;
-    }
-    return toXAttrMap(getList(json, "XAttrs"));
-  }
-
-  static List<String> toXAttrNames(final Map<?, ?> json)
-      throws IOException {
-    if (json == null) {
-      return null;
-    }
-
-    final String namesInJson = (String) json.get("XAttrNames");
-    ObjectReader reader = new ObjectMapper().reader(List.class);
-    final List<Object> xattrs = reader.readValue(namesInJson);
-    final List<String> names =
-      Lists.newArrayListWithCapacity(json.keySet().size());
-
-    for (Object xattr : xattrs) {
-      names.add((String) xattr);
-    }
-    return names;
-  }
-
-  static Map<String, byte[]> toXAttrMap(final List<?> objects)
-      throws IOException {
-    if (objects == null) {
-      return null;
-    } else if (objects.isEmpty()) {
-      return Maps.newHashMap();
-    } else {
-      final Map<String, byte[]> xAttrs = Maps.newHashMap();
-      for (Object object : objects) {
-        Map<?, ?> m = (Map<?, ?>) object;
-        String name = (String) m.get("name");
-        String value = (String) m.get("value");
-        xAttrs.put(name, decodeXAttrValue(value));
-      }
-      return xAttrs;
-    }
-  }
-
-  static byte[] decodeXAttrValue(String value) throws IOException {
-    if (value != null) {
-      return XAttrCodec.decodeValue(value);
-    } else {
-      return new byte[0];
-    }
-  }
-
-  /** Convert a Json map to a Token of DelegationTokenIdentifier. */
-  @SuppressWarnings("unchecked")
-  static Token<DelegationTokenIdentifier> toDelegationToken(
-      final Map<?, ?> json) throws IOException {
-    final Map<?, ?> m = (Map<?, ?>)json.get(Token.class.getSimpleName());
-    return (Token<DelegationTokenIdentifier>) toToken(m);
-  }
-
-  /** Convert a Json map to LocatedBlock. */
-  static LocatedBlocks toLocatedBlocks(
-      final Map<?, ?> json) throws IOException {
-    if (json == null) {
-      return null;
-    }
-
-    final Map<?, ?> m = (Map<?, ?>)json.get(LocatedBlocks.class.getSimpleName());
-    final long fileLength = ((Number) m.get("fileLength")).longValue();
-    final boolean isUnderConstruction = (Boolean)m.get("isUnderConstruction");
-    final List<LocatedBlock> locatedBlocks = toLocatedBlockList(
-        getList(m, "locatedBlocks"));
-    final LocatedBlock lastLocatedBlock = toLocatedBlock(
-        (Map<?, ?>) m.get("lastLocatedBlock"));
-    final boolean isLastBlockComplete = (Boolean)m.get("isLastBlockComplete");
-    return new LocatedBlocks(fileLength, isUnderConstruction, locatedBlocks,
-        lastLocatedBlock, isLastBlockComplete, null);
-  }
-
-}
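
To make the JSON shape this converter consumes concrete, a small self-contained sketch using the same org.codehaus.jackson reader the class imports; the GETFILESTATUS-style payload below is hypothetical and only shows a subset of the fields toFileStatus reads.

import java.util.Map;
import org.codehaus.jackson.map.ObjectMapper;

public class FileStatusJsonSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical WebHDFS GETFILESTATUS response body.
    String json = "{\"FileStatus\":{\"pathSuffix\":\"\",\"type\":\"FILE\","
        + "\"length\":24930,\"owner\":\"hdfs\",\"group\":\"supergroup\","
        + "\"permission\":\"644\",\"accessTime\":1429833022000,"
        + "\"modificationTime\":1429833023000,\"blockSize\":134217728,"
        + "\"replication\":3}}";
    ObjectMapper mapper = new ObjectMapper();
    Map<?, ?> outer = mapper.reader(Map.class).readValue(json);
    // toFileStatus first unwraps the FileStatus key, then reads typed fields,
    // e.g. the octal permission string handled by toFsPermission.
    Map<?, ?> m = (Map<?, ?>) outer.get("FileStatus");
    long len = ((Number) m.get("length")).longValue();
    short perm = Short.parseShort((String) m.get("permission"), 8);
    System.out.println("length=" + len + ", permission=" + Integer.toOctalString(perm));
  }
}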

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcf89ddc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/KerberosUgiAuthenticator.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/KerberosUgiAuthenticator.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/KerberosUgiAuthenticator.java
deleted file mode 100644
index b8ea951..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/KerberosUgiAuthenticator.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.web;
-
-import java.io.IOException;
-
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authentication.client.Authenticator;
-import org.apache.hadoop.security.authentication.client.KerberosAuthenticator;
-import org.apache.hadoop.security.authentication.client.PseudoAuthenticator;
-
-/**
- * Use UserGroupInformation as a fallback authenticator
- * if the server does not use Kerberos SPNEGO HTTP authentication.
- */
-public class KerberosUgiAuthenticator extends KerberosAuthenticator {
-  @Override
-  protected Authenticator getFallBackAuthenticator() {
-    return new PseudoAuthenticator() {
-      @Override
-      protected String getUserName() {
-        try {
-          return UserGroupInformation.getLoginUser().getUserName();
-        } catch (IOException e) {
-          throw new SecurityException("Failed to obtain current username", e);
-        }
-      }
-    };
-  }
-}
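
The fallback above only matters when the server is not running SPNEGO; a minimal sketch of how this authenticator is typically wired into an AuthenticatedURL, mirroring what URLConnectionFactory does below. The endpoint URL is a placeholder.

import java.net.HttpURLConnection;
import java.net.URL;
import org.apache.hadoop.hdfs.web.KerberosUgiAuthenticator;
import org.apache.hadoop.security.authentication.client.AuthenticatedURL;

public class SpnegoFallbackSketch {
  public static void main(String[] args) throws Exception {
    // Placeholder WebHDFS endpoint. Against a secure cluster SPNEGO is negotiated;
    // otherwise the PseudoAuthenticator fallback sends the login user's name.
    URL url = new URL("http://namenode.example.com:50070/webhdfs/v1/?op=GETHOMEDIRECTORY");
    AuthenticatedURL.Token token = new AuthenticatedURL.Token();
    HttpURLConnection conn =
        new AuthenticatedURL(new KerberosUgiAuthenticator()).openConnection(url, token);
    System.out.println("HTTP " + conn.getResponseCode());
    conn.disconnect();
  }
}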

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcf89ddc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/SWebHdfsFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/SWebHdfsFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/SWebHdfsFileSystem.java
deleted file mode 100644
index 73c10d6..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/SWebHdfsFileSystem.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.web;
-
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.io.Text;
-
-public class SWebHdfsFileSystem extends WebHdfsFileSystem {
-
-  @Override
-  public String getScheme() {
-    return WebHdfsConstants.SWEBHDFS_SCHEME;
-  }
-
-  @Override
-  protected String getTransportScheme() {
-    return "https";
-  }
-
-  @Override
-  protected Text getTokenKind() {
-    return WebHdfsConstants.SWEBHDFS_TOKEN_KIND;
-  }
-
-  @Override
-  protected int getDefaultPort() {
-    return DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_DEFAULT;
-  }
-}
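
This subclass only swaps the scheme, transport, token kind, and default port relative to WebHdfsFileSystem; a minimal sketch of selecting it through the swebhdfs scheme. The host is a placeholder, and ssl-client.xml truststore settings are assumed to be available on the client.

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class SWebHdfsSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // 50470 matches DFS_NAMENODE_HTTPS_PORT_DEFAULT returned by getDefaultPort() above.
    FileSystem fs = FileSystem.get(
        URI.create("swebhdfs://namenode.example.com:50470/"), conf);
    System.out.println(fs.getFileStatus(new Path("/")).isDirectory());
    fs.close();
  }
}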

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcf89ddc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/TokenAspect.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/TokenAspect.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/TokenAspect.java
deleted file mode 100644
index bc3eb4b..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/TokenAspect.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hdfs.web;
-
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.URI;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.DelegationTokenRenewer;
-import org.apache.hadoop.fs.DelegationTokenRenewer.Renewable;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hdfs.HAUtilClient;
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenRenewer;
-import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * This class implements the aspects that relate to delegation tokens for all
- * HTTP-based file system.
- */
-final class TokenAspect<T extends FileSystem & Renewable> {
-  @InterfaceAudience.Private
-  public static class TokenManager extends TokenRenewer {
-
-    @Override
-    public void cancel(Token<?> token, Configuration conf) throws IOException {
-      getInstance(token, conf).cancelDelegationToken(token);
-    }
-
-    @Override
-    public boolean handleKind(Text kind) {
-      return kind.equals(WebHdfsConstants.WEBHDFS_TOKEN_KIND)
-          || kind.equals(WebHdfsConstants.SWEBHDFS_TOKEN_KIND);
-    }
-
-    @Override
-    public boolean isManaged(Token<?> token) throws IOException {
-      return true;
-    }
-
-    @Override
-    public long renew(Token<?> token, Configuration conf) throws IOException {
-      return getInstance(token, conf).renewDelegationToken(token);
-    }
-
-    private TokenManagementDelegator getInstance(Token<?> token,
-                                                 Configuration conf)
-            throws IOException {
-      final URI uri;
-      final String scheme = getSchemeByKind(token.getKind());
-      if (HAUtilClient.isTokenForLogicalUri(token)) {
-        uri = HAUtilClient.getServiceUriFromToken(scheme, token);
-      } else {
-        final InetSocketAddress address = SecurityUtil.getTokenServiceAddr
-                (token);
-        uri = URI.create(scheme + "://" + NetUtils.getHostPortString(address));
-      }
-      return (TokenManagementDelegator) FileSystem.get(uri, conf);
-    }
-
-    private static String getSchemeByKind(Text kind) {
-      if (kind.equals(WebHdfsConstants.WEBHDFS_TOKEN_KIND)) {
-        return WebHdfsConstants.WEBHDFS_SCHEME;
-      } else if (kind.equals(WebHdfsConstants.SWEBHDFS_TOKEN_KIND)) {
-        return WebHdfsConstants.SWEBHDFS_SCHEME;
-      } else {
-        throw new IllegalArgumentException("Unsupported scheme");
-      }
-    }
-  }
-
-  private static class DTSelecorByKind extends
-      AbstractDelegationTokenSelector<DelegationTokenIdentifier> {
-    public DTSelecorByKind(final Text kind) {
-      super(kind);
-    }
-  }
-
-  /**
-   * Callbacks for token management
-   */
-  interface TokenManagementDelegator {
-    void cancelDelegationToken(final Token<?> token) throws IOException;
-    long renewDelegationToken(final Token<?> token) throws IOException;
-  }
-
-  private DelegationTokenRenewer.RenewAction<?> action;
-  private DelegationTokenRenewer dtRenewer = null;
-  private final DTSelecorByKind dtSelector;
-  private final T fs;
-  private boolean hasInitedToken;
-  private final Log LOG;
-  private final Text serviceName;
-
-  TokenAspect(T fs, final Text serviceName, final Text kind) {
-    this.LOG = LogFactory.getLog(fs.getClass());
-    this.fs = fs;
-    this.dtSelector = new DTSelecorByKind(kind);
-    this.serviceName = serviceName;
-  }
-
-  synchronized void ensureTokenInitialized() throws IOException {
-    // we haven't inited yet, or we used to have a token but it expired
-    if (!hasInitedToken || (action != null && !action.isValid())) {
-      //since we don't already have a token, go get one
-      Token<?> token = fs.getDelegationToken(null);
-      // security might be disabled
-      if (token != null) {
-        fs.setDelegationToken(token);
-        addRenewAction(fs);
-        if(LOG.isDebugEnabled()) {
-          LOG.debug("Created new DT for " + token.getService());
-        }
-      }
-      hasInitedToken = true;
-    }
-  }
-
-  public synchronized void reset() {
-    hasInitedToken = false;
-  }
-
-  synchronized void initDelegationToken(UserGroupInformation ugi) {
-    Token<?> token = selectDelegationToken(ugi);
-    if (token != null) {
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("Found existing DT for " + token.getService());
-      }
-      fs.setDelegationToken(token);
-      hasInitedToken = true;
-    }
-  }
-
-  synchronized void removeRenewAction() throws IOException {
-    if (dtRenewer != null) {
-      dtRenewer.removeRenewAction(fs);
-    }
-  }
-
-  @VisibleForTesting
-  Token<DelegationTokenIdentifier> selectDelegationToken(
-      UserGroupInformation ugi) {
-    return dtSelector.selectToken(serviceName, ugi.getTokens());
-  }
-
-  private synchronized void addRenewAction(final T webhdfs) {
-    if (dtRenewer == null) {
-      dtRenewer = DelegationTokenRenewer.getInstance();
-    }
-
-    action = dtRenewer.addRenewAction(webhdfs);
-  }
-}
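
A rough sketch of the token round trip this aspect automates, expressed through the public FileSystem and Token APIs; the URI and the renewer name are placeholders, and a Kerberos login with renew/cancel rights is assumed.

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.security.token.Token;

public class DelegationTokenSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    FileSystem fs = FileSystem.get(
        URI.create("webhdfs://namenode.example.com:50070/"), conf);
    // Fetch a delegation token; "yarn" is a placeholder renewer principal.
    Token<?> token = fs.getDelegationToken("yarn");
    System.out.println("kind=" + token.getKind() + " service=" + token.getService());
    // TokenManager above maps the token kind back to a filesystem instance to
    // renew or cancel; Token offers the same operations directly.
    System.out.println("next expiry=" + token.renew(conf));
    token.cancel(conf);
    fs.close();
  }
}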

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcf89ddc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/URLConnectionFactory.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/URLConnectionFactory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/URLConnectionFactory.java
deleted file mode 100644
index 8a743b6..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/URLConnectionFactory.java
+++ /dev/null
@@ -1,187 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs.web;
-
-import java.io.IOException;
-import java.net.HttpURLConnection;
-import java.net.URL;
-import java.net.URLConnection;
-import java.security.GeneralSecurityException;
-
-import javax.net.ssl.HostnameVerifier;
-import javax.net.ssl.HttpsURLConnection;
-import javax.net.ssl.SSLSocketFactory;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
-import org.apache.hadoop.security.authentication.client.AuthenticationException;
-import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
-import org.apache.hadoop.security.ssl.SSLFactory;
-
-import com.google.common.annotations.VisibleForTesting;
-
-/**
- * Utilities for handling URLs
- */
-@InterfaceAudience.LimitedPrivate({ "HDFS" })
-@InterfaceStability.Unstable
-public class URLConnectionFactory {
-  private static final Log LOG = LogFactory.getLog(URLConnectionFactory.class);
-
-  /**
-   * Timeout for socket connects and reads
-   */
-  public final static int DEFAULT_SOCKET_TIMEOUT = 1 * 60 * 1000; // 1 minute
-  private final ConnectionConfigurator connConfigurator;
-
-  private static final ConnectionConfigurator DEFAULT_TIMEOUT_CONN_CONFIGURATOR = new ConnectionConfigurator() {
-    @Override
-    public HttpURLConnection configure(HttpURLConnection conn)
-        throws IOException {
-      URLConnectionFactory.setTimeouts(conn, DEFAULT_SOCKET_TIMEOUT);
-      return conn;
-    }
-  };
-
-  /**
-   * The URLConnectionFactory that sets the default timeout and it only trusts
-   * Java's SSL certificates.
-   */
-  public static final URLConnectionFactory DEFAULT_SYSTEM_CONNECTION_FACTORY = new URLConnectionFactory(
-      DEFAULT_TIMEOUT_CONN_CONFIGURATOR);
-
-  /**
-   * Construct a new URLConnectionFactory based on the configuration. It will
-   * try to load SSL certificates when it is specified.
-   */
-  public static URLConnectionFactory newDefaultURLConnectionFactory(Configuration conf) {
-    ConnectionConfigurator conn = null;
-    try {
-      conn = newSslConnConfigurator(DEFAULT_SOCKET_TIMEOUT, conf);
-    } catch (Exception e) {
-      LOG.debug(
-          "Cannot load customized ssl related configuration. Fallback to system-generic settings.",
-          e);
-      conn = DEFAULT_TIMEOUT_CONN_CONFIGURATOR;
-    }
-    return new URLConnectionFactory(conn);
-  }
-
-  @VisibleForTesting
-  URLConnectionFactory(ConnectionConfigurator connConfigurator) {
-    this.connConfigurator = connConfigurator;
-  }
-
-  /**
-   * Create a new ConnectionConfigurator for SSL connections
-   */
-  private static ConnectionConfigurator newSslConnConfigurator(final int timeout,
-      Configuration conf) throws IOException, GeneralSecurityException {
-    final SSLFactory factory;
-    final SSLSocketFactory sf;
-    final HostnameVerifier hv;
-
-    factory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
-    factory.init();
-    sf = factory.createSSLSocketFactory();
-    hv = factory.getHostnameVerifier();
-
-    return new ConnectionConfigurator() {
-      @Override
-      public HttpURLConnection configure(HttpURLConnection conn)
-          throws IOException {
-        if (conn instanceof HttpsURLConnection) {
-          HttpsURLConnection c = (HttpsURLConnection) conn;
-          c.setSSLSocketFactory(sf);
-          c.setHostnameVerifier(hv);
-        }
-        URLConnectionFactory.setTimeouts(conn, timeout);
-        return conn;
-      }
-    };
-  }
-
-  /**
-   * Opens a url with read and connect timeouts
-   *
-   * @param url
-   *          to open
-   * @return URLConnection
-   * @throws IOException
-   */
-  public URLConnection openConnection(URL url) throws IOException {
-    try {
-      return openConnection(url, false);
-    } catch (AuthenticationException e) {
-      // Unreachable
-      return null;
-    }
-  }
-
-  /**
-   * Opens a url with read and connect timeouts
-   *
-   * @param url
-   *          URL to open
-   * @param isSpnego
-   *          whether the url should be authenticated via SPNEGO
-   * @return URLConnection
-   * @throws IOException
-   * @throws AuthenticationException
-   */
-  public URLConnection openConnection(URL url, boolean isSpnego)
-      throws IOException, AuthenticationException {
-    if (isSpnego) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("open AuthenticatedURL connection" + url);
-      }
-      UserGroupInformation.getCurrentUser().checkTGTAndReloginFromKeytab();
-      final AuthenticatedURL.Token authToken = new AuthenticatedURL.Token();
-      return new AuthenticatedURL(new KerberosUgiAuthenticator(),
-          connConfigurator).openConnection(url, authToken);
-    } else {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("open URL connection");
-      }
-      URLConnection connection = url.openConnection();
-      if (connection instanceof HttpURLConnection) {
-        connConfigurator.configure((HttpURLConnection) connection);
-      }
-      return connection;
-    }
-  }
-
-  /**
-   * Sets timeout parameters on the given URLConnection.
-   * 
-   * @param connection
-   *          URLConnection to set
-   * @param socketTimeout
-   *          the connection and read timeout of the connection.
-   */
-  private static void setTimeouts(URLConnection connection, int socketTimeout) {
-    connection.setConnectTimeout(socketTimeout);
-    connection.setReadTimeout(socketTimeout);
-  }
-}
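
A minimal usage sketch of this factory as it stood in this file; the URL is a placeholder, and against a Kerberized endpoint the two-argument openConnection(url, true) variant would be used instead.

import java.net.HttpURLConnection;
import java.net.URL;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.web.URLConnectionFactory;

public class ConnectionFactorySketch {
  public static void main(String[] args) throws Exception {
    // Tries to load ssl-client.xml based settings; falls back to JDK SSL defaults.
    URLConnectionFactory factory =
        URLConnectionFactory.newDefaultURLConnectionFactory(new Configuration());
    URL url = new URL("http://namenode.example.com:50070/webhdfs/v1/?op=LISTSTATUS");
    // The configurator applies the one-minute DEFAULT_SOCKET_TIMEOUT connect/read timeouts.
    HttpURLConnection conn = (HttpURLConnection) factory.openConnection(url);
    System.out.println("HTTP " + conn.getResponseCode());
    conn.disconnect();
  }
}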

http://git-wip-us.apache.org/repos/asf/hadoop/blob/bcf89ddc/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
deleted file mode 100644
index acdd8e1..0000000
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
+++ /dev/null
@@ -1,1463 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdfs.web;
-
-import java.io.BufferedOutputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.io.InputStream;
-import java.net.HttpURLConnection;
-import java.net.InetSocketAddress;
-import java.net.MalformedURLException;
-import java.net.URI;
-import java.net.URL;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Map;
-import java.util.StringTokenizer;
-
-import javax.ws.rs.core.MediaType;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.BlockLocation;
-import org.apache.hadoop.fs.CommonConfigurationKeys;
-import org.apache.hadoop.fs.ContentSummary;
-import org.apache.hadoop.fs.DelegationTokenRenewer;
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
-import org.apache.hadoop.fs.Options;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.XAttrCodec;
-import org.apache.hadoop.fs.XAttrSetFlag;
-import org.apache.hadoop.fs.permission.AclEntry;
-import org.apache.hadoop.fs.permission.AclStatus;
-import org.apache.hadoop.fs.permission.FsAction;
-import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.DFSUtilClient;
-import org.apache.hadoop.hdfs.HAUtilClient;
-import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
-import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
-import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
-import org.apache.hadoop.hdfs.server.namenode.SafeModeException;
-import org.apache.hadoop.hdfs.web.resources.*;
-import org.apache.hadoop.hdfs.web.resources.HttpOpParam.Op;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.io.retry.RetryPolicies;
-import org.apache.hadoop.io.retry.RetryPolicy;
-import org.apache.hadoop.io.retry.RetryUtils;
-import org.apache.hadoop.ipc.RemoteException;
-import org.apache.hadoop.net.NetUtils;
-import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.SecurityUtil;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.SecretManager.InvalidToken;
-import org.apache.hadoop.security.token.Token;
-import org.apache.hadoop.security.token.TokenIdentifier;
-import org.apache.hadoop.security.token.TokenSelector;
-import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector;
-import org.apache.hadoop.util.Progressable;
-import org.apache.hadoop.util.StringUtils;
-import org.codehaus.jackson.map.ObjectMapper;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/** A FileSystem for HDFS over the web. */
-public class WebHdfsFileSystem extends FileSystem
-    implements DelegationTokenRenewer.Renewable, TokenAspect.TokenManagementDelegator {
-  public static final Log LOG = LogFactory.getLog(WebHdfsFileSystem.class);
-  /** WebHdfs version. */
-  public static final int VERSION = 1;
-  /** Http URI: http://namenode:port/{PATH_PREFIX}/path/to/file */
-  public static final String PATH_PREFIX = "/" + WebHdfsConstants.WEBHDFS_SCHEME + "/v" + VERSION;
-
-  /** Default connection factory may be overridden in tests to use smaller timeout values */
-  protected URLConnectionFactory connectionFactory;
-
-  @VisibleForTesting
-  public static final String CANT_FALLBACK_TO_INSECURE_MSG =
-      "The client is configured to only allow connecting to secure cluster";
-
-  private boolean canRefreshDelegationToken;
-
-  private UserGroupInformation ugi;
-  private URI uri;
-  private Token<?> delegationToken;
-  protected Text tokenServiceName;
-  private RetryPolicy retryPolicy = null;
-  private Path workingDir;
-  private InetSocketAddress nnAddrs[];
-  private int currentNNAddrIndex;
-  private boolean disallowFallbackToInsecureCluster;
-
-  /**
-   * Return the protocol scheme for the FileSystem.
-   * <p/>
-   *
-   * @return <code>webhdfs</code>
-   */
-  @Override
-  public String getScheme() {
-    return WebHdfsConstants.WEBHDFS_SCHEME;
-  }
-
-  /**
-   * return the underlying transport protocol (http / https).
-   */
-  protected String getTransportScheme() {
-    return "http";
-  }
-
-  protected Text getTokenKind() {
-    return WebHdfsConstants.WEBHDFS_TOKEN_KIND;
-  }
-
-  @Override
-  public synchronized void initialize(URI uri, Configuration conf
-      ) throws IOException {
-    super.initialize(uri, conf);
-    setConf(conf);
-    /** set user pattern based on configuration file */
-    UserParam.setUserPattern(conf.get(
-        DFSConfigKeys.DFS_WEBHDFS_USER_PATTERN_KEY,
-        DFSConfigKeys.DFS_WEBHDFS_USER_PATTERN_DEFAULT));
-
-    connectionFactory = URLConnectionFactory
-        .newDefaultURLConnectionFactory(conf);
-
-
-    ugi = UserGroupInformation.getCurrentUser();
-    this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
-    this.nnAddrs = resolveNNAddr();
-
-    boolean isHA = HAUtilClient.isClientFailoverConfigured(conf, this.uri);
-    boolean isLogicalUri = isHA && HAUtilClient.isLogicalUri(conf, this.uri);
-    // In non-HA or non-logical URI case, the code needs to call
-    // getCanonicalUri() in order to handle the case where no port is
-    // specified in the URI
-    this.tokenServiceName = isLogicalUri ?
-        HAUtilClient.buildTokenServiceForLogicalUri(uri, getScheme())
-        : SecurityUtil.buildTokenService(getCanonicalUri());
-
-    if (!isHA) {
-      this.retryPolicy =
-          RetryUtils.getDefaultRetryPolicy(
-              conf,
-              HdfsClientConfigKeys.HttpClient.RETRY_POLICY_ENABLED_KEY,
-              HdfsClientConfigKeys.HttpClient.RETRY_POLICY_ENABLED_DEFAULT,
-              HdfsClientConfigKeys.HttpClient.RETRY_POLICY_SPEC_KEY,
-              HdfsClientConfigKeys.HttpClient.RETRY_POLICY_SPEC_DEFAULT,
-              SafeModeException.class);
-    } else {
-
-      int maxFailoverAttempts = conf.getInt(
-          HdfsClientConfigKeys.HttpClient.FAILOVER_MAX_ATTEMPTS_KEY,
-          HdfsClientConfigKeys.HttpClient.FAILOVER_MAX_ATTEMPTS_DEFAULT);
-      int maxRetryAttempts = conf.getInt(
-          HdfsClientConfigKeys.HttpClient.RETRY_MAX_ATTEMPTS_KEY,
-          HdfsClientConfigKeys.HttpClient.RETRY_MAX_ATTEMPTS_DEFAULT);
-      int failoverSleepBaseMillis = conf.getInt(
-          HdfsClientConfigKeys.HttpClient.FAILOVER_SLEEPTIME_BASE_KEY,
-          HdfsClientConfigKeys.HttpClient.FAILOVER_SLEEPTIME_BASE_DEFAULT);
-      int failoverSleepMaxMillis = conf.getInt(
-          HdfsClientConfigKeys.HttpClient.FAILOVER_SLEEPTIME_MAX_KEY,
-          HdfsClientConfigKeys.HttpClient.FAILOVER_SLEEPTIME_MAX_DEFAULT);
-
-      this.retryPolicy = RetryPolicies
-          .failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL,
-              maxFailoverAttempts, maxRetryAttempts, failoverSleepBaseMillis,
-              failoverSleepMaxMillis);
-    }
-
-    this.workingDir = getHomeDirectory();
-    this.canRefreshDelegationToken = UserGroupInformation.isSecurityEnabled();
-    this.disallowFallbackToInsecureCluster = !conf.getBoolean(
-        CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
-        CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
-    this.delegationToken = null;
-  }
-
-  @Override
-  public URI getCanonicalUri() {
-    return super.getCanonicalUri();
-  }
-
-  TokenSelector<DelegationTokenIdentifier> tokenSelector =
-      new AbstractDelegationTokenSelector<DelegationTokenIdentifier>(getTokenKind()){};
-
-  // the first getAuthParams() for a non-token op will either get the
-  // internal token from the ugi or lazy fetch one
-  protected synchronized Token<?> getDelegationToken() throws IOException {
-    if (canRefreshDelegationToken && delegationToken == null) {
-      Token<?> token = tokenSelector.selectToken(
-          new Text(getCanonicalServiceName()), ugi.getTokens());
-      // ugi tokens are usually indicative of a task which can't
-      // refetch tokens.  even if ugi has credentials, don't attempt
-      // to get another token to match hdfs/rpc behavior
-      if (token != null) {
-        if(LOG.isDebugEnabled()) {
-          LOG.debug("Using UGI token: " + token);
-        }
-        canRefreshDelegationToken = false; 
-      } else {
-        token = getDelegationToken(null);
-        if (token != null) {
-          if(LOG.isDebugEnabled()) {
-            LOG.debug("Fetched new token: " + token);
-          }
-        } else { // security is disabled
-          canRefreshDelegationToken = false;
-        }
-      }
-      setDelegationToken(token);
-    }
-    return delegationToken;
-  }
-
-  @VisibleForTesting
-  synchronized boolean replaceExpiredDelegationToken() throws IOException {
-    boolean replaced = false;
-    if (canRefreshDelegationToken) {
-      Token<?> token = getDelegationToken(null);
-      if(LOG.isDebugEnabled()) {
-        LOG.debug("Replaced expired token: " + token);
-      }
-      setDelegationToken(token);
-      replaced = (token != null);
-    }
-    return replaced;
-  }
-
-  @Override
-  protected int getDefaultPort() {
-    return DFSConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT;
-  }
-
-  @Override
-  public URI getUri() {
-    return this.uri;
-  }
-  
-  @Override
-  protected URI canonicalizeUri(URI uri) {
-    return NetUtils.getCanonicalUri(uri, getDefaultPort());
-  }
-
-  /** @return the home directory. */
-  public static String getHomeDirectoryString(final UserGroupInformation ugi) {
-    return "/user/" + ugi.getShortUserName();
-  }
-
-  @Override
-  public Path getHomeDirectory() {
-    return makeQualified(new Path(getHomeDirectoryString(ugi)));
-  }
-
-  @Override
-  public synchronized Path getWorkingDirectory() {
-    return workingDir;
-  }
-
-  @Override
-  public synchronized void setWorkingDirectory(final Path dir) {
-    String result = makeAbsolute(dir).toUri().getPath();
-    if (!DFSUtil.isValidName(result)) {
-      throw new IllegalArgumentException("Invalid DFS directory name " + 
-                                         result);
-    }
-    workingDir = makeAbsolute(dir);
-  }
-
-  private Path makeAbsolute(Path f) {
-    return f.isAbsolute()? f: new Path(workingDir, f);
-  }
-
-  static Map<?, ?> jsonParse(final HttpURLConnection c, final boolean useErrorStream
-      ) throws IOException {
-    if (c.getContentLength() == 0) {
-      return null;
-    }
-    final InputStream in = useErrorStream? c.getErrorStream(): c.getInputStream();
-    if (in == null) {
-      throw new IOException("The " + (useErrorStream? "error": "input") + " stream is null.");
-    }
-    try {
-      final String contentType = c.getContentType();
-      if (contentType != null) {
-        final MediaType parsed = MediaType.valueOf(contentType);
-        if (!MediaType.APPLICATION_JSON_TYPE.isCompatible(parsed)) {
-          throw new IOException("Content-Type \"" + contentType
-              + "\" is incompatible with \"" + MediaType.APPLICATION_JSON
-              + "\" (parsed=\"" + parsed + "\")");
-        }
-      }
-      ObjectMapper mapper = new ObjectMapper();
-      return mapper.reader(Map.class).readValue(in);
-    } finally {
-      in.close();
-    }
-  }
-
-  private static Map<?, ?> validateResponse(final HttpOpParam.Op op,
-      final HttpURLConnection conn, boolean unwrapException) throws IOException {
-    final int code = conn.getResponseCode();
-    // server is demanding an authentication we don't support
-    if (code == HttpURLConnection.HTTP_UNAUTHORIZED) {
-      // match hdfs/rpc exception
-      throw new AccessControlException(conn.getResponseMessage());
-    }
-    if (code != op.getExpectedHttpResponseCode()) {
-      final Map<?, ?> m;
-      try {
-        m = jsonParse(conn, true);
-      } catch(Exception e) {
-        throw new IOException("Unexpected HTTP response: code=" + code + " != "
-            + op.getExpectedHttpResponseCode() + ", " + op.toQueryString()
-            + ", message=" + conn.getResponseMessage(), e);
-      }
-
-      if (m == null) {
-        throw new IOException("Unexpected HTTP response: code=" + code + " != "
-            + op.getExpectedHttpResponseCode() + ", " + op.toQueryString()
-            + ", message=" + conn.getResponseMessage());
-      } else if (m.get(RemoteException.class.getSimpleName()) == null) {
-        return m;
-      }
-
-      IOException re = JsonUtilClient.toRemoteException(m);
-      // extract UGI-related exceptions and unwrap InvalidToken
-      // the NN mangles these exceptions but the DN does not and may need
-      // to re-fetch a token if either report the token is expired
-      if (re.getMessage() != null && re.getMessage().startsWith(
-          SecurityUtil.FAILED_TO_GET_UGI_MSG_HEADER)) {
-        String[] parts = re.getMessage().split(":\\s+", 3);
-        re = new RemoteException(parts[1], parts[2]);
-        re = ((RemoteException)re).unwrapRemoteException(InvalidToken.class);
-      }
-      throw unwrapException? toIOException(re): re;
-    }
-    return null;
-  }
-
-  /**
-   * Covert an exception to an IOException.
-   * 
-   * For a non-IOException, wrap it with IOException.
-   * For a RemoteException, unwrap it.
-   * For an IOException which is not a RemoteException, return it. 
-   */
-  private static IOException toIOException(Exception e) {
-    if (!(e instanceof IOException)) {
-      return new IOException(e);
-    }
-
-    final IOException ioe = (IOException)e;
-    if (!(ioe instanceof RemoteException)) {
-      return ioe;
-    }
-
-    return ((RemoteException)ioe).unwrapRemoteException();
-  }
-
-  private synchronized InetSocketAddress getCurrentNNAddr() {
-    return nnAddrs[currentNNAddrIndex];
-  }
-
-  /**
-   * Reset the appropriate state to gracefully fail over to another name node
-   */
-  private synchronized void resetStateToFailOver() {
-    currentNNAddrIndex = (currentNNAddrIndex + 1) % nnAddrs.length;
-  }
-
-  /**
-   * Return a URL pointing to given path on the namenode.
-   *
-   * @param path to obtain the URL for
-   * @param query string to append to the path
-   * @return namenode URL referring to the given path
-   * @throws IOException on error constructing the URL
-   */
-  private URL getNamenodeURL(String path, String query) throws IOException {
-    InetSocketAddress nnAddr = getCurrentNNAddr();
-    final URL url = new URL(getTransportScheme(), nnAddr.getHostName(),
-          nnAddr.getPort(), path + '?' + query);
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("url=" + url);
-    }
-    return url;
-  }
-  
-  Param<?,?>[] getAuthParameters(final HttpOpParam.Op op) throws IOException {
-    List<Param<?,?>> authParams = Lists.newArrayList();    
-    // Skip adding delegation token for token operations because these
-    // operations require authentication.
-    Token<?> token = null;
-    if (!op.getRequireAuth()) {
-      token = getDelegationToken();
-    }
-    if (token != null) {
-      authParams.add(new DelegationParam(token.encodeToUrlString()));
-    } else {
-      UserGroupInformation userUgi = ugi;
-      UserGroupInformation realUgi = userUgi.getRealUser();
-      if (realUgi != null) { // proxy user
-        authParams.add(new DoAsParam(userUgi.getShortUserName()));
-        userUgi = realUgi;
-      }
-      authParams.add(new UserParam(userUgi.getShortUserName()));
-    }
-    return authParams.toArray(new Param<?,?>[0]);
-  }
-
-  URL toUrl(final HttpOpParam.Op op, final Path fspath,
-      final Param<?,?>... parameters) throws IOException {
-    //initialize URI path and query
-    final String path = PATH_PREFIX
-        + (fspath == null? "/": makeQualified(fspath).toUri().getRawPath());
-    final String query = op.toQueryString()
-        + Param.toSortedString("&", getAuthParameters(op))
-        + Param.toSortedString("&", parameters);
-    final URL url = getNamenodeURL(path, query);
-    if (LOG.isTraceEnabled()) {
-      LOG.trace("url=" + url);
-    }
-    return url;
-  }
-
-  /**
-   * This class is for initialing a HTTP connection, connecting to server,
-   * obtaining a response, and also handling retry on failures.
-   */
-  abstract class AbstractRunner<T> {
-    abstract protected URL getUrl() throws IOException;
-
-    protected final HttpOpParam.Op op;
-    private final boolean redirected;
-    protected ExcludeDatanodesParam excludeDatanodes = new ExcludeDatanodesParam("");
-
-    private boolean checkRetry;
-
-    protected AbstractRunner(final HttpOpParam.Op op, boolean redirected) {
-      this.op = op;
-      this.redirected = redirected;
-    }
-
-    T run() throws IOException {
-      UserGroupInformation connectUgi = ugi.getRealUser();
-      if (connectUgi == null) {
-        connectUgi = ugi;
-      }
-      if (op.getRequireAuth()) {
-        connectUgi.checkTGTAndReloginFromKeytab();
-      }
-      try {
-        // the entire lifecycle of the connection must be run inside the
-        // doAs to ensure authentication is performed correctly
-        return connectUgi.doAs(
-            new PrivilegedExceptionAction<T>() {
-              @Override
-              public T run() throws IOException {
-                return runWithRetry();
-              }
-            });
-      } catch (InterruptedException e) {
-        throw new IOException(e);
-      }
-    }
-
-    /**
-     * Two-step requests redirected to a DN.
-     *
-     * Create/Append:
-     * Step 1) Submit an HTTP request with neither auto-redirect nor data.
-     * Step 2) Submit another HTTP request with data, using the URL from the
-     *         Location header of the first response.
-     *
-     * The reason for the two-step create/append is to prevent clients from
-     * sending data before the redirect. This issue is addressed by the
-     * "Expect: 100-continue" header in HTTP/1.1; see RFC 2616, Section 8.2.3.
-     * Unfortunately, some software libraries (e.g. the Jetty 6 HTTP server
-     * and the Java 6 HTTP client) do not correctly implement
-     * "Expect: 100-continue", so the two-step create/append is a temporary
-     * workaround for those library bugs.
-     *
-     * Open/Checksum:
-     * Two-step connects are also used for other operations redirected to
-     * a DN, such as open and checksum.
-     */
-    private HttpURLConnection connect(URL url) throws IOException {
-      // redirect hostname and port
-      String redirectHost = null;
-
-      // resolve redirects for a DN operation unless already resolved
-      if (op.getRedirect() && !redirected) {
-        final HttpOpParam.Op redirectOp =
-            HttpOpParam.TemporaryRedirectOp.valueOf(op);
-        final HttpURLConnection conn = connect(redirectOp, url);
-        // application level proxy like httpfs might not issue a redirect
-        if (conn.getResponseCode() == op.getExpectedHttpResponseCode()) {
-          return conn;
-        }
-        try {
-          validateResponse(redirectOp, conn, false);
-          url = new URL(conn.getHeaderField("Location"));
-          redirectHost = url.getHost() + ":" + url.getPort();
-        } finally {
-          conn.disconnect();
-        }
-      }
-      try {
-        return connect(op, url);
-      } catch (IOException ioe) {
-        if (redirectHost != null) {
-          if (excludeDatanodes.getValue() != null) {
-            excludeDatanodes = new ExcludeDatanodesParam(redirectHost + ","
-                + excludeDatanodes.getValue());
-          } else {
-            excludeDatanodes = new ExcludeDatanodesParam(redirectHost);
-          }
-        }
-        throw ioe;
-      }      
-    }
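
For illustration, a minimal standalone sketch of the two-step create described above, using plain HttpURLConnection; the URL, user and payload are hypothetical, and the real client routes both requests through connect(op, url) so the shared connection factory and authentication settings apply:

      // Step 1: PUT with no data and redirects disabled; the namenode answers
      // with a 307 whose Location header points at a datanode.
      HttpURLConnection nn = (HttpURLConnection) new URL(
          "http://nn.example.com:50070/webhdfs/v1/tmp/f?op=CREATE&user.name=alice")
          .openConnection();
      nn.setRequestMethod("PUT");
      nn.setInstanceFollowRedirects(false);
      nn.connect();
      String dnLocation = nn.getHeaderField("Location");
      nn.disconnect();

      // Step 2: PUT the file data to the datanode URL from the Location header.
      HttpURLConnection dn = (HttpURLConnection) new URL(dnLocation).openConnection();
      dn.setRequestMethod("PUT");
      dn.setDoOutput(true);
      dn.getOutputStream().write("hello".getBytes("UTF-8"));
      dn.getOutputStream().close();
      // the response code from dn then indicates whether the create succeeded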
-
-    private HttpURLConnection connect(final HttpOpParam.Op op, final URL url)
-        throws IOException {
-      final HttpURLConnection conn =
-          (HttpURLConnection)connectionFactory.openConnection(url);
-      final boolean doOutput = op.getDoOutput();
-      conn.setRequestMethod(op.getType().toString());
-      conn.setInstanceFollowRedirects(false);
-      switch (op.getType()) {
-        // when not sending a message body for a POST or PUT operation, the
-        // server/proxy must be told explicitly
-        case POST:
-        case PUT: {
-          conn.setDoOutput(true);
-          if (!doOutput) {
-            // explicitly setting content-length to 0 won't do spnego!!
-            // opening and closing the stream will send "Content-Length: 0"
-            conn.getOutputStream().close();
-          } else {
-            conn.setRequestProperty("Content-Type",
-                MediaType.APPLICATION_OCTET_STREAM);
-            conn.setChunkedStreamingMode(32 << 10); //32kB-chunk
-          }
-          break;
-        }
-        default: {
-          conn.setDoOutput(doOutput);
-          break;
-        }
-      }
-      conn.connect();
-      return conn;
-    }
-
-    private T runWithRetry() throws IOException {
-      /**
-       * Do the real work.
-       *
-       * There are three cases that the code inside the loop can throw an
-       * IOException:
-       *
-       * <ul>
-       * <li>The connection has failed (e.g., ConnectException,
-       * @see FailoverOnNetworkExceptionRetry for more details)</li>
-       * <li>The namenode enters the standby state (i.e., StandbyException).</li>
-       * <li>The server returns errors for the command (i.e., RemoteException)</li>
-       * </ul>
-       *
-       * The call to shouldRetry() will conduct the retry policy. The policy
-       * examines the exception and swallows it if it decides to rerun the work.
-       */
-      for(int retry = 0; ; retry++) {
-        checkRetry = !redirected;
-        final URL url = getUrl();
-        try {
-          final HttpURLConnection conn = connect(url);
-          // output streams will validate on close
-          if (!op.getDoOutput()) {
-            validateResponse(op, conn, false);
-          }
-          return getResponse(conn);
-        } catch (AccessControlException ace) {
-          // no retries for auth failures
-          throw ace;
-        } catch (InvalidToken it) {
-          // try to replace the expired token with a new one.  the attempt
-          // to acquire a new token must be outside this operation's retry
-          // so if it fails after its own retries, this operation fails too.
-          if (op.getRequireAuth() || !replaceExpiredDelegationToken()) {
-            throw it;
-          }
-        } catch (IOException ioe) {
-          shouldRetry(ioe, retry);
-        }
-      }
-    }
-
-    private void shouldRetry(final IOException ioe, final int retry
-        ) throws IOException {
-      InetSocketAddress nnAddr = getCurrentNNAddr();
-      if (checkRetry) {
-        try {
-          final RetryPolicy.RetryAction a = retryPolicy.shouldRetry(
-              ioe, retry, 0, true);
-
-          boolean isRetry = a.action == RetryPolicy.RetryAction.RetryDecision.RETRY;
-          boolean isFailoverAndRetry =
-              a.action == RetryPolicy.RetryAction.RetryDecision.FAILOVER_AND_RETRY;
-
-          if (isRetry || isFailoverAndRetry) {
-            LOG.info("Retrying connect to namenode: " + nnAddr
-                + ". Already tried " + retry + " time(s); retry policy is "
-                + retryPolicy + ", delay " + a.delayMillis + "ms.");
-
-            if (isFailoverAndRetry) {
-              resetStateToFailOver();
-            }
-
-            Thread.sleep(a.delayMillis);
-            return;
-          }
-        } catch(Exception e) {
-          LOG.warn("Original exception is ", ioe);
-          throw toIOException(e);
-        }
-      }
-      throw toIOException(ioe);
-    }
-
-    abstract T getResponse(HttpURLConnection conn) throws IOException;
-  }
-
-  /**
-   * Abstract base class to handle path-based operations with params
-   */
-  abstract class AbstractFsPathRunner<T> extends AbstractRunner<T> {
-    private final Path fspath;
-    private final Param<?,?>[] parameters;
-    
-    AbstractFsPathRunner(final HttpOpParam.Op op, final Path fspath,
-        Param<?,?>... parameters) {
-      super(op, false);
-      this.fspath = fspath;
-      this.parameters = parameters;
-    }
-    
-    AbstractFsPathRunner(final HttpOpParam.Op op, Param<?,?>[] parameters,
-        final Path fspath) {
-      super(op, false);
-      this.fspath = fspath;
-      this.parameters = parameters;
-    }
-    
-    @Override
-    protected URL getUrl() throws IOException {
-      if (excludeDatanodes.getValue() != null) {
-        Param<?, ?>[] tmpParam = new Param<?, ?>[parameters.length + 1];
-        System.arraycopy(parameters, 0, tmpParam, 0, parameters.length);
-        tmpParam[parameters.length] = excludeDatanodes;
-        return toUrl(op, fspath, tmpParam);
-      } else {
-        return toUrl(op, fspath, parameters);
-      }
-    }
-  }
-
-  /**
-   * Default path-based implementation; expects no JSON response.
-   */
-  class FsPathRunner extends AbstractFsPathRunner<Void> {
-    FsPathRunner(Op op, Path fspath, Param<?,?>... parameters) {
-      super(op, fspath, parameters);
-    }
-    
-    @Override
-    Void getResponse(HttpURLConnection conn) throws IOException {
-      return null;
-    }
-  }
-
-  /**
-   * Handle path-based operations with a JSON response.
-   */
-  abstract class FsPathResponseRunner<T> extends AbstractFsPathRunner<T> {
-    FsPathResponseRunner(final HttpOpParam.Op op, final Path fspath,
-        Param<?,?>... parameters) {
-      super(op, fspath, parameters);
-    }
-    
-    FsPathResponseRunner(final HttpOpParam.Op op, Param<?,?>[] parameters,
-        final Path fspath) {
-      super(op, parameters, fspath);
-    }
-    
-    @Override
-    final T getResponse(HttpURLConnection conn) throws IOException {
-      try {
-        final Map<?,?> json = jsonParse(conn, false);
-        if (json == null) {
-          // match exception class thrown by parser
-          throw new IllegalStateException("Missing response");
-        }
-        return decodeResponse(json);
-      } catch (IOException ioe) {
-        throw ioe;
-      } catch (Exception e) { // catch json parser errors
-        final IOException ioe =
-            new IOException("Response decoding failure: "+e.toString(), e);
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(ioe);
-        }
-        throw ioe;
-      } finally {
-        conn.disconnect();
-      }
-    }
-    
-    abstract T decodeResponse(Map<?,?> json) throws IOException;
-  }
-
-  /**
-   * Handle path-based operations with a JSON boolean response.
-   */
-  class FsPathBooleanRunner extends FsPathResponseRunner<Boolean> {
-    FsPathBooleanRunner(Op op, Path fspath, Param<?,?>... parameters) {
-      super(op, fspath, parameters);
-    }
-    
-    @Override
-    Boolean decodeResponse(Map<?,?> json) throws IOException {
-      return (Boolean)json.get("boolean");
-    }
-  }
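
For reference, the boolean operations driven through this runner (e.g. MKDIRS, RENAME, DELETE) return a one-field JSON body of roughly this shape, which decodeResponse() reads back out:

      {"boolean": true}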
-
-  /**
-   * Handle create/append output streams
-   */
-  class FsPathOutputStreamRunner extends AbstractFsPathRunner<FSDataOutputStream> {
-    private final int bufferSize;
-    
-    FsPathOutputStreamRunner(Op op, Path fspath, int bufferSize,
-        Param<?,?>... parameters) {
-      super(op, fspath, parameters);
-      this.bufferSize = bufferSize;
-    }
-    
-    @Override
-    FSDataOutputStream getResponse(final HttpURLConnection conn)
-        throws IOException {
-      return new FSDataOutputStream(new BufferedOutputStream(
-          conn.getOutputStream(), bufferSize), statistics) {
-        @Override
-        public void close() throws IOException {
-          try {
-            super.close();
-          } finally {
-            try {
-              validateResponse(op, conn, true);
-            } finally {
-              conn.disconnect();
-            }
-          }
-        }
-      };
-    }
-  }
-
-  class FsPathConnectionRunner extends AbstractFsPathRunner<HttpURLConnection> {
-    FsPathConnectionRunner(Op op, Path fspath, Param<?,?>... parameters) {
-      super(op, fspath, parameters);
-    }
-    @Override
-    HttpURLConnection getResponse(final HttpURLConnection conn)
-        throws IOException {
-      return conn;
-    }
-  }
-  
-  /**
-   * Used by open(), which tracks the resolved URL itself.
-   */
-  final class URLRunner extends AbstractRunner<HttpURLConnection> {
-    private final URL url;
-    @Override
-    protected URL getUrl() {
-      return url;
-    }
-
-    protected URLRunner(final HttpOpParam.Op op, final URL url, boolean redirected) {
-      super(op, redirected);
-      this.url = url;
-    }
-
-    @Override
-    HttpURLConnection getResponse(HttpURLConnection conn) throws IOException {
-      return conn;
-    }
-  }
-
-  private FsPermission applyUMask(FsPermission permission) {
-    if (permission == null) {
-      permission = FsPermission.getDefault();
-    }
-    return permission.applyUMask(FsPermission.getUMask(getConf()));
-  }
-
-  private HdfsFileStatus getHdfsFileStatus(Path f) throws IOException {
-    final HttpOpParam.Op op = GetOpParam.Op.GETFILESTATUS;
-    HdfsFileStatus status = new FsPathResponseRunner<HdfsFileStatus>(op, f) {
-      @Override
-      HdfsFileStatus decodeResponse(Map<?,?> json) {
-        return JsonUtilClient.toFileStatus(json, true);
-      }
-    }.run();
-    if (status == null) {
-      throw new FileNotFoundException("File does not exist: " + f);
-    }
-    return status;
-  }
-
-  @Override
-  public FileStatus getFileStatus(Path f) throws IOException {
-    statistics.incrementReadOps(1);
-    return makeQualified(getHdfsFileStatus(f), f);
-  }
-
-  private FileStatus makeQualified(HdfsFileStatus f, Path parent) {
-    return new FileStatus(f.getLen(), f.isDir(), f.getReplication(),
-        f.getBlockSize(), f.getModificationTime(), f.getAccessTime(),
-        f.getPermission(), f.getOwner(), f.getGroup(),
-        f.isSymlink() ? new Path(f.getSymlink()) : null,
-        f.getFullPath(parent).makeQualified(getUri(), getWorkingDirectory()));
-  }
-
-  @Override
-  public AclStatus getAclStatus(Path f) throws IOException {
-    final HttpOpParam.Op op = GetOpParam.Op.GETACLSTATUS;
-    AclStatus status = new FsPathResponseRunner<AclStatus>(op, f) {
-      @Override
-      AclStatus decodeResponse(Map<?,?> json) {
-        return JsonUtilClient.toAclStatus(json);
-      }
-    }.run();
-    if (status == null) {
-      throw new FileNotFoundException("File does not exist: " + f);
-    }
-    return status;
-  }
-
-  @Override
-  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
-    statistics.incrementWriteOps(1);
-    final HttpOpParam.Op op = PutOpParam.Op.MKDIRS;
-    return new FsPathBooleanRunner(op, f,
-        new PermissionParam(applyUMask(permission))
-    ).run();
-  }
-
-  /**
-   * Create a symlink pointing to the destination path.
-   */
-  public void createSymlink(Path destination, Path f, boolean createParent
-      ) throws IOException {
-    statistics.incrementWriteOps(1);
-    final HttpOpParam.Op op = PutOpParam.Op.CREATESYMLINK;
-    new FsPathRunner(op, f,
-        new DestinationParam(makeQualified(destination).toUri().getPath()),
-        new CreateParentParam(createParent)
-    ).run();
-  }
-
-  @Override
-  public boolean rename(final Path src, final Path dst) throws IOException {
-    statistics.incrementWriteOps(1);
-    final HttpOpParam.Op op = PutOpParam.Op.RENAME;
-    return new FsPathBooleanRunner(op, src,
-        new DestinationParam(makeQualified(dst).toUri().getPath())
-    ).run();
-  }
-
-  @SuppressWarnings("deprecation")
-  @Override
-  public void rename(final Path src, final Path dst,
-      final Options.Rename... options) throws IOException {
-    statistics.incrementWriteOps(1);
-    final HttpOpParam.Op op = PutOpParam.Op.RENAME;
-    new FsPathRunner(op, src,
-        new DestinationParam(makeQualified(dst).toUri().getPath()),
-        new RenameOptionSetParam(options)
-    ).run();
-  }
-  
-  @Override
-  public void setXAttr(Path p, String name, byte[] value, 
-      EnumSet<XAttrSetFlag> flag) throws IOException {
-    statistics.incrementWriteOps(1);
-    final HttpOpParam.Op op = PutOpParam.Op.SETXATTR;
-    if (value != null) {
-      new FsPathRunner(op, p, new XAttrNameParam(name), new XAttrValueParam(
-          XAttrCodec.encodeValue(value, XAttrCodec.HEX)), 
-          new XAttrSetFlagParam(flag)).run();
-    } else {
-      new FsPathRunner(op, p, new XAttrNameParam(name), 
-          new XAttrSetFlagParam(flag)).run();
-    }
-  }
-  
-  @Override
-  public byte[] getXAttr(Path p, final String name) throws IOException {
-    final HttpOpParam.Op op = GetOpParam.Op.GETXATTRS;
-    return new FsPathResponseRunner<byte[]>(op, p, new XAttrNameParam(name), 
-        new XAttrEncodingParam(XAttrCodec.HEX)) {
-      @Override
-      byte[] decodeResponse(Map<?, ?> json) throws IOException {
-        return JsonUtilClient.getXAttr(json, name);
-      }
-    }.run();
-  }
-  
-  @Override
-  public Map<String, byte[]> getXAttrs(Path p) throws IOException {
-    final HttpOpParam.Op op = GetOpParam.Op.GETXATTRS;
-    return new FsPathResponseRunner<Map<String, byte[]>>(op, p, 
-        new XAttrEncodingParam(XAttrCodec.HEX)) {
-      @Override
-      Map<String, byte[]> decodeResponse(Map<?, ?> json) throws IOException {
-        return JsonUtilClient.toXAttrs(json);
-      }
-    }.run();
-  }
-  
-  @Override
-  public Map<String, byte[]> getXAttrs(Path p, final List<String> names) 
-      throws IOException {
-    Preconditions.checkArgument(names != null && !names.isEmpty(), 
-        "XAttr names cannot be null or empty.");
-    Param<?,?>[] parameters = new Param<?,?>[names.size() + 1];
-    for (int i = 0; i < parameters.length - 1; i++) {
-      parameters[i] = new XAttrNameParam(names.get(i));
-    }
-    parameters[parameters.length - 1] = new XAttrEncodingParam(XAttrCodec.HEX);
-    
-    final HttpOpParam.Op op = GetOpParam.Op.GETXATTRS;
-    return new FsPathResponseRunner<Map<String, byte[]>>(op, parameters, p) {
-      @Override
-      Map<String, byte[]> decodeResponse(Map<?, ?> json) throws IOException {
-        return JsonUtilClient.toXAttrs(json);
-      }
-    }.run();
-  }
-  
-  @Override
-  public List<String> listXAttrs(Path p) throws IOException {
-    final HttpOpParam.Op op = GetOpParam.Op.LISTXATTRS;
-    return new FsPathResponseRunner<List<String>>(op, p) {
-      @Override
-      List<String> decodeResponse(Map<?, ?> json) throws IOException {
-        return JsonUtilClient.toXAttrNames(json);
-      }
-    }.run();
-  }
-
-  @Override
-  public void removeXAttr(Path p, String name) throws IOException {
-    statistics.incrementWriteOps(1);
-    final HttpOpParam.Op op = PutOpParam.Op.REMOVEXATTR;
-    new FsPathRunner(op, p, new XAttrNameParam(name)).run();
-  }
-
-  @Override
-  public void setOwner(final Path p, final String owner, final String group
-      ) throws IOException {
-    if (owner == null && group == null) {
-      throw new IOException("owner == null && group == null");
-    }
-
-    statistics.incrementWriteOps(1);
-    final HttpOpParam.Op op = PutOpParam.Op.SETOWNER;
-    new FsPathRunner(op, p,
-        new OwnerParam(owner), new GroupParam(group)
-    ).run();
-  }
-
-  @Override
-  public void setPermission(final Path p, final FsPermission permission
-      ) throws IOException {
-    statistics.incrementWriteOps(1);
-    final HttpOpParam.Op op = PutOpParam.Op.SETPERMISSION;
-    new FsPathRunner(op, p, new PermissionParam(permission)).run();
-  }
-
-  @Override
-  public void modifyAclEntries(Path path, List<AclEntry> aclSpec)
-      throws IOException {
-    statistics.incrementWriteOps(1);
-    final HttpOpParam.Op op = PutOpParam.Op.MODIFYACLENTRIES;
-    new FsPathRunner(op, path, new AclPermissionParam(aclSpec)).run();
-  }
-
-  @Override
-  public void removeAclEntries(Path path, List<AclEntry> aclSpec)
-      throws IOException {
-    statistics.incrementWriteOps(1);
-    final HttpOpParam.Op op = PutOpParam.Op.REMOVEACLENTRIES;
-    new FsPathRunner(op, path, new AclPermissionParam(aclSpec)).run();
-  }
-
-  @Override
-  public void removeDefaultAcl(Path path) throws IOException {
-    statistics.incrementWriteOps(1);
-    final HttpOpParam.Op op = PutOpParam.Op.REMOVEDEFAULTACL;
-    new FsPathRunner(op, path).run();
-  }
-
-  @Override
-  public void removeAcl(Path path) throws IOException {
-    statistics.incrementWriteOps(1);
-    final HttpOpParam.Op op = PutOpParam.Op.REMOVEACL;
-    new FsPathRunner(op, path).run();
-  }
-
-  @Override
-  public void setAcl(final Path p, final List<AclEntry> aclSpec)
-      throws IOException {
-    statistics.incrementWriteOps(1);
-    final HttpOpParam.Op op = PutOpParam.Op.SETACL;
-    new FsPathRunner(op, p, new AclPermissionParam(aclSpec)).run();
-  }
-
-  @Override
-  public Path createSnapshot(final Path path, final String snapshotName) 
-      throws IOException {
-    statistics.incrementWriteOps(1);
-    final HttpOpParam.Op op = PutOpParam.Op.CREATESNAPSHOT;
-    Path spath = new FsPathResponseRunner<Path>(op, path,
-        new SnapshotNameParam(snapshotName)) {
-      @Override
-      Path decodeResponse(Map<?,?> json) {
-        return new Path((String) json.get(Path.class.getSimpleName()));
-      }
-    }.run();
-    return spath;
-  }
-
-  @Override
-  public void deleteSnapshot(final Path path, final String snapshotName)
-      throws IOException {
-    statistics.incrementWriteOps(1);
-    final HttpOpParam.Op op = DeleteOpParam.Op.DELETESNAPSHOT;
-    new FsPathRunner(op, path, new SnapshotNameParam(snapshotName)).run();
-  }
-
-  @Override
-  public void renameSnapshot(final Path path, final String snapshotOldName,
-      final String snapshotNewName) throws IOException {
-    statistics.incrementWriteOps(1);
-    final HttpOpParam.Op op = PutOpParam.Op.RENAMESNAPSHOT;
-    new FsPathRunner(op, path, new OldSnapshotNameParam(snapshotOldName),
-        new SnapshotNameParam(snapshotNewName)).run();
-  }
-
-  @Override
-  public boolean setReplication(final Path p, final short replication
-     ) throws IOException {
-    statistics.incrementWriteOps(1);
-    final HttpOpParam.Op op = PutOpParam.Op.SETREPLICATION;
-    return new FsPathBooleanRunner(op, p,
-        new ReplicationParam(replication)
-    ).run();
-  }
-
-  @Override
-  public void setTimes(final Path p, final long mtime, final long atime
-      ) throws IOException {
-    statistics.incrementWriteOps(1);
-    final HttpOpParam.Op op = PutOpParam.Op.SETTIMES;
-    new FsPathRunner(op, p,
-        new ModificationTimeParam(mtime),
-        new AccessTimeParam(atime)
-    ).run();
-  }
-
-  @Override
-  public long getDefaultBlockSize() {
-    return getConf().getLongBytes(DFSConfigKeys.DFS_BLOCK_SIZE_KEY,
-        DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT);
-  }
-
-  @Override
-  public short getDefaultReplication() {
-    return (short)getConf().getInt(DFSConfigKeys.DFS_REPLICATION_KEY,
-        DFSConfigKeys.DFS_REPLICATION_DEFAULT);
-  }
-
-  @Override
-  public void concat(final Path trg, final Path [] srcs) throws IOException {
-    statistics.incrementWriteOps(1);
-    final HttpOpParam.Op op = PostOpParam.Op.CONCAT;
-    new FsPathRunner(op, trg, new ConcatSourcesParam(srcs)).run();
-  }
-
-  @Override
-  public FSDataOutputStream create(final Path f, final FsPermission permission,
-      final boolean overwrite, final int bufferSize, final short replication,
-      final long blockSize, final Progressable progress) throws IOException {
-    statistics.incrementWriteOps(1);
-
-    final HttpOpParam.Op op = PutOpParam.Op.CREATE;
-    return new FsPathOutputStreamRunner(op, f, bufferSize,
-        new PermissionParam(applyUMask(permission)),
-        new OverwriteParam(overwrite),
-        new BufferSizeParam(bufferSize),
-        new ReplicationParam(replication),
-        new BlockSizeParam(blockSize)
-    ).run();
-  }
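
A minimal usage sketch of create() through the public FileSystem API (imports omitted; the host, port and path are hypothetical):

      Configuration conf = new Configuration();
      FileSystem fs = FileSystem.get(
          URI.create("webhdfs://nn.example.com:50070"), conf);
      try (FSDataOutputStream out = fs.create(new Path("/tmp/example.txt"))) {
        out.writeBytes("hello, webhdfs\n");
      }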
-
-  @Override
-  public FSDataOutputStream append(final Path f, final int bufferSize,
-      final Progressable progress) throws IOException {
-    statistics.incrementWriteOps(1);
-
-    final HttpOpParam.Op op = PostOpParam.Op.APPEND;
-    return new FsPathOutputStreamRunner(op, f, bufferSize,
-        new BufferSizeParam(bufferSize)
-    ).run();
-  }
-
-  @Override
-  public boolean truncate(Path f, long newLength) throws IOException {
-    statistics.incrementWriteOps(1);
-
-    final HttpOpParam.Op op = PostOpParam.Op.TRUNCATE;
-    return new FsPathBooleanRunner(op, f, new NewLengthParam(newLength)).run();
-  }
-
-  @Override
-  public boolean delete(Path f, boolean recursive) throws IOException {
-    final HttpOpParam.Op op = DeleteOpParam.Op.DELETE;
-    return new FsPathBooleanRunner(op, f,
-        new RecursiveParam(recursive)
-    ).run();
-  }
-
-  @Override
-  public FSDataInputStream open(final Path f, final int buffersize
-      ) throws IOException {
-    statistics.incrementReadOps(1);
-    final HttpOpParam.Op op = GetOpParam.Op.OPEN;
-    // use a runner so the open can recover from an invalid token
-    FsPathConnectionRunner runner =
-        new FsPathConnectionRunner(op, f, new BufferSizeParam(buffersize));
-    return new FSDataInputStream(new OffsetUrlInputStream(
-        new UnresolvedUrlOpener(runner), new OffsetUrlOpener(null)));
-  }
-
-  @Override
-  public synchronized void close() throws IOException {
-    try {
-      if (canRefreshDelegationToken && delegationToken != null) {
-        cancelDelegationToken(delegationToken);
-      }
-    } catch (IOException ioe) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Token cancel failed: " + ioe);
-      }
-    } finally {
-      super.close();
-    }
-  }
-
-  // use FsPathConnectionRunner to ensure retries for InvalidTokens
-  class UnresolvedUrlOpener extends ByteRangeInputStream.URLOpener {
-    private final FsPathConnectionRunner runner;
-    UnresolvedUrlOpener(FsPathConnectionRunner runner) {
-      super(null);
-      this.runner = runner;
-    }
-
-    @Override
-    protected HttpURLConnection connect(long offset, boolean resolved)
-        throws IOException {
-      assert offset == 0;
-      HttpURLConnection conn = runner.run();
-      setURL(conn.getURL());
-      return conn;
-    }
-  }
-
-  class OffsetUrlOpener extends ByteRangeInputStream.URLOpener {
-    OffsetUrlOpener(final URL url) {
-      super(url);
-    }
-
-    /** Set up the offset URL and connect. */
-    @Override
-    protected HttpURLConnection connect(final long offset,
-        final boolean resolved) throws IOException {
-      final URL offsetUrl = offset == 0L? url
-          : new URL(url + "&" + new OffsetParam(offset));
-      return new URLRunner(GetOpParam.Op.OPEN, offsetUrl, resolved).run();
-    }  
-  }
-
-  private static final String OFFSET_PARAM_PREFIX = OffsetParam.NAME + "=";
-
-  /** Remove the offset parameter, if any, from the URL. */
-  static URL removeOffsetParam(final URL url) throws MalformedURLException {
-    String query = url.getQuery();
-    if (query == null) {
-      return url;
-    }
-    final String lower = StringUtils.toLowerCase(query);
-    if (!lower.startsWith(OFFSET_PARAM_PREFIX)
-        && !lower.contains("&" + OFFSET_PARAM_PREFIX)) {
-      return url;
-    }
-
-    //rebuild query
-    StringBuilder b = null;
-    for(final StringTokenizer st = new StringTokenizer(query, "&");
-        st.hasMoreTokens();) {
-      final String token = st.nextToken();
-      if (!StringUtils.toLowerCase(token).startsWith(OFFSET_PARAM_PREFIX)) {
-        if (b == null) {
-          b = new StringBuilder("?").append(token);
-        } else {
-          b.append('&').append(token);
-        }
-      }
-    }
-    query = b == null? "": b.toString();
-
-    final String urlStr = url.toString();
-    return new URL(urlStr.substring(0, urlStr.indexOf('?')) + query);
-  }
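
As a worked example of removeOffsetParam (the URL is hypothetical):

      input:  http://dn.example.com:50075/webhdfs/v1/f?op=OPEN&offset=1024&namenoderpcaddress=nn:8020
      output: http://dn.example.com:50075/webhdfs/v1/f?op=OPEN&namenoderpcaddress=nn:8020

Only the offset parameter is dropped; the rest of the query string is preserved in order.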
-
-  static class OffsetUrlInputStream extends ByteRangeInputStream {
-    OffsetUrlInputStream(UnresolvedUrlOpener o, OffsetUrlOpener r)
-        throws IOException {
-      super(o, r);
-    }
-
-    /** Remove the offset parameter before returning the resolved URL. */
-    @Override
-    protected URL getResolvedUrl(final HttpURLConnection connection
-        ) throws MalformedURLException {
-      return removeOffsetParam(connection.getURL());
-    }
-  }
-
-  @Override
-  public FileStatus[] listStatus(final Path f) throws IOException {
-    statistics.incrementReadOps(1);
-
-    final HttpOpParam.Op op = GetOpParam.Op.LISTSTATUS;
-    return new FsPathResponseRunner<FileStatus[]>(op, f) {
-      @Override
-      FileStatus[] decodeResponse(Map<?,?> json) {
-        final Map<?, ?> rootmap = (Map<?, ?>)json.get(FileStatus.class.getSimpleName() + "es");
-        final List<?> array = JsonUtilClient.getList(rootmap,
-                                                     FileStatus.class.getSimpleName());
-
-        //convert FileStatus
-        final FileStatus[] statuses = new FileStatus[array.size()];
-        int i = 0;
-        for (Object object : array) {
-          final Map<?, ?> m = (Map<?, ?>) object;
-          statuses[i++] = makeQualified(JsonUtilClient.toFileStatus(m, false), f);
-        }
-        return statuses;
-      }
-    }.run();
-  }
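
For reference, the LISTSTATUS response this decoder walks has roughly the following nesting (fields abbreviated):

      {"FileStatuses": {"FileStatus": [ {"pathSuffix": "a", "type": "FILE", ...}, ... ]}}

which is why the code first extracts the "FileStatuses" map and then the "FileStatus" list.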
-
-  @Override
-  public Token<DelegationTokenIdentifier> getDelegationToken(
-      final String renewer) throws IOException {
-    final HttpOpParam.Op op = GetOpParam.Op.GETDELEGATIONTOKEN;
-    Token<DelegationTokenIdentifier> token =
-        new FsPathResponseRunner<Token<DelegationTokenIdentifier>>(
-            op, null, new RenewerParam(renewer)) {
-      @Override
-      Token<DelegationTokenIdentifier> decodeResponse(Map<?,?> json)
-          throws IOException {
-        return JsonUtilClient.toDelegationToken(json);
-      }
-    }.run();
-    if (token != null) {
-      token.setService(tokenServiceName);
-    } else {
-      if (disallowFallbackToInsecureCluster) {
-        throw new AccessControlException(CANT_FALLBACK_TO_INSECURE_MSG);
-      }
-    }
-    return token;
-  }
-
-  @Override
-  public synchronized Token<?> getRenewToken() {
-    return delegationToken;
-  }
-
-  @Override
-  public <T extends TokenIdentifier> void setDelegationToken(
-      final Token<T> token) {
-    synchronized (this) {
-      delegationToken = token;
-    }
-  }
-
-  @Override
-  public synchronized long renewDelegationToken(final Token<?> token
-      ) throws IOException {
-    final HttpOpParam.Op op = PutOpParam.Op.RENEWDELEGATIONTOKEN;
-    return new FsPathResponseRunner<Long>(op, null,
-        new TokenArgumentParam(token.encodeToUrlString())) {
-      @Override
-      Long decodeResponse(Map<?,?> json) throws IOException {
-        return ((Number) json.get("long")).longValue();
-      }
-    }.run();
-  }
-
-  @Override
-  public synchronized void cancelDelegationToken(final Token<?> token
-      ) throws IOException {
-    final HttpOpParam.Op op = PutOpParam.Op.CANCELDELEGATIONTOKEN;
-    new FsPathRunner(op, null,
-        new TokenArgumentParam(token.encodeToUrlString())
-    ).run();
-  }
-  
-  @Override
-  public BlockLocation[] getFileBlockLocations(final FileStatus status,
-      final long offset, final long length) throws IOException {
-    if (status == null) {
-      return null;
-    }
-    return getFileBlockLocations(status.getPath(), offset, length);
-  }
-
-  @Override
-  public BlockLocation[] getFileBlockLocations(final Path p, 
-      final long offset, final long length) throws IOException {
-    statistics.incrementReadOps(1);
-
-    final HttpOpParam.Op op = GetOpParam.Op.GET_BLOCK_LOCATIONS;
-    return new FsPathResponseRunner<BlockLocation[]>(op, p,
-        new OffsetParam(offset), new LengthParam(length)) {
-      @Override
-      BlockLocation[] decodeResponse(Map<?,?> json) throws IOException {
-        return DFSUtil.locatedBlocks2Locations(
-            JsonUtilClient.toLocatedBlocks(json));
-      }
-    }.run();
-  }
-
-  @Override
-  public void access(final Path path, final FsAction mode) throws IOException {
-    final HttpOpParam.Op op = GetOpParam.Op.CHECKACCESS;
-    new FsPathRunner(op, path, new FsActionParam(mode)).run();
-  }
-
-  @Override
-  public ContentSummary getContentSummary(final Path p) throws IOException {
-    statistics.incrementReadOps(1);
-
-    final HttpOpParam.Op op = GetOpParam.Op.GETCONTENTSUMMARY;
-    return new FsPathResponseRunner<ContentSummary>(op, p) {
-      @Override
-      ContentSummary decodeResponse(Map<?,?> json) {
-        return JsonUtilClient.toContentSummary(json);
-      }
-    }.run();
-  }
-
-  @Override
-  public MD5MD5CRC32FileChecksum getFileChecksum(final Path p
-      ) throws IOException {
-    statistics.incrementReadOps(1);
-  
-    final HttpOpParam.Op op = GetOpParam.Op.GETFILECHECKSUM;
-    return new FsPathResponseRunner<MD5MD5CRC32FileChecksum>(op, p) {
-      @Override
-      MD5MD5CRC32FileChecksum decodeResponse(Map<?,?> json) throws IOException {
-        return JsonUtilClient.toMD5MD5CRC32FileChecksum(json);
-      }
-    }.run();
-  }
-
-  /**
-   * Resolve an HDFS URL into real InetSocketAddress instances. It works like a
-   * DNS resolver when the URL points to a non-HA cluster. When the URL points
-   * to an HA cluster with its logical name, the resolver further resolves the
-   * logical name (i.e., the authority in the URL) into real namenode addresses.
-   */
-  private InetSocketAddress[] resolveNNAddr() throws IOException {
-    Configuration conf = getConf();
-    final String scheme = uri.getScheme();
-
-    ArrayList<InetSocketAddress> ret = new ArrayList<InetSocketAddress>();
-
-    if (!HAUtilClient.isLogicalUri(conf, uri)) {
-      InetSocketAddress addr = NetUtils.createSocketAddr(uri.getAuthority(),
-          getDefaultPort());
-      ret.add(addr);
-
-    } else {
-      Map<String, Map<String, InetSocketAddress>> addresses = DFSUtilClient
-          .getHaNnWebHdfsAddresses(conf, scheme);
-
-      // Extract the entry corresponding to the logical name.
-      Map<String, InetSocketAddress> addrs = addresses.get(uri.getHost());
-      for (InetSocketAddress addr : addrs.values()) {
-        ret.add(addr);
-      }
-    }
-
-    InetSocketAddress[] r = new InetSocketAddress[ret.size()];
-    return ret.toArray(r);
-  }
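
A sketch of the HA settings resolveNNAddr() works against when the URI authority is a logical name; the nameservice and host names below are hypothetical:

      Configuration conf = new Configuration();
      conf.set("dfs.nameservices", "mycluster");
      conf.set("dfs.ha.namenodes.mycluster", "nn1,nn2");
      conf.set("dfs.namenode.http-address.mycluster.nn1", "nn1.example.com:50070");
      conf.set("dfs.namenode.http-address.mycluster.nn2", "nn2.example.com:50070");

With settings like these, a webhdfs://mycluster/... URI should resolve to both HTTP addresses, and the failover handling earlier in this class rotates between them on retry.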
-
-  @Override
-  public String getCanonicalServiceName() {
-    return tokenServiceName == null ? super.getCanonicalServiceName()
-        : tokenServiceName.toString();
-  }
-
-  @VisibleForTesting
-  InetSocketAddress[] getResolvedNNAddr() {
-    return nnAddrs;
-  }
-}