You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by vi...@apache.org on 2013/10/30 23:22:22 UTC

svn commit: r1537330 [7/11] - in /hadoop/common/branches/YARN-321/hadoop-hdfs-project: ./ hadoop-hdfs-httpfs/src/main/java/org/apache/hadoop/lib/server/ hadoop-hdfs-nfs/ hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/mount/ hadoop-hdfs-nfs/sr...

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeHttpServer.java Wed Oct 30 22:21:59 2013
@@ -52,6 +52,7 @@ public class NameNodeHttpServer {
   private final NameNode nn;
   
   private InetSocketAddress httpAddress;
+  private InetSocketAddress httpsAddress;
   private InetSocketAddress bindAddress;
   
   public static final String NAMENODE_ADDRESS_ATTRIBUTE_KEY = "name.node.address";
@@ -99,14 +100,15 @@ public class NameNodeHttpServer {
     boolean certSSL = conf.getBoolean(DFSConfigKeys.DFS_HTTPS_ENABLE_KEY, false);
     if (certSSL) {
       boolean needClientAuth = conf.getBoolean("dfs.https.need.client.auth", false);
-      InetSocketAddress secInfoSocAddr = NetUtils.createSocketAddr(infoHost + ":" + conf.get(
-        DFSConfigKeys.DFS_NAMENODE_HTTPS_PORT_KEY, infoHost + ":" + 0));
+      httpsAddress = NetUtils.createSocketAddr(conf.get(
+          DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY,
+          DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_DEFAULT));
+
       Configuration sslConf = new Configuration(false);
-      if (certSSL) {
-        sslConf.addResource(conf.get(DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY,
-                                     "ssl-server.xml"));
-      }
-      httpServer.addSslListener(secInfoSocAddr, sslConf, needClientAuth);
+      sslConf.addResource(conf.get(
+          DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_KEY,
+          DFSConfigKeys.DFS_SERVER_HTTPS_KEYSTORE_RESOURCE_DEFAULT));
+      httpServer.addSslListener(httpsAddress, sslConf, needClientAuth);
       // assume same ssl port for all datanodes
       InetSocketAddress datanodeSslPort = NetUtils.createSocketAddr(conf.get(
         DFSConfigKeys.DFS_DATANODE_HTTPS_ADDRESS_KEY, infoHost + ":" + 50475));
@@ -163,6 +165,10 @@ public class NameNodeHttpServer {
     return httpAddress;
   }
 
+  public InetSocketAddress getHttpsAddress() {
+    return httpsAddress;
+  }
+
   /**
    * Sets fsimage for use by servlets.
    * 

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java Wed Oct 30 22:21:59 2013
@@ -29,8 +29,9 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.util.Arrays;
 import java.util.Collection;
-import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -543,11 +544,11 @@ class NameNodeRpcServer implements Namen
       stateChangeLog.debug("*BLOCK* NameNode.addBlock: file " + src
           + " fileId=" + fileId + " for " + clientName);
     }
-    HashMap<Node, Node> excludedNodesSet = null;
+    Set<Node> excludedNodesSet = null;
     if (excludedNodes != null) {
-      excludedNodesSet = new HashMap<Node, Node>(excludedNodes.length);
+      excludedNodesSet = new HashSet<Node>(excludedNodes.length);
       for (Node node : excludedNodes) {
-        excludedNodesSet.put(node, node);
+        excludedNodesSet.add(node);
       }
     }
     List<String> favoredNodesList = (favoredNodes == null) ? null
@@ -575,11 +576,11 @@ class NameNodeRpcServer implements Namen
 
     metrics.incrGetAdditionalDatanodeOps();
 
-    HashMap<Node, Node> excludeSet = null;
+    Set<Node> excludeSet = null;
     if (excludes != null) {
-      excludeSet = new HashMap<Node, Node>(excludes.length);
+      excludeSet = new HashSet<Node>(excludes.length);
       for (Node node : excludes) {
-        excludeSet.put(node, node);
+        excludeSet.add(node);
       }
     }
     return namesystem.getAdditionalDatanode(src, blk,

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeFsck.java Wed Oct 30 22:21:59 2013
@@ -50,6 +50,7 @@ import org.apache.hadoop.hdfs.protocol.H
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementStatus;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.net.NetUtils;
@@ -374,9 +375,10 @@ public class NamenodeFsck {
                     locs.length + " replica(s).");
       }
       // verify block placement policy
-      int missingRacks = BlockPlacementPolicy.getInstance(conf, null, networktopology).
-                           verifyBlockPlacement(path, lBlk, Math.min(2,targetFileReplication));
-      if (missingRacks > 0) {
+      BlockPlacementStatus blockPlacementStatus = 
+          BlockPlacementPolicy.getInstance(conf, null, networktopology).
+              verifyBlockPlacement(path, lBlk, targetFileReplication);
+      if (!blockPlacementStatus.isPlacementPolicySatisfied()) {
         res.numMisReplicatedBlocks++;
         misReplicatedPerFile++;
         if (!showFiles) {
@@ -385,9 +387,7 @@ public class NamenodeFsck {
           out.print(path + ": ");
         }
         out.println(" Replica placement policy is violated for " + 
-                    block +
-                    ". Block should be additionally replicated on " + 
-                    missingRacks + " more rack(s).");
+                    block + ". " + blockPlacementStatus.getErrorDescription());
       }
       report.append(i + ". " + blkName + " len=" + block.getNumBytes());
       if (locs.length == 0) {

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NamenodeJspHelper.java Wed Oct 30 22:21:59 2013
@@ -30,6 +30,7 @@ import java.net.URLEncoder;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -101,6 +102,10 @@ class NamenodeJspHelper {
   }
 
   static String getRollingUpgradeText(FSNamesystem fsn) {
+    if (fsn == null) {
+      return "";
+    }
+
     DatanodeManager dm = fsn.getBlockManager().getDatanodeManager();
     Map<String, Integer> list = dm.getDatanodesSoftwareVersions();
     if(list.size() > 1) {
@@ -203,6 +208,20 @@ class NamenodeJspHelper {
     return "";
   }
 
+  static void generateSnapshotReport(JspWriter out, FSNamesystem fsn)
+      throws IOException {
+    if (fsn == null) {
+      return;
+    }
+    out.println("<div id=\"snapshotstats\"><div class=\"dfstable\">"
+        + "<table class=\"storage\" title=\"Snapshot Summary\">\n"
+        + "<thead><tr><td><b>Snapshottable directories</b></td>"
+        + "<td><b>Snapshotted directories</b></td></tr></thead>");
+
+    out.println(String.format("<td>%d</td><td>%d</td>", fsn.getNumSnapshottableDirs(), fsn.getNumSnapshots()));
+    out.println("</table></div></div>");
+  }
+
   static class HealthJsp {
     private int rowNum = 0;
     private int colNum = 0;
@@ -636,25 +655,22 @@ class NamenodeJspHelper {
         .getAttribute(JspHelper.CURRENT_CONF);
     // We can't redirect if there isn't a DN to redirect to.
     // Lets instead show a proper error message.
-    if (nn.getNamesystem().getNumLiveDataNodes() < 1) {
+    FSNamesystem fsn = nn.getNamesystem();
+
+    DatanodeID datanode = null;
+    if (fsn != null && fsn.getNumLiveDataNodes() >= 1) {
+      datanode = getRandomDatanode(nn);
+    }
+
+    if (datanode == null) {
       throw new IOException("Can't browse the DFS since there are no " +
           "live nodes available to redirect to.");
     }
-    final DatanodeID datanode = getRandomDatanode(nn);;
+
     UserGroupInformation ugi = JspHelper.getUGI(context, request, conf);
+    // if the user is defined, get a delegation token and stringify it
     String tokenString = getDelegationToken(
         nn.getRpcServer(), request, conf, ugi);
-    // if the user is defined, get a delegation token and stringify it
-    final String redirectLocation;
-    final String nodeToRedirect;
-    int redirectPort;
-    if (datanode != null) {
-      nodeToRedirect = datanode.getIpAddr();
-      redirectPort = datanode.getInfoPort();
-    } else {
-      nodeToRedirect = nn.getHttpAddress().getHostName();
-      redirectPort = nn.getHttpAddress().getPort();
-    }
 
     InetSocketAddress rpcAddr = nn.getNameNodeAddress();
     String rpcHost = rpcAddr.getAddress().isAnyLocalAddress()
@@ -662,16 +678,31 @@ class NamenodeJspHelper {
       : rpcAddr.getAddress().getHostAddress();
     String addr = rpcHost + ":" + rpcAddr.getPort();
 
-    String fqdn = InetAddress.getByName(nodeToRedirect).getCanonicalHostName();
-    redirectLocation = HttpConfig.getSchemePrefix() + fqdn + ":" + redirectPort
+    final String redirectLocation =
+        JspHelper.Url.url(request.getScheme(), datanode)
         + "/browseDirectory.jsp?namenodeInfoPort="
-        + nn.getHttpAddress().getPort() + "&dir=/"
+        + request.getServerPort() + "&dir=/"
         + (tokenString == null ? "" :
            JspHelper.getDelegationTokenUrlParam(tokenString))
         + JspHelper.getUrlParam(JspHelper.NAMENODE_ADDRESS, addr);
+
     resp.sendRedirect(redirectLocation);
   }
 
+  /**
+   * Returns a descriptive label for the running NameNode.  If the NameNode has
+   * initialized to the point of running its RPC server, then this label consists
+   * of the host and port of the RPC server.  Otherwise, the label is a message
+   * stating that the NameNode is still initializing.
+   * 
+   * @param nn NameNode to describe
+   * @return String NameNode label
+   */
+  static String getNameNodeLabel(NameNode nn) {
+    return nn.getRpcServer() != null ? nn.getNameNodeAddressHostPortString() :
+      "initializing";
+  }
+
   static class NodeListJsp {
     private int rowNum = 0;
 
@@ -709,12 +740,11 @@ class NamenodeJspHelper {
     }
 
     private void generateNodeDataHeader(JspWriter out, DatanodeDescriptor d,
-        String suffix, boolean alive, int nnHttpPort, String nnaddr)
+        String suffix, boolean alive, int nnInfoPort, String nnaddr, String scheme)
         throws IOException {
       // from nn_browsedfscontent.jsp:
-      String url = HttpConfig.getSchemePrefix() + d.getHostName() + ":"
-          + d.getInfoPort()
-          + "/browseDirectory.jsp?namenodeInfoPort=" + nnHttpPort + "&dir="
+      String url = "///" + JspHelper.Url.authority(scheme, d)
+          + "/browseDirectory.jsp?namenodeInfoPort=" + nnInfoPort + "&dir="
           + URLEncoder.encode("/", "UTF-8")
           + JspHelper.getUrlParam(JspHelper.NAMENODE_ADDRESS, nnaddr);
 
@@ -731,9 +761,9 @@ class NamenodeJspHelper {
     }
 
     void generateDecommissioningNodeData(JspWriter out, DatanodeDescriptor d,
-        String suffix, boolean alive, int nnHttpPort, String nnaddr)
+        String suffix, boolean alive, int nnInfoPort, String nnaddr, String scheme)
         throws IOException {
-      generateNodeDataHeader(out, d, suffix, alive, nnHttpPort, nnaddr);
+      generateNodeDataHeader(out, d, suffix, alive, nnInfoPort, nnaddr, scheme);
       if (!alive) {
         return;
       }
@@ -757,7 +787,7 @@ class NamenodeJspHelper {
     }
     
     void generateNodeData(JspWriter out, DatanodeDescriptor d, String suffix,
-        boolean alive, int nnHttpPort, String nnaddr) throws IOException {
+        boolean alive, int nnInfoPort, String nnaddr, String scheme) throws IOException {
       /*
        * Say the datanode is dn1.hadoop.apache.org with ip 192.168.0.5 we use:
        * 1) d.getHostName():d.getPort() to display. Domain and port are stripped
@@ -769,7 +799,7 @@ class NamenodeJspHelper {
        * interact with datanodes.
        */
 
-      generateNodeDataHeader(out, d, suffix, alive, nnHttpPort, nnaddr);
+      generateNodeDataHeader(out, d, suffix, alive, nnInfoPort, nnaddr, scheme);
       if (!alive) {
         out.print("<td class=\"decommissioned\"> " + 
             d.isDecommissioned() + "\n");
@@ -826,17 +856,17 @@ class NamenodeJspHelper {
         HttpServletRequest request) throws IOException {
       final NameNode nn = NameNodeHttpServer.getNameNodeFromContext(context);
       final FSNamesystem ns = nn.getNamesystem();
+      if (ns == null) {
+        return;
+      }
       final DatanodeManager dm = ns.getBlockManager().getDatanodeManager();
 
       final List<DatanodeDescriptor> live = new ArrayList<DatanodeDescriptor>();
       final List<DatanodeDescriptor> dead = new ArrayList<DatanodeDescriptor>();
       dm.fetchDatanodes(live, dead, true);
 
-      InetSocketAddress nnSocketAddress =
-          (InetSocketAddress)context.getAttribute(
-              NameNodeHttpServer.NAMENODE_ADDRESS_ATTRIBUTE_KEY);
-      String nnaddr = nnSocketAddress.getAddress().getHostAddress() + ":"
-          + nnSocketAddress.getPort();
+      String nnaddr = nn.getServiceRpcAddress().getAddress().getHostName() + ":"
+          + nn.getServiceRpcAddress().getPort();
 
       whatNodes = request.getParameter("whatNodes"); // show only live or only
                                                      // dead nodes
@@ -872,16 +902,11 @@ class NamenodeJspHelper {
 
       counterReset();
 
-      try {
-        Thread.sleep(1000);
-      } catch (InterruptedException e) {
-      }
-
       if (live.isEmpty() && dead.isEmpty()) {
         out.print("There are no datanodes in the cluster");
       } else {
 
-        int nnHttpPort = nn.getHttpAddress().getPort();
+        int nnInfoPort = request.getServerPort();
         out.print("<div id=\"dfsnodetable\"> ");
         if (whatNodes.equals("LIVE")) {
           out.print("<a name=\"LiveNodes\" id=\"title\">" + "Live Datanodes : "
@@ -923,8 +948,8 @@ class NamenodeJspHelper {
 
             JspHelper.sortNodeList(live, sorterField, sorterOrder);
             for (int i = 0; i < live.size(); i++) {
-              generateNodeData(out, live.get(i), port_suffix, true, nnHttpPort,
-                  nnaddr);
+              generateNodeData(out, live.get(i), port_suffix, true, nnInfoPort,
+                  nnaddr, request.getScheme());
             }
           }
           out.print("</table>\n");
@@ -944,7 +969,7 @@ class NamenodeJspHelper {
             JspHelper.sortNodeList(dead, sorterField, sorterOrder);
             for (int i = 0; i < dead.size(); i++) {
               generateNodeData(out, dead.get(i), port_suffix, false,
-                  nnHttpPort, nnaddr);
+                  nnInfoPort, nnaddr, request.getScheme());
             }
 
             out.print("</table>\n");
@@ -975,7 +1000,7 @@ class NamenodeJspHelper {
             JspHelper.sortNodeList(decommissioning, "name", "ASC");
             for (int i = 0; i < decommissioning.size(); i++) {
               generateDecommissioningNodeData(out, decommissioning.get(i),
-                  port_suffix, true, nnHttpPort, nnaddr);
+                  port_suffix, true, nnInfoPort, nnaddr, request.getScheme());
             }
             out.print("</table>\n");
           }
@@ -1003,14 +1028,16 @@ class NamenodeJspHelper {
     final BlockManager blockManager;
     
     XMLBlockInfo(FSNamesystem fsn, Long blockId) {
-      this.blockManager = fsn.getBlockManager();
+      this.blockManager = fsn != null ? fsn.getBlockManager() : null;
 
       if (blockId == null) {
         this.block = null;
         this.inode = null;
       } else {
         this.block = new Block(blockId);
-        this.inode = ((INode)blockManager.getBlockCollection(block)).asFile();
+        this.inode = blockManager != null ?
+          ((INode)blockManager.getBlockCollection(block)).asFile() :
+          null;
       }
     }
 
@@ -1084,8 +1111,10 @@ class NamenodeJspHelper {
         } 
 
         doc.startTag("replicas");
-        for(final Iterator<DatanodeDescriptor> it = blockManager.datanodeIterator(block);
-            it.hasNext(); ) {
+        for (final Iterator<DatanodeDescriptor> it = blockManager != null ?
+            blockManager.datanodeIterator(block) :
+            Collections.<DatanodeDescriptor>emptyList().iterator();
+            it.hasNext();) {
           doc.startTag("replica");
 
           DatanodeDescriptor dd = it.next();
@@ -1121,7 +1150,7 @@ class NamenodeJspHelper {
     
     XMLCorruptBlockInfo(FSNamesystem fsn, Configuration conf,
                                int numCorruptBlocks, Long startingBlockId) {
-      this.blockManager = fsn.getBlockManager();
+      this.blockManager = fsn != null ? fsn.getBlockManager() : null;
       this.conf = conf;
       this.numCorruptBlocks = numCorruptBlocks;
       this.startingBlockId = startingBlockId;
@@ -1144,16 +1173,19 @@ class NamenodeJspHelper {
       doc.endTag();
       
       doc.startTag("num_missing_blocks");
-      doc.pcdata(""+blockManager.getMissingBlocksCount());
+      doc.pcdata("" + (blockManager != null ?
+        blockManager.getMissingBlocksCount() : 0));
       doc.endTag();
       
       doc.startTag("num_corrupt_replica_blocks");
-      doc.pcdata(""+blockManager.getCorruptReplicaBlocksCount());
+      doc.pcdata("" + (blockManager != null ?
+        blockManager.getCorruptReplicaBlocksCount() : 0));
       doc.endTag();
      
       doc.startTag("corrupt_replica_block_ids");
-      final long[] corruptBlockIds = blockManager.getCorruptReplicaBlockIds(
-          numCorruptBlocks, startingBlockId);
+      final long[] corruptBlockIds = blockManager != null ?
+        blockManager.getCorruptReplicaBlockIds(numCorruptBlocks,
+        startingBlockId) : null;
       if (corruptBlockIds != null) {
         for (Long blockId: corruptBlockIds) {
           doc.startTag("block_id");

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Namesystem.java Wed Oct 30 22:21:59 2013
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.na
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction;
 import org.apache.hadoop.hdfs.server.namenode.NameNode.OperationCategory;
 import org.apache.hadoop.hdfs.util.RwLock;
 import org.apache.hadoop.ipc.StandbyException;
@@ -43,4 +44,6 @@ public interface Namesystem extends RwLo
   public void adjustSafeModeBlockTotals(int deltaSafe, int deltaTotal);
 
   public void checkOperation(OperationCategory read) throws StandbyException;
+
+  public boolean isInSnapshot(BlockInfoUnderConstruction blockUC);
 }
\ No newline at end of file

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SafeModeException.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SafeModeException.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SafeModeException.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SafeModeException.java Wed Oct 30 22:21:59 2013
@@ -33,10 +33,7 @@ import org.apache.hadoop.classification.
 public class SafeModeException extends IOException {
   private static final long serialVersionUID = 1L;
 
-  public SafeModeException() {}
-
   public SafeModeException(String text, FSNamesystem.SafeModeInfo mode ) {
     super(text + ". Name node is in safe mode.\n" + mode.getTurnOffTip());
   }
-
-}
+}
\ No newline at end of file

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java Wed Oct 30 22:21:59 2013
@@ -155,7 +155,7 @@ public class SecondaryNameNode implement
   }
 
   @VisibleForTesting
-  FSNamesystem getFSNamesystem() {
+  public FSNamesystem getFSNamesystem() {
     return namesystem;
   }
   
@@ -429,10 +429,8 @@ public class SecondaryNameNode implement
             dstImage.getStorage().cTime = sig.cTime;
 
             // get fsimage
-            boolean downloadImage = true;
             if (sig.mostRecentCheckpointTxId ==
                 dstImage.getStorage().getMostRecentCheckpointTxId()) {
-              downloadImage = false;
               LOG.info("Image has not changed. Will not download image.");
             } else {
               LOG.info("Image has changed. Downloading updated image from NN.");
@@ -448,7 +446,9 @@ public class SecondaryNameNode implement
                   nnHostPort, log, dstImage.getStorage());
             }
         
-            return Boolean.valueOf(downloadImage);
+            // true if we haven't loaded all the transactions represented by the
+            // downloaded fsimage.
+            return dstImage.getLastAppliedTxId() < sig.mostRecentCheckpointTxId;
           }
         });
         return b.booleanValue();
@@ -489,7 +489,8 @@ public class SecondaryNameNode implement
    * Create a new checkpoint
    * @return if the image is fetched from primary or not
    */
-  boolean doCheckpoint() throws IOException {
+  @VisibleForTesting
+  public boolean doCheckpoint() throws IOException {
     checkpointImage.ensureCurrentDirExists();
     NNStorage dstStorage = checkpointImage.getStorage();
     

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StartupProgressServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StartupProgressServlet.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StartupProgressServlet.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StartupProgressServlet.java Wed Oct 30 22:21:59 2013
@@ -44,6 +44,7 @@ public class StartupProgressServlet exte
   private static final String ELAPSED_TIME = "elapsedTime";
   private static final String FILE = "file";
   private static final String NAME = "name";
+  private static final String DESC = "desc";
   private static final String PERCENT_COMPLETE = "percentComplete";
   private static final String PHASES = "phases";
   private static final String SIZE = "size";
@@ -70,6 +71,7 @@ public class StartupProgressServlet exte
       for (Phase phase: view.getPhases()) {
         json.writeStartObject();
         json.writeStringField(NAME, phase.getName());
+        json.writeStringField(DESC, phase.getDescription());
         json.writeStringField(STATUS, view.getStatus(phase).toString());
         json.writeNumberField(PERCENT_COMPLETE, view.getPercentComplete(phase));
         json.writeNumberField(ELAPSED_TIME, view.getElapsedTime(phase));
@@ -80,8 +82,10 @@ public class StartupProgressServlet exte
         for (Step step: view.getSteps(phase)) {
           json.writeStartObject();
           StepType type = step.getType();
-          String name = type != null ? type.getName() : null;
-          writeStringFieldIfNotNull(json, NAME, name);
+          if (type != null) {
+            json.writeStringField(NAME, type.getName());
+            json.writeStringField(DESC, type.getDescription());
+          }
           json.writeNumberField(COUNT, view.getCount(phase, step));
           writeStringFieldIfNotNull(json, FILE, step.getFile());
           writeNumberFieldIfDefined(json, SIZE, step.getSize());

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/metrics/FSNamesystemMBean.java Wed Oct 30 22:21:59 2013
@@ -130,4 +130,19 @@ public interface FSNamesystemMBean {
    * @return number of decommissioned dead data nodes
    */
   public int getNumDecomDeadDataNodes();
+
+  /**
+   * Number of data nodes that are in the decommissioning state
+   */
+  public int getNumDecommissioningDataNodes();
+
+  /**
+   * The statistics of snapshots
+   */
+  public String getSnapshotStats();
+
+  /**
+   * Return the maximum number of inodes in the file system
+   */
+  public long getMaxObjects();
 }

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/SnapshotManager.java Wed Oct 30 22:21:59 2013
@@ -357,6 +357,10 @@ public class SnapshotManager implements 
     
     return snapshotRoot.computeDiff(from, to);
   }
+  
+  public void clearSnapshottableDirs() {
+    snapshottables.clear();
+  }
 
   /**
    * Returns the maximum allowable snapshot ID based on the bit width of the

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DisallowedDatanodeException.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DisallowedDatanodeException.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DisallowedDatanodeException.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DisallowedDatanodeException.java Wed Oct 30 22:21:59 2013
@@ -37,7 +37,12 @@ public class DisallowedDatanodeException
   /** for java.io.Serializable */
   private static final long serialVersionUID = 1L;
 
+  public DisallowedDatanodeException(DatanodeID nodeID, String reason) {
+    super("Datanode denied communication with namenode because "
+        + reason + ": " + nodeID);
+  }
+
   public DisallowedDatanodeException(DatanodeID nodeID) {
-    super("Datanode denied communication with namenode: " + nodeID);
+    this(nodeID, "the host is not in the include-list");
   }
 }

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DelegationTokenFetcher.java Wed Oct 30 22:21:59 2013
@@ -41,18 +41,19 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
-import org.apache.hadoop.hdfs.HftpFileSystem;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
 import org.apache.hadoop.hdfs.server.namenode.CancelDelegationTokenServlet;
 import org.apache.hadoop.hdfs.server.namenode.GetDelegationTokenServlet;
 import org.apache.hadoop.hdfs.server.namenode.RenewDelegationTokenServlet;
+import org.apache.hadoop.hdfs.web.HftpFileSystem;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.Credentials;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.GenericOptionsParser;
 
 import com.google.common.base.Charsets;
@@ -86,7 +87,7 @@ public class DelegationTokenFetcher {
     err.println("  --print             Print the delegation token");
     err.println();
     GenericOptionsParser.printGenericCommandUsage(err);
-    System.exit(1);
+    ExitUtil.terminate(1);    
   }
 
   private static Collection<Token<?>> readTokens(Path file, Configuration conf)

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/JMXGet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/JMXGet.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/JMXGet.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/JMXGet.java Wed Oct 30 22:21:59 2013
@@ -43,6 +43,7 @@ import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.util.ExitUtil;
 
 /**
  * tool to get data from NameNode or DataNode using MBeans currently the
@@ -295,7 +296,7 @@ public class JMXGet {
       // invalid arguments
       err("Invalid args");
       printUsage(opts);
-      System.exit(-1);
+      ExitUtil.terminate(-1);      
     }
 
     JMXGet jm = new JMXGet();
@@ -317,7 +318,7 @@ public class JMXGet {
 
     if (commandLine.hasOption("help")) {
       printUsage(opts);
-      System.exit(0);
+      ExitUtil.terminate(0);
     }
 
     // rest of args
@@ -342,6 +343,6 @@ public class JMXGet {
       res = -1;
     }
 
-    System.exit(res);
+    ExitUtil.terminate(res);
   }
 }

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/snapshot/SnapshotDiff.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/snapshot/SnapshotDiff.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/snapshot/SnapshotDiff.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/snapshot/SnapshotDiff.java Wed Oct 30 22:21:59 2013
@@ -20,12 +20,14 @@ package org.apache.hadoop.hdfs.tools.sna
 import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
+import org.apache.hadoop.util.Tool;
+import org.apache.hadoop.util.ToolRunner;
 
 /**
  * A tool used to get the difference report between two snapshots, or between
@@ -38,7 +40,7 @@ import org.apache.hadoop.hdfs.protocol.S
  * </pre>
  */
 @InterfaceAudience.Private
-public class SnapshotDiff {
+public class SnapshotDiff extends Configured implements Tool {
   private static String getSnapshotName(String name) {
     if (Path.CUR_DIR.equals(name)) { // current directory
       return "";
@@ -57,7 +59,8 @@ public class SnapshotDiff {
     return name.substring(i + HdfsConstants.DOT_SNAPSHOT_DIR.length() + 1);
   }
   
-  public static void main(String[] argv) throws IOException {
+  @Override
+  public int run(String[] argv) throws Exception {
     String description = "SnapshotDiff <snapshotDir> <from> <to>:\n" +
     "\tGet the difference between two snapshots, \n" + 
     "\tor between a snapshot and the current tree of a directory.\n" +
@@ -67,15 +70,14 @@ public class SnapshotDiff {
     
     if(argv.length != 3) {
       System.err.println("Usage: \n" + description);
-      System.exit(1);
+      return 1;
     }
     
-    Configuration conf = new Configuration();
-    FileSystem fs = FileSystem.get(conf);
+    FileSystem fs = FileSystem.get(getConf());
     if (! (fs instanceof DistributedFileSystem)) {
       System.err.println(
           "SnapshotDiff can only be used in DistributedFileSystem");
-      System.exit(1);
+      return 1;
     }
     DistributedFileSystem dfs = (DistributedFileSystem) fs;
     
@@ -89,7 +91,14 @@ public class SnapshotDiff {
     } catch (IOException e) {
       String[] content = e.getLocalizedMessage().split("\n");
       System.err.println("snapshotDiff: " + content[0]);
+      return 1;
     }
+    return 0;
+  }
+
+  public static void main(String[] argv) throws Exception {
+    int rc = ToolRunner.run(new SnapshotDiff(), argv);
+    System.exit(rc);
   }
 
 }

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/LightWeightHashSet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/LightWeightHashSet.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/LightWeightHashSet.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/LightWeightHashSet.java Wed Oct 30 22:21:59 2013
@@ -88,7 +88,7 @@ public class LightWeightHashSet<T> imple
    *
    * @see ConcurrentModificationException
    */
-  protected volatile int modification = 0;
+  protected int modification = 0;
 
   private float maxLoadFactor;
   private float minLoadFactor;
@@ -634,4 +634,4 @@ public class LightWeightHashSet<T> imple
   public boolean retainAll(Collection<?> c) {
     throw new UnsupportedOperationException("retainAll is not supported.");
   }
-}
\ No newline at end of file
+}

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/JsonUtil.java Wed Oct 30 22:21:59 2013
@@ -17,29 +17,11 @@
  */
 package org.apache.hadoop.hdfs.web;
 
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeMap;
-
-import org.apache.hadoop.fs.ContentSummary;
-import org.apache.hadoop.fs.FileChecksum;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.MD5MD5CRC32CastagnoliFileChecksum;
-import org.apache.hadoop.fs.MD5MD5CRC32GzipFileChecksum;
-import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
+import org.apache.hadoop.fs.*;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.hdfs.DFSUtil;
-import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.*;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
-import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
-import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
-import org.apache.hadoop.hdfs.protocol.LocatedBlock;
-import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.server.namenode.INodeId;
@@ -50,6 +32,11 @@ import org.apache.hadoop.util.DataChecks
 import org.apache.hadoop.util.StringUtils;
 import org.mortbay.util.ajax.JSON;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.*;
+
 /** JSON Utilities */
 public class JsonUtil {
   private static final Object[] EMPTY_OBJECT_ARRAY = {};
@@ -295,6 +282,7 @@ public class JsonUtil {
     m.put("storageID", datanodeinfo.getStorageID());
     m.put("xferPort", datanodeinfo.getXferPort());
     m.put("infoPort", datanodeinfo.getInfoPort());
+    m.put("infoSecurePort", datanodeinfo.getInfoSecurePort());
     m.put("ipcPort", datanodeinfo.getIpcPort());
 
     m.put("capacity", datanodeinfo.getCapacity());
@@ -309,10 +297,15 @@ public class JsonUtil {
   }
 
   /** Convert a Json map to an DatanodeInfo object. */
-  private static DatanodeInfo toDatanodeInfo(final Map<?, ?> m) {
+  static DatanodeInfo toDatanodeInfo(final Map<?, ?> m) {
     if (m == null) {
       return null;
     }
+    
+    Object infoSecurePort = m.get("infoSecurePort");
+    if (infoSecurePort == null) {
+      infoSecurePort = 0l; // same as the default value in hdfs.proto
+    }
 
     return new DatanodeInfo(
         (String)m.get("ipAddr"),
@@ -320,6 +313,7 @@ public class JsonUtil {
         (String)m.get("storageID"),
         (int)(long)(Long)m.get("xferPort"),
         (int)(long)(Long)m.get("infoPort"),
+        (int)(long)(Long)infoSecurePort,
         (int)(long)(Long)m.get("ipcPort"),
 
         (Long)m.get("capacity"),

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/web/WebHdfsFileSystem.java Wed Oct 30 22:21:59 2013
@@ -51,9 +51,9 @@ import org.apache.hadoop.fs.MD5MD5CRC32F
 import org.apache.hadoop.fs.Options;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.permission.FsPermission;
-import org.apache.hadoop.hdfs.ByteRangeInputStream;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
 import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSelector;
@@ -86,16 +86,14 @@ import org.apache.hadoop.hdfs.web.resour
 import org.apache.hadoop.hdfs.web.resources.TokenArgumentParam;
 import org.apache.hadoop.hdfs.web.resources.UserParam;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.retry.RetryPolicies;
 import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.io.retry.RetryUtils;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authentication.client.AuthenticatedURL;
 import org.apache.hadoop.security.authentication.client.AuthenticationException;
-import org.apache.hadoop.security.authorize.AuthorizationException;
-import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -119,18 +117,9 @@ public class WebHdfsFileSystem extends F
   /** Http URI: http://namenode:port/{PATH_PREFIX}/path/to/file */
   public static final String PATH_PREFIX = "/" + SCHEME + "/v" + VERSION;
 
-  /** SPNEGO authenticator */
-  private static final KerberosUgiAuthenticator AUTH = new KerberosUgiAuthenticator();
-  /** Configures connections for AuthenticatedURL */
-  private static final ConnectionConfigurator CONN_CONFIGURATOR =
-    new ConnectionConfigurator() {
-      @Override
-      public HttpURLConnection configure(HttpURLConnection conn)
-          throws IOException {
-        URLUtils.setTimeouts(conn);
-        return conn;
-      }
-    };
+  /** Default connection factory may be overridden in tests to use smaller timeout values */
+  URLConnectionFactory connectionFactory = URLConnectionFactory.DEFAULT_CONNECTION_FACTORY;
+
   /** Delegation token kind */
   public static final Text TOKEN_KIND = new Text("WEBHDFS delegation");
   /** Token selector */
@@ -159,12 +148,13 @@ public class WebHdfsFileSystem extends F
   }
 
   private UserGroupInformation ugi;
-  private InetSocketAddress nnAddr;
   private URI uri;
   private boolean hasInitedToken;
   private Token<?> delegationToken;
   private RetryPolicy retryPolicy = null;
   private Path workingDir;
+  private InetSocketAddress nnAddrs[];
+  private int currentNNAddrIndex;
 
   /**
    * Return the protocol scheme for the FileSystem.
@@ -174,7 +164,7 @@ public class WebHdfsFileSystem extends F
    */
   @Override
   public String getScheme() {
-    return "webhdfs";
+    return SCHEME;
   }
 
   @Override
@@ -183,20 +173,42 @@ public class WebHdfsFileSystem extends F
     super.initialize(uri, conf);
     setConf(conf);
     ugi = UserGroupInformation.getCurrentUser();
+
     try {
-      this.uri = new URI(uri.getScheme(), uri.getAuthority(), null, null, null);
+      this.uri = new URI(uri.getScheme(), uri.getAuthority(), null,
+          null, null);
+      this.nnAddrs = DFSUtil.resolve(this.uri, getDefaultPort(), conf);
     } catch (URISyntaxException e) {
       throw new IllegalArgumentException(e);
     }
-    this.nnAddr = NetUtils.createSocketAddr(uri.getAuthority(), getDefaultPort());
-    this.retryPolicy = 
-        RetryUtils.getDefaultRetryPolicy(
-            conf, 
-            DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY, 
-            DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_DEFAULT, 
-            DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_KEY,
-            DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_SPEC_DEFAULT,
-            SafeModeException.class);
+
+    if (!HAUtil.isLogicalUri(conf, this.uri)) {
+      this.retryPolicy =
+          RetryUtils.getDefaultRetryPolicy(
+              conf,
+              DFSConfigKeys.DFS_HTTP_CLIENT_RETRY_POLICY_ENABLED_KEY,
+              DFSConfigKeys.DFS_HTTP_CLIENT_RETRY_POLICY_ENABLED_DEFAULT,
+              DFSConfigKeys.DFS_HTTP_CLIENT_RETRY_POLICY_SPEC_KEY,
+              DFSConfigKeys.DFS_HTTP_CLIENT_RETRY_POLICY_SPEC_DEFAULT,
+              SafeModeException.class);
+    } else {
+
+      int maxFailoverAttempts = conf.getInt(
+          DFSConfigKeys.DFS_HTTP_CLIENT_FAILOVER_MAX_ATTEMPTS_KEY,
+          DFSConfigKeys.DFS_HTTP_CLIENT_FAILOVER_MAX_ATTEMPTS_DEFAULT);
+      int failoverSleepBaseMillis = conf.getInt(
+          DFSConfigKeys.DFS_HTTP_CLIENT_FAILOVER_SLEEPTIME_BASE_KEY,
+          DFSConfigKeys.DFS_HTTP_CLIENT_FAILOVER_SLEEPTIME_BASE_DEFAULT);
+      int failoverSleepMaxMillis = conf.getInt(
+          DFSConfigKeys.DFS_HTTP_CLIENT_FAILOVER_SLEEPTIME_MAX_KEY,
+          DFSConfigKeys.DFS_HTTP_CLIENT_FAILOVER_SLEEPTIME_MAX_DEFAULT);
+
+      this.retryPolicy = RetryPolicies
+          .failoverOnNetworkException(RetryPolicies.TRY_ONCE_THEN_FAIL,
+              maxFailoverAttempts, failoverSleepBaseMillis,
+              failoverSleepMaxMillis);
+    }
+
     this.workingDir = getHomeDirectory();
 
     if (UserGroupInformation.isSecurityEnabled()) {
@@ -348,6 +360,19 @@ public class WebHdfsFileSystem extends F
     return ((RemoteException)ioe).unwrapRemoteException();
   }
 
+  private synchronized InetSocketAddress getCurrentNNAddr() {
+    return nnAddrs[currentNNAddrIndex];
+  }
+
+  /**
+   * Reset the appropriate state to gracefully fail over to another name node
+   */
+  private synchronized void resetStateToFailOver() {
+    currentNNAddrIndex = (currentNNAddrIndex + 1) % nnAddrs.length;
+    delegationToken = null;
+    hasInitedToken = false;
+  }
+
   /**
    * Return a URL pointing to given path on the namenode.
    *
@@ -357,6 +382,7 @@ public class WebHdfsFileSystem extends F
    * @throws IOException on error constructing the URL
    */
   private URL getNamenodeURL(String path, String query) throws IOException {
+    InetSocketAddress nnAddr = getCurrentNNAddr();
     final URL url = new URL("http", nnAddr.getHostName(),
           nnAddr.getPort(), path + '?' + query);
     if (LOG.isTraceEnabled()) {
@@ -414,38 +440,28 @@ public class WebHdfsFileSystem extends F
    */
   private Map<?, ?> run(final HttpOpParam.Op op, final Path fspath,
       final Param<?,?>... parameters) throws IOException {
-    return new Runner(op, fspath, parameters).run().json;
+    return new FsPathRunner(op, fspath, parameters).run().json;
   }
 
   /**
    * This class is for initialing a HTTP connection, connecting to server,
    * obtaining a response, and also handling retry on failures.
    */
-  class Runner {
-    private final HttpOpParam.Op op;
-    private final URL url;
+  abstract class AbstractRunner {
+    abstract protected URL getUrl() throws IOException;
+
+    protected final HttpOpParam.Op op;
     private final boolean redirected;
 
     private boolean checkRetry;
-    private HttpURLConnection conn = null;
+    protected HttpURLConnection conn = null;
     private Map<?, ?> json = null;
 
-    Runner(final HttpOpParam.Op op, final URL url, final boolean redirected) {
+    protected AbstractRunner(final HttpOpParam.Op op, boolean redirected) {
       this.op = op;
-      this.url = url;
       this.redirected = redirected;
     }
 
-    Runner(final HttpOpParam.Op op, final Path fspath,
-        final Param<?,?>... parameters) throws IOException {
-      this(op, toUrl(op, fspath, parameters), false);
-    }
-
-    Runner(final HttpOpParam.Op op, final HttpURLConnection conn) {
-      this(op, null, false);
-      this.conn = conn;
-    }
-
     private HttpURLConnection getHttpUrlConnection(final URL url)
         throws IOException, AuthenticationException {
       UserGroupInformation connectUgi = ugi.getRealUser();
@@ -475,17 +491,7 @@ public class WebHdfsFileSystem extends F
         throws IOException {
       final HttpURLConnection conn;
       try {
-        if (op.getRequireAuth()) {
-          LOG.debug("open AuthenticatedURL connection");
-          UserGroupInformation.getCurrentUser().checkTGTAndReloginFromKeytab();
-          final AuthenticatedURL.Token authToken = new AuthenticatedURL.Token();
-          conn = new AuthenticatedURL(AUTH, CONN_CONFIGURATOR).openConnection(
-            url, authToken);
-          URLUtils.setTimeouts(conn);
-        } else {
-          LOG.debug("open URL connection");
-          conn = (HttpURLConnection)URLUtils.openConnection(url);
-        }
+        conn = (HttpURLConnection) connectionFactory.openConnection(op, url);
       } catch (AuthenticationException e) {
         throw new IOException(e);
       }
@@ -494,6 +500,7 @@ public class WebHdfsFileSystem extends F
   
     private void init() throws IOException {
       checkRetry = !redirected;
+      URL url = getUrl();
       try {
         conn = getHttpUrlConnection(url);
       } catch(AuthenticationException ae) {
@@ -520,7 +527,23 @@ public class WebHdfsFileSystem extends F
       }
     }
 
-    Runner run() throws IOException {
+    AbstractRunner run() throws IOException {
+      /**
+       * Do the real work.
+       *
+       * There are three cases in which the code inside the loop can throw an
+       * IOException:
+       *
+       * <ul>
+       * <li>The connection has failed (e.g., ConnectException;
+       * see FailoverOnNetworkExceptionRetry for more details)</li>
+       * <li>The namenode enters the standby state (i.e., StandbyException).</li>
+       * <li>The server returns errors for the command (i.e., RemoteException)</li>
+       * </ul>
+       *
+       * The call to shouldRetry() consults the retry policy. The policy
+       * examines the exception and swallows it if it decides to rerun the work.
+       */
       for(int retry = 0; ; retry++) {
         try {
           init();
@@ -538,14 +561,25 @@ public class WebHdfsFileSystem extends F
 
     private void shouldRetry(final IOException ioe, final int retry
         ) throws IOException {
+      InetSocketAddress nnAddr = getCurrentNNAddr();
       if (checkRetry) {
         try {
           final RetryPolicy.RetryAction a = retryPolicy.shouldRetry(
               ioe, retry, 0, true);
-          if (a.action == RetryPolicy.RetryAction.RetryDecision.RETRY) {
+
+          boolean isRetry = a.action == RetryPolicy.RetryAction.RetryDecision.RETRY;
+          boolean isFailoverAndRetry =
+              a.action == RetryPolicy.RetryAction.RetryDecision.FAILOVER_AND_RETRY;
+
+          if (isRetry || isFailoverAndRetry) {
             LOG.info("Retrying connect to namenode: " + nnAddr
                 + ". Already tried " + retry + " time(s); retry policy is "
-                + retryPolicy + ", delay " + a.delayMillis + "ms.");      
+                + retryPolicy + ", delay " + a.delayMillis + "ms.");
+
+            if (isFailoverAndRetry) {
+              resetStateToFailOver();
+            }
+
             Thread.sleep(a.delayMillis);
             return;
           }
@@ -579,8 +613,10 @@ public class WebHdfsFileSystem extends F
       checkRetry = false;
       
       //Step 2) Submit another Http request with the URL from the Location header with data.
-      conn = (HttpURLConnection)URLUtils.openConnection(new URL(redirect));
-      conn.setRequestProperty("Content-Type", MediaType.APPLICATION_OCTET_STREAM);
+      conn = (HttpURLConnection) connectionFactory.openConnection(new URL(
+          redirect));
+      conn.setRequestProperty("Content-Type",
+          MediaType.APPLICATION_OCTET_STREAM);
       conn.setChunkedStreamingMode(32 << 10); //32kB-chunk
       connect();
       return conn;
@@ -602,7 +638,8 @@ public class WebHdfsFileSystem extends F
           disconnect();
   
           checkRetry = false;
-          conn = (HttpURLConnection)URLUtils.openConnection(new URL(redirect));
+          conn = (HttpURLConnection) connectionFactory.openConnection(new URL(
+              redirect));
           connect();
         }
 
@@ -618,6 +655,48 @@ public class WebHdfsFileSystem extends F
     }
   }
 
+  final class FsPathRunner extends AbstractRunner {
+    private final Path fspath;
+    private final Param<?, ?>[] parameters;
+
+    FsPathRunner(final HttpOpParam.Op op, final Path fspath, final Param<?,?>... parameters) {
+      super(op, false);
+      this.fspath = fspath;
+      this.parameters = parameters;
+    }
+
+    @Override
+    protected URL getUrl() throws IOException {
+      return toUrl(op, fspath, parameters);
+    }
+  }
+
+  final class URLRunner extends AbstractRunner {
+    private final URL url;
+    @Override
+    protected URL getUrl() {
+      return url;
+    }
+
+    protected URLRunner(final HttpOpParam.Op op, final URL url, boolean redirected) {
+      super(op, redirected);
+      this.url = url;
+    }
+  }
+
+  @VisibleForTesting
+  final class ConnRunner extends AbstractRunner {
+    protected ConnRunner(final HttpOpParam.Op op, HttpURLConnection conn) {
+      super(op, false);
+      this.conn = conn;
+    }
+
+    @Override
+    protected URL getUrl() {
+      return null;
+    }
+  }
+
   private FsPermission applyUMask(FsPermission permission) {
     if (permission == null) {
       permission = FsPermission.getDefault();
@@ -773,7 +852,7 @@ public class WebHdfsFileSystem extends F
     statistics.incrementWriteOps(1);
 
     final HttpOpParam.Op op = PutOpParam.Op.CREATE;
-    return new Runner(op, f, 
+    return new FsPathRunner(op, f,
         new PermissionParam(applyUMask(permission)),
         new OverwriteParam(overwrite),
         new BufferSizeParam(bufferSize),
@@ -789,17 +868,11 @@ public class WebHdfsFileSystem extends F
     statistics.incrementWriteOps(1);
 
     final HttpOpParam.Op op = PostOpParam.Op.APPEND;
-    return new Runner(op, f, new BufferSizeParam(bufferSize))
+    return new FsPathRunner(op, f, new BufferSizeParam(bufferSize))
       .run()
       .write(bufferSize);
   }
 
-  @SuppressWarnings("deprecation")
-  @Override
-  public boolean delete(final Path f) throws IOException {
-    return delete(f, true);
-  }
-
   @Override
   public boolean delete(Path f, boolean recursive) throws IOException {
     final HttpOpParam.Op op = DeleteOpParam.Op.DELETE;
@@ -836,7 +909,7 @@ public class WebHdfsFileSystem extends F
         final boolean resolved) throws IOException {
       final URL offsetUrl = offset == 0L? url
           : new URL(url + "&" + new OffsetParam(offset));
-      return new Runner(GetOpParam.Op.OPEN, offsetUrl, resolved).run().conn;
+      return new URLRunner(GetOpParam.Op.OPEN, offsetUrl, resolved).run().conn;
     }  
   }
 
@@ -910,7 +983,7 @@ public class WebHdfsFileSystem extends F
     final HttpOpParam.Op op = GetOpParam.Op.GETDELEGATIONTOKEN;
     final Map<?, ?> m = run(op, null, new RenewerParam(renewer));
     final Token<DelegationTokenIdentifier> token = JsonUtil.toDelegationToken(m); 
-    SecurityUtil.setTokenService(token, nnAddr);
+    SecurityUtil.setTokenService(token, getCurrentNNAddr());
     return token;
   }
 

Propchange: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/native/
------------------------------------------------------------------------------
  Merged /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/native:r1531125
  Merged /hadoop/common/branches/branch-2/hadoop-hdfs-project/hadoop-hdfs/src/main/native:r1513206-1537326

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.h?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.h (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/expect.h Wed Oct 30 22:21:59 2013
@@ -19,16 +19,19 @@
 #ifndef LIBHDFS_NATIVE_TESTS_EXPECT_H
 #define LIBHDFS_NATIVE_TESTS_EXPECT_H
 
+#include <inttypes.h>
 #include <stdio.h>
 
+struct hdfsFile_internal;
+
 #define EXPECT_ZERO(x) \
     do { \
         int __my_ret__ = x; \
         if (__my_ret__) { \
             int __my_errno__ = errno; \
-            fprintf(stderr, "TEST_ERROR: failed on line %d with return " \
+            fprintf(stderr, "TEST_ERROR: failed on %s:%d with return " \
 		    "code %d (errno: %d): got nonzero from %s\n", \
-		    __LINE__, __my_ret__, __my_errno__, #x); \
+		    __FILE__, __LINE__, __my_ret__, __my_errno__, #x); \
             return __my_ret__; \
         } \
     } while (0);
@@ -38,9 +41,9 @@
         void* __my_ret__ = x; \
         int __my_errno__ = errno; \
         if (__my_ret__ != NULL) { \
-            fprintf(stderr, "TEST_ERROR: failed on line %d (errno: %d): " \
+            fprintf(stderr, "TEST_ERROR: failed on %s:%d (errno: %d): " \
 		    "got non-NULL value %p from %s\n", \
-		    __LINE__, __my_errno__, __my_ret__, #x); \
+		    __FILE__, __LINE__, __my_errno__, __my_ret__, #x); \
             return -1; \
         } \
     } while (0);
@@ -50,8 +53,8 @@
         void* __my_ret__ = x; \
         int __my_errno__ = errno; \
         if (__my_ret__ == NULL) { \
-            fprintf(stderr, "TEST_ERROR: failed on line %d (errno: %d): " \
-		    "got NULL from %s\n", __LINE__, __my_errno__, #x); \
+            fprintf(stderr, "TEST_ERROR: failed on %s:%d (errno: %d): " \
+		    "got NULL from %s\n", __FILE__, __LINE__, __my_errno__, #x); \
             return -1; \
         } \
     } while (0);
@@ -61,15 +64,16 @@
         int __my_ret__ = x; \
         int __my_errno__ = errno; \
         if (__my_ret__ != -1) { \
-            fprintf(stderr, "TEST_ERROR: failed on line %d with return " \
-                "code %d (errno: %d): expected -1 from %s\n", __LINE__, \
+            fprintf(stderr, "TEST_ERROR: failed on %s:%d with return " \
+                "code %d (errno: %d): expected -1 from %s\n", \
+                    __FILE__, __LINE__, \
                 __my_ret__, __my_errno__, #x); \
             return -1; \
         } \
         if (__my_errno__ != e) { \
-            fprintf(stderr, "TEST_ERROR: failed on line %d with return " \
+            fprintf(stderr, "TEST_ERROR: failed on %s:%d with return " \
                 "code %d (errno: %d): expected errno = %d from %s\n", \
-                __LINE__, __my_ret__, __my_errno__, e, #x); \
+                __FILE__, __LINE__, __my_ret__, __my_errno__, e, #x); \
             return -1; \
 	} \
     } while (0);
@@ -79,9 +83,9 @@
         int __my_ret__ = x; \
         int __my_errno__ = errno; \
         if (!__my_ret__) { \
-            fprintf(stderr, "TEST_ERROR: failed on line %d with return " \
-		    "code %d (errno: %d): got zero from %s\n", __LINE__, \
-                __my_ret__, __my_errno__, #x); \
+            fprintf(stderr, "TEST_ERROR: failed on %s:%d with return " \
+              "code %d (errno: %d): got zero from %s\n", __FILE__, __LINE__, \
+              __my_ret__, __my_errno__, #x); \
             return -1; \
         } \
     } while (0);
@@ -91,9 +95,9 @@
         int __my_ret__ = x; \
         int __my_errno__ = errno; \
         if (__my_ret__ < 0) { \
-            fprintf(stderr, "TEST_ERROR: failed on line %d with return " \
+            fprintf(stderr, "TEST_ERROR: failed on %s:%d with return " \
                 "code %d (errno: %d): got negative return from %s\n", \
-		    __LINE__, __my_ret__, __my_errno__, #x); \
+                __FILE__, __LINE__, __my_ret__, __my_errno__, #x); \
             return __my_ret__; \
         } \
     } while (0);
@@ -103,9 +107,21 @@
         int __my_ret__ = y; \
         int __my_errno__ = errno; \
         if (__my_ret__ != (x)) { \
-            fprintf(stderr, "TEST_ERROR: failed on line %d with return " \
+            fprintf(stderr, "TEST_ERROR: failed on %s:%d with return " \
               "code %d (errno: %d): expected %d\n", \
-               __LINE__, __my_ret__, __my_errno__, (x)); \
+               __FILE__, __LINE__, __my_ret__, __my_errno__, (x)); \
+            return -1; \
+        } \
+    } while (0);
+
+#define EXPECT_INT64_EQ(x, y) /* returns -1 from the caller unless y == x */ \
+    do { \
+        int64_t __my_ret__ = y; \
+        int __my_errno__ = errno; \
+        if (__my_ret__ != (x)) { /* x is cast below: PRId64 needs int64_t */ \
+            fprintf(stderr, "TEST_ERROR: failed on %s:%d with return " \
+              "value %"PRId64" (errno: %d): expected %"PRId64"\n", \
+               __FILE__, __LINE__, __my_ret__, __my_errno__, (int64_t)(x)); \
             return -1; \
         } \
     } while (0);
@@ -117,4 +133,17 @@
     ret = -errno; \
     } while (ret == -EINTR);
 
+/**
+ * Test that an HDFS file has the given statistics.
+ *
+ * Any parameter can be set to UINT64_MAX to avoid checking it.
+ *
+ * @return 0 on success; error code otherwise
+ */
+int expectFileStats(struct hdfsFile_internal *file,
+      uint64_t expectedTotalBytesRead,
+      uint64_t expectedTotalLocalBytesRead,
+      uint64_t expectedTotalShortCircuitBytesRead,
+      uint64_t expectedTotalZeroCopyBytesRead);
+
 #endif

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.c Wed Oct 30 22:21:59 2013
@@ -39,6 +39,7 @@
 #define JAVA_NET_ISA    "java/net/InetSocketAddress"
 #define JAVA_NET_URI    "java/net/URI"
 #define JAVA_STRING     "java/lang/String"
+#define READ_OPTION     "org/apache/hadoop/fs/ReadOption"
 
 #define JAVA_VOID       "V"
 
@@ -143,6 +144,15 @@ int hdfsFileGetReadStatistics(hdfsFile f
         goto done;
     }
     s->totalShortCircuitBytesRead = jVal.j;
+    jthr = invokeMethod(env, &jVal, INSTANCE, readStats,
+                  "org/apache/hadoop/hdfs/DFSInputStream$ReadStatistics",
+                  "getTotalZeroCopyBytesRead", "()J");
+    if (jthr) {
+        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+            "hdfsFileGetReadStatistics: getTotalZeroCopyBytesRead failed");
+        goto done;
+    }
+    s->totalZeroCopyBytesRead = jVal.j;
     *stats = s;
     s = NULL;
     ret = 0;
@@ -183,6 +193,25 @@ void hdfsFileDisableDirectRead(hdfsFile 
     file->flags &= ~HDFS_FILE_SUPPORTS_DIRECT_READ;
 }
 
+int hdfsDisableDomainSocketSecurity(void)
+{
+    JNIEnv *jni = getJNIEnv();
+    jthrowable exc;
+    if (!jni) {
+        errno = EINTERNAL;  // no JNI environment could be obtained
+        return -1;
+    }
+    exc = invokeMethod(jni, NULL, STATIC, NULL,
+            "org/apache/hadoop/net/unix/DomainSocket",
+            "disableBindPathValidation", "()V");
+    if (!exc) {  // the static Java call completed without throwing
+        return 0;
+    }
+    errno = printExceptionAndFree(jni, exc, PRINT_EXC_ALL,
+            "DomainSocket#disableBindPathValidation");
+    return -1;  // errno now holds the value printExceptionAndFree returned
+}
+
 /**
  * hdfsJniEnv: A wrapper struct to be used as 'value'
  * while saving thread -> JNIEnv* mappings
@@ -220,40 +249,6 @@ static jthrowable constructNewObjectOfPa
     return NULL;
 }
 
-/**
- * Set a configuration value.
- *
- * @param env               The JNI environment
- * @param jConfiguration    The configuration object to modify
- * @param key               The key to modify
- * @param value             The value to set the key to
- *
- * @return                  NULL on success; exception otherwise
- */
-static jthrowable hadoopConfSetStr(JNIEnv *env, jobject jConfiguration,
-        const char *key, const char *value)
-{
-    jthrowable jthr;
-    jstring jkey = NULL, jvalue = NULL;
-
-    jthr = newJavaStr(env, key, &jkey);
-    if (jthr)
-        goto done;
-    jthr = newJavaStr(env, value, &jvalue);
-    if (jthr)
-        goto done;
-    jthr = invokeMethod(env, NULL, INSTANCE, jConfiguration,
-            HADOOP_CONF, "set", JMETHOD2(JPARAM(JAVA_STRING),
-                                         JPARAM(JAVA_STRING), JAVA_VOID),
-            jkey, jvalue);
-    if (jthr)
-        goto done;
-done:
-    destroyLocalReference(env, jkey);
-    destroyLocalReference(env, jvalue);
-    return jthr;
-}
-
 static jthrowable hadoopConfGetStr(JNIEnv *env, jobject jConfiguration,
         const char *key, char **val)
 {
@@ -2108,6 +2103,395 @@ int hdfsUtime(hdfsFS fs, const char* pat
     return 0;
 }
 
+/**
+ * Zero-copy options.
+ *
+ * We cache the EnumSet of ReadOptions which has to be passed into every
+ * readZero call, to avoid reconstructing it each time.  This cache is cleared
+ * whenever an element changes.
+ */
+struct hadoopRzOptions
+{
+    JNIEnv *env;             // NOTE(review): never assigned in this file - confirm
+    int skipChecksums;       // 0 or 1; set by hadoopRzOptionsSetSkipChecksum
+    jobject byteBufferPool;  // pool object passed to read(); NULL if none set
+    jobject cachedEnumSet;   // global ref to the cached EnumSet; NULL if unset
+};
+
+struct hadoopRzOptions *hadoopRzOptionsAlloc(void)
+{
+    struct hadoopRzOptions *fresh;
+
+    // Nothing is kept from this call; it only verifies that a JNI
+    // environment can be obtained before handing out an options struct.
+    if (!getJNIEnv()) {
+        errno = EINTERNAL;
+        return NULL;
+    }
+    // Zero-filled: checksums are not skipped, no pool, nothing cached.
+    fresh = calloc(1, sizeof(*fresh));
+    if (!fresh) {
+        errno = ENOMEM;
+        return NULL;
+    }
+    return fresh;
+}
+
+static void hadoopRzOptionsClearCached(JNIEnv *env,
+        struct hadoopRzOptions *opts)
+{
+    // Drop the cached EnumSet (if any) so that it is rebuilt on next use.
+    if (opts->cachedEnumSet) {
+        (*env)->DeleteGlobalRef(env, opts->cachedEnumSet);
+        opts->cachedEnumSet = NULL;
+    }
+}
+
+int hadoopRzOptionsSetSkipChecksum(
+        struct hadoopRzOptions *opts, int skip)
+{
+    JNIEnv *jni = getJNIEnv();
+    if (!jni) {
+        errno = EINTERNAL;
+        return -1;
+    }
+    // The cached EnumSet was built from the old setting; discard it first.
+    hadoopRzOptionsClearCached(jni, opts);
+    opts->skipChecksums = (skip != 0);
+    return 0;
+}
+
+int hadoopRzOptionsSetByteBufferPool(
+        struct hadoopRzOptions *opts, const char *className)
+{
+    JNIEnv *env = getJNIEnv();
+    jthrowable jthr;
+    jobject localPool = NULL, globalPool = NULL;
+    if (!env) {
+        errno = EINTERNAL;
+        return -1;
+    }
+    if (className) {  // a NULL className leaves opts without any pool
+        jthr = constructNewObjectOfClass(env, &localPool, className, "()V");
+        if (jthr) {
+            printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                "hadoopRzOptionsSetByteBufferPool(className=%s): ", className);
+            errno = EINVAL;
+            return -1;
+        }
+        globalPool = (*env)->NewGlobalRef(env, localPool);
+        (*env)->DeleteLocalRef(env, localPool);  // opts keeps the global ref
+        if (!globalPool) {  // NewGlobalRef of a live object failed
+            errno = ENOMEM;
+            return -1;
+        }
+    }
+    if (opts->byteBufferPool) {
+        (*env)->DeleteGlobalRef(env, opts->byteBufferPool);
+    }
+    opts->byteBufferPool = globalPool;  // the cached EnumSet is unaffected
+    return 0;
+}
+
+void hadoopRzOptionsFree(struct hadoopRzOptions *opts)
+{
+    JNIEnv *jni = getJNIEnv();
+    if (!jni) {
+        return;  // without a JNI environment nothing is released
+    }
+    // Release both global references held by opts, then the struct itself.
+    hadoopRzOptionsClearCached(jni, opts);
+    if (opts->byteBufferPool) {
+        (*jni)->DeleteGlobalRef(jni, opts->byteBufferPool);
+        opts->byteBufferPool = NULL;
+    }
+    free(opts);
+}
+
+struct hadoopRzBuffer
+{
+    jobject byteBuffer;  // global ref to the Java ByteBuffer; NULL if none
+    uint8_t *ptr;        // start of the readable bytes; NULL if no ByteBuffer
+    int32_t length;      // value of ByteBuffer#remaining() when extracted
+    int direct;          // 1 when ptr points into a direct buffer, else 0
+};
+
+// Produce the EnumSet of ReadOptions for opts, caching it as a global ref.
+static jthrowable hadoopRzOptionsGetEnumSet(JNIEnv *env,
+        struct hadoopRzOptions *opts, jobject *enumSet)
+{
+    jthrowable jthr = NULL;
+    jobject enumInst = NULL, enumSetObj = NULL;
+    jclass clazz = NULL;
+    jvalue jVal;
+
+    if (opts->cachedEnumSet) {
+        // If we cached the value, return it now.
+        *enumSet = opts->cachedEnumSet;
+        goto done;
+    }
+    if (opts->skipChecksums) {
+        jthr = fetchEnumInstance(env, READ_OPTION, "SKIP_CHECKSUMS", &enumInst);
+        if (jthr) {
+            goto done;
+        }
+        jthr = invokeMethod(env, &jVal, STATIC, NULL,
+                "java/util/EnumSet", "of",
+                "(Ljava/lang/Enum;)Ljava/util/EnumSet;", enumInst);
+    } else {
+        clazz = (*env)->FindClass(env, READ_OPTION);
+        if (!clazz) {
+            jthr = newRuntimeError(env, "failed "
+                    "to find class for %s", READ_OPTION);
+            goto done;
+        }
+        jthr = invokeMethod(env, &jVal, STATIC, NULL,
+                "java/util/EnumSet", "noneOf",
+                "(Ljava/lang/Class;)Ljava/util/EnumSet;", clazz);
+    }
+    if (jthr) {  // checked for both branches before jVal is consumed
+        goto done;
+    }
+    enumSetObj = jVal.l;
+    // create global ref
+    opts->cachedEnumSet = (*env)->NewGlobalRef(env, enumSetObj);
+    if (!opts->cachedEnumSet) {
+        jthr = getPendingExceptionAndClear(env);
+        goto done;
+    }
+    *enumSet = opts->cachedEnumSet;
+done:
+    (*env)->DeleteLocalRef(env, clazz);
+    (*env)->DeleteLocalRef(env, enumInst);
+    (*env)->DeleteLocalRef(env, enumSetObj);
+    return jthr;
+}
+
+// Fill in buffer->length, ptr and direct from buffer->byteBuffer; 0 or errno.
+static int hadoopReadZeroExtractBuffer(JNIEnv *env,
+        const struct hadoopRzOptions *opts, struct hadoopRzBuffer *buffer)
+{
+    int ret;
+    jthrowable jthr;
+    jvalue jVal;
+    uint8_t *directStart;
+    void *mallocBuf = NULL;
+    jint position;
+    jarray array = NULL;
+
+    jthr = invokeMethod(env, &jVal, INSTANCE, buffer->byteBuffer,
+                     "java/nio/ByteBuffer", "remaining", "()I");
+    if (jthr) {
+        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                "hadoopReadZeroExtractBuffer: ByteBuffer#remaining failed: ");
+        goto done;
+    }
+    buffer->length = jVal.i;
+    jthr = invokeMethod(env, &jVal, INSTANCE, buffer->byteBuffer,
+                     "java/nio/ByteBuffer", "position", "()I");
+    if (jthr) {
+        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                "hadoopReadZeroExtractBuffer: ByteBuffer#position failed: ");
+        goto done;
+    }
+    position = jVal.i;
+    directStart = (*env)->GetDirectBufferAddress(env, buffer->byteBuffer);
+    if (directStart) {
+        // Handle direct buffers.
+        buffer->ptr = directStart + position;
+        buffer->direct = 1;
+        ret = 0;
+        goto done;
+    }
+    // Handle indirect buffers.  The JNI docs do not clearly say whether
+    // GetDirectBufferAddress can raise an exception when it fails, so clear
+    // any pending exception here to prevent problems on various JVMs.
+    (*env)->ExceptionClear(env);
+    if (!opts->byteBufferPool) {
+        fputs("hadoopReadZeroExtractBuffer: we read through the "
+                "zero-copy path, but failed to get the address of the buffer via "
+                "GetDirectBufferAddress.  Please make sure your JVM supports "
+                "GetDirectBufferAddress.\n", stderr);
+        ret = ENOTSUP;
+        goto done;
+    }
+    // Get the backing array object of this buffer.
+    jthr = invokeMethod(env, &jVal, INSTANCE, buffer->byteBuffer,
+                     "java/nio/ByteBuffer", "array", "()[B");
+    if (jthr) {
+        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                "hadoopReadZeroExtractBuffer: ByteBuffer#array failed: ");
+        goto done;
+    }
+    array = jVal.l;
+    if (!array) {
+        fputs("hadoopReadZeroExtractBuffer: ByteBuffer#array returned NULL.\n",
+              stderr);
+        ret = EIO;
+        goto done;
+    }
+    mallocBuf = malloc(buffer->length);
+    if (!mallocBuf) {
+        fprintf(stderr, "hadoopReadZeroExtractBuffer: failed to allocate %d bytes of memory\n",
+                buffer->length);
+        ret = ENOMEM;
+        goto done;
+    }
+    (*env)->GetByteArrayRegion(env, array, position, buffer->length, mallocBuf);
+    jthr = getPendingExceptionAndClear(env);  // must not stay pending below
+    if (jthr) {
+        ret = printExceptionAndFree(env, jthr, PRINT_EXC_ALL,
+                "hadoopReadZeroExtractBuffer: GetByteArrayRegion failed: ");
+        goto done;
+    }
+    buffer->ptr = mallocBuf;
+    mallocBuf = NULL;  // buffer owns the copy now; "done" must not free it
+    buffer->direct = 0;
+    ret = 0;
+
+done:
+    free(mallocBuf);
+    (*env)->DeleteLocalRef(env, array);
+    return ret;
+}
+
+// Turn an exception thrown by the zero-copy read() into an errno value.
+static int translateZCRException(JNIEnv *env, jthrowable exc)
+{
+    int ret;
+    char *className = NULL;
+    jthrowable jthr = classNameOfObject(exc, env, &className);
+
+    if (jthr) {
+        fputs("hadoopReadZero: failed to get class name of "
+                "exception from read().\n", stderr);
+        destroyLocalReference(env, exc);
+        destroyLocalReference(env, jthr);
+        ret = EIO;
+    } else if (!strcmp(className, "java.lang.UnsupportedOperationException")) {
+        destroyLocalReference(env, exc);  // the caller hands exc over to us
+        ret = EPROTONOSUPPORT;
+    } else {
+        // jthr is NULL on this path; the exception to report is exc.
+        ret = printExceptionAndFree(env, exc, PRINT_EXC_ALL,
+                "hadoopZeroCopyRead: ZeroCopyCursor#read failed");
+    }
+    free(className);
+    return ret;
+}
+
+struct hadoopRzBuffer* hadoopReadZero(hdfsFile file,
+            struct hadoopRzOptions *opts, int32_t maxLength)
+{
+    JNIEnv *env = getJNIEnv();
+    jthrowable exc = NULL;
+    jvalue val;
+    jobject readOpts = NULL, javaBuf = NULL;
+    struct hadoopRzBuffer *rzBuf = NULL;
+    int err;
+
+    if (!env) {
+        errno = EINTERNAL;
+        return NULL;
+    }
+    if (file->type != INPUT) {
+        fputs("Cannot read from a non-InputStream object!\n", stderr);
+        err = EINVAL;
+        goto done;
+    }
+    rzBuf = calloc(1, sizeof(*rzBuf));
+    if (!rzBuf) {
+        err = ENOMEM;
+        goto done;
+    }
+    exc = hadoopRzOptionsGetEnumSet(env, opts, &readOpts);
+    if (exc) {
+        err = printExceptionAndFree(env, exc, PRINT_EXC_ALL,
+                "hadoopReadZero: hadoopRzOptionsGetEnumSet failed: ");
+        goto done;
+    }
+    exc = invokeMethod(env, &val, INSTANCE, file->file, HADOOP_ISTRM, "read",
+        "(Lorg/apache/hadoop/io/ByteBufferPool;ILjava/util/EnumSet;)"
+        "Ljava/nio/ByteBuffer;", opts->byteBufferPool, maxLength, readOpts);
+    if (exc) {
+        err = translateZCRException(env, exc);
+        goto done;
+    }
+    javaBuf = val.l;
+    if (javaBuf) {
+        rzBuf->byteBuffer = (*env)->NewGlobalRef(env, javaBuf);
+        if (!rzBuf->byteBuffer) {
+            err = printPendingExceptionAndFree(env, PRINT_EXC_ALL,
+                "hadoopReadZero: failed to create global ref to ByteBuffer");
+            goto done;
+        }
+        err = hadoopReadZeroExtractBuffer(env, opts, rzBuf);
+        if (err) {
+            goto done;
+        }
+    } else {
+        // read() returned null: hand back an empty buffer.
+        rzBuf->byteBuffer = NULL;
+        rzBuf->length = 0;
+        rzBuf->ptr = NULL;
+    }
+    err = 0;
+done:
+    (*env)->DeleteLocalRef(env, javaBuf);
+    if (!err) {
+        errno = 0;
+        return rzBuf;
+    }
+    // Failure: release whatever was acquired and report through errno.
+    if (rzBuf) {
+        if (rzBuf->byteBuffer) {
+            (*env)->DeleteGlobalRef(env, rzBuf->byteBuffer);
+        }
+        free(rzBuf);
+    }
+    errno = err;
+    return NULL;
+}
+
+int32_t hadoopRzBufferLength(const struct hadoopRzBuffer *buffer)
+{
+    return buffer->length;  // 0 when hadoopReadZero got no ByteBuffer back
+}
+
+const void *hadoopRzBufferGet(const struct hadoopRzBuffer *buffer)
+{
+    return buffer->ptr;  // NULL when hadoopReadZero got no ByteBuffer back
+}
+
+void hadoopRzBufferFree(hdfsFile file, struct hadoopRzBuffer *buffer)
+{
+    JNIEnv *jni = getJNIEnv();
+    jthrowable exc;
+    jvalue unused;
+
+    if (!jni) {
+        errno = EINTERNAL;
+        return;
+    }
+    if (buffer->byteBuffer) {
+        // Give the ByteBuffer back to the stream it was read from.
+        exc = invokeMethod(jni, &unused, INSTANCE, file->file,
+                    HADOOP_ISTRM, "releaseBuffer",
+                    "(Ljava/nio/ByteBuffer;)V", buffer->byteBuffer);
+        if (exc) {
+            printExceptionAndFree(jni, exc, PRINT_EXC_ALL,
+                    "hadoopRzBufferFree: releaseBuffer failed: ");
+        }
+        // The global reference is dropped whether or not the release worked.
+        (*jni)->DeleteGlobalRef(jni, buffer->byteBuffer);
+    }
+    if (buffer->direct == 0) {
+        free(buffer->ptr);  // heap copy of a non-direct buffer, or NULL
+    }
+    memset(buffer, 0, sizeof(*buffer));
+    free(buffer);
+}
+
 char***
 hdfsGetHosts(hdfsFS fs, const char* path, tOffset start, tOffset length)
 {

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs.h Wed Oct 30 22:21:59 2013
@@ -36,6 +36,8 @@
 #define EINTERNAL 255 
 #endif
 
+#define ELASTIC_BYTE_BUFFER_POOL_CLASS \
+  "org/apache/hadoop/io/ElasticByteBufferPool"
 
 /** All APIs set errno to meaningful values */
 
@@ -65,6 +67,10 @@ extern  "C" {
     struct hdfsFile_internal;
     typedef struct hdfsFile_internal* hdfsFile;
 
+    struct hadoopRzOptions;
+
+    struct hadoopRzBuffer;
+
     /**
      * Determine if a file is open for read.
      *
@@ -85,6 +91,7 @@ extern  "C" {
       uint64_t totalBytesRead;
       uint64_t totalLocalBytesRead;
       uint64_t totalShortCircuitBytesRead;
+      uint64_t totalZeroCopyBytesRead;
     };
 
     /**
@@ -680,7 +687,107 @@ extern  "C" {
      * @return 0 on success else -1
      */
     int hdfsUtime(hdfsFS fs, const char* path, tTime mtime, tTime atime);
-    
+
+    /**
+     * Allocate a zero-copy options structure.
+     *
+     * You must free all options structures allocated with this function using
+     * hadoopRzOptionsFree.
+     *
+     * @return            A zero-copy options structure, or NULL if one could
+     *                    not be allocated.  If NULL is returned, errno will
+     *                    contain the error number.
+     */
+    struct hadoopRzOptions *hadoopRzOptionsAlloc(void);
+
+    /**
+     * Determine whether we should skip checksums in hadoopReadZero.
+     *
+     * @param opts        The options structure.
+     * @param skip        Nonzero to skip checksums sometimes; zero to always
+     *                    check them.
+     *
+     * @return            0 on success; -1 plus errno on failure.
+     */
+    int hadoopRzOptionsSetSkipChecksum(
+            struct hadoopRzOptions *opts, int skip);
+
+    /**
+     * Set the ByteBufferPool to use with hadoopReadZero.
+     *
+     * @param opts        The options structure.
+     * @param className   If this is NULL, we will not use any
+     *                    ByteBufferPool.  If this is non-NULL, it will be
+     *                    treated as the name of the pool class to use.
+     *                    For example, you can use
+     *                    ELASTIC_BYTE_BUFFER_POOL_CLASS.
+     *
+     * @return            0 if the ByteBufferPool class was found and
+     *                    instantiated;
+     *                    -1 plus errno otherwise.
+     */
+    int hadoopRzOptionsSetByteBufferPool(
+            struct hadoopRzOptions *opts, const char *className);
+
+    /**
+     * Free a hadoopRzOptions structure.
+     *
+     * @param opts        The options structure to free.
+     *                    Any associated ByteBufferPool will also be freed.
+     */
+    void hadoopRzOptionsFree(struct hadoopRzOptions *opts);
+
+    /**
+     * Perform a byte buffer read.
+     * If possible, this will be a zero-copy (mmap) read.
+     *
+     * @param file       The file to read from.
+     * @param opts       An options structure created by hadoopRzOptionsAlloc.
+     * @param maxLength  The maximum length to read.  We may read fewer bytes
+     *                   than this length.
+     *
+     * @return           On success, returns a new hadoopRzBuffer.
+     *                   This buffer will continue to be valid and readable
+     *                   until it is released by hadoopRzBufferFree.  Failure to
+     *                   release a buffer will lead to a memory leak.
+     *
+     *                   NULL plus an errno code on an error.
+     *                   errno = EPROTONOSUPPORT indicates that we could not
+     *                   do a zero-copy read, and there was no ByteBufferPool
+     *                   supplied.
+     */
+    struct hadoopRzBuffer* hadoopReadZero(hdfsFile file,
+            struct hadoopRzOptions *opts, int32_t maxLength);
+
+    /**
+     * Determine the length of the buffer returned from readZero.
+     *
+     * @param buffer     a buffer returned from readZero.
+     * @return           the length of the buffer.
+     */
+    int32_t hadoopRzBufferLength(const struct hadoopRzBuffer *buffer);
+
+    /**
+     * Get a pointer to the raw buffer returned from readZero.
+     *
+     * To find out how many bytes this buffer contains, call
+     * hadoopRzBufferLength.
+     *
+     * @param buffer     a buffer returned from readZero.
+     * @return           a pointer to the start of the buffer.  This will be
+     *                   NULL when end-of-file has been reached.
+     */
+    const void *hadoopRzBufferGet(const struct hadoopRzBuffer *buffer);
+
+    /**
+     * Release a buffer obtained through readZero.
+     *
+     * @param file       The hdfs stream that created this buffer.  This must be
+     *                   the same stream you called hadoopReadZero on.
+     * @param buffer     The buffer to release.
+     */
+    void hadoopRzBufferFree(hdfsFile file, struct hadoopRzBuffer *buffer);
+
 #ifdef __cplusplus
 }
 #endif

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs_test.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs_test.h?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs_test.h (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/hdfs_test.h Wed Oct 30 22:21:59 2013
@@ -48,6 +48,15 @@ extern  "C" {
      * @param file     The HDFS file
      */
     void hdfsFileDisableDirectRead(struct hdfsFile_internal *file);
+
+    /**
+     * Disable domain socket security checks.
+     *
+     * @return         0 if domain socket security was disabled;
+     *                 -1 if not.
+     */
+    int hdfsDisableDomainSocketSecurity(void); 
+
 #ifdef __cplusplus
 }
 #endif

Modified: hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.c
URL: http://svn.apache.org/viewvc/hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.c?rev=1537330&r1=1537329&r2=1537330&view=diff
==============================================================================
--- hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.c (original)
+++ hadoop/common/branches/YARN-321/hadoop-hdfs-project/hadoop-hdfs/src/main/native/libhdfs/jni_helper.c Wed Oct 30 22:21:59 2013
@@ -608,3 +608,73 @@ JNIEnv* getJNIEnv(void)
     return env;
 }
 
+// 1 if obj is an instance of the named class, 0 if not, -1 if lookup fails.
+int javaObjectIsOfClass(JNIEnv *env, jobject obj, const char *name)
+{
+    jboolean matches;
+    jclass cls = (*env)->FindClass(env, name);
+
+    if (!cls) {
+        printPendingExceptionAndFree(env, PRINT_EXC_ALL,
+            "javaObjectIsOfClass(%s)", name);
+        return -1;
+    }
+    matches = (*env)->IsInstanceOf(env, obj, cls);
+    (*env)->DeleteLocalRef(env, cls);
+    return (matches == JNI_TRUE) ? 1 : 0;
+}
+
+// Set key to value on a Java Configuration object by calling its
+// set(String, String) method.  Returns NULL on success; exception otherwise.
+jthrowable hadoopConfSetStr(JNIEnv *env, jobject jConfiguration,
+        const char *key, const char *value)
+{
+    jstring jKeyStr = NULL, jValueStr = NULL;
+    jthrowable exc = newJavaStr(env, key, &jKeyStr);
+
+    // Each later step runs only if everything before it succeeded.
+    if (!exc) {
+        exc = newJavaStr(env, value, &jValueStr);
+    }
+    if (!exc) {
+        exc = invokeMethod(env, NULL, INSTANCE, jConfiguration,
+                "org/apache/hadoop/conf/Configuration", "set",
+                "(Ljava/lang/String;Ljava/lang/String;)V",
+                jKeyStr, jValueStr);
+    }
+    // Both string references are released on every path.
+    (*env)->DeleteLocalRef(env, jKeyStr);
+    (*env)->DeleteLocalRef(env, jValueStr);
+    return exc;
+}
+
+jthrowable fetchEnumInstance(JNIEnv *env, const char *className,
+                         const char *valueName, jobject *out)
+{
+    jthrowable jthr = NULL;
+    jfieldID fieldId;
+    jobject jEnum;
+    char prettyClass[256];  // JNI type signature of the enum: "L<className>;"
+    jclass clazz = (*env)->FindClass(env, className);
+    if (!clazz) {
+        return newRuntimeError(env, "fetchEnum(%s, %s): failed to find class.",
+                className, valueName);
+    }
+    if (snprintf(prettyClass, sizeof(prettyClass), "L%s;", className)
+          >= sizeof(prettyClass)) {
+        jthr = newRuntimeError(env, "fetchEnum(%s, %s): class name too long.",
+                className, valueName);
+        goto done;
+    }
+    fieldId = (*env)->GetStaticFieldID(env, clazz, valueName, prettyClass);
+    jEnum = fieldId ? (*env)->GetStaticObjectField(env, clazz, fieldId) : NULL;
+    if (!jEnum) {
+        jthr = getPendingExceptionAndClear(env);
+        goto done;
+    }
+    *out = jEnum;  // local reference; the caller is responsible for it
+done:
+    (*env)->DeleteLocalRef(env, clazz);  // the class ref is no longer needed
+    return jthr;
+}
+