Posted to common-commits@hadoop.apache.org by su...@apache.org on 2011/09/02 23:41:49 UTC

svn commit: r1164720 - in /hadoop/common/branches/branch-0.20-security: ./ src/hdfs/org/apache/hadoop/hdfs/ src/hdfs/org/apache/hadoop/hdfs/protocol/ src/hdfs/org/apache/hadoop/hdfs/server/namenode/ src/test/org/apache/hadoop/hdfs/

Author: suresh
Date: Fri Sep  2 21:41:48 2011
New Revision: 1164720

URL: http://svn.apache.org/viewvc?rev=1164720&view=rev
Log:
Porting from 0.20-append branch - HDFS-630. Client can exclude specific nodes in the write pipeline. Contributed by Nicolas Spiegelberg.
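
In short, HDFS-630 lets a client that fails to build a write pipeline tell the NameNode which datanodes to leave out of the next block allocation, rather than risk being handed the same bad node again. A minimal sketch of the client-side loop follows; only addBlock and abandonBlock are real ClientProtocol calls, while createPipeline and badNodeOf are hypothetical stand-ins for DFSClient internals:

    // Hedged sketch of the retry-with-exclusion idea; createPipeline and
    // badNodeOf are assumed, and a real client also caps its retries.
    List<DatanodeInfo> excluded = new ArrayList<DatanodeInfo>();
    while (true) {
      DatanodeInfo[] hints =
          excluded.isEmpty() ? null : excluded.toArray(new DatanodeInfo[0]);
      LocatedBlock lb = namenode.addBlock(src, clientName, hints);
      if (createPipeline(lb.getLocations())) {
        break;                                   // pipeline is up, start writing
      }
      namenode.abandonBlock(lb.getBlock(), src, clientName);
      excluded.add(badNodeOf(lb.getLocations())); // skip this node next time
    }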


Modified:
    hadoop/common/branches/branch-0.20-security/CHANGES.txt
    hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
    hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
    hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java

Modified: hadoop/common/branches/branch-0.20-security/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/CHANGES.txt?rev=1164720&r1=1164719&r2=1164720&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security/CHANGES.txt Fri Sep  2 21:41:48 2011
@@ -15,7 +15,9 @@ Release 0.20.205.0 - unreleased
     HDFS-142. Blocks that are being written by a client are stored in the
     blocksBeingWritten directory. 
     (Dhruba Borthakur, Nicolas Spiegelberg, Todd Lipcon via dhruba)
-    
+
+    HDFS-630. Client can exclude specific nodes in the write pipeline.
+    (Nicolas Spiegelberg via dhruba)
 
   BUG FIXES
 

Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java?rev=1164720&r1=1164719&r2=1164720&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/DFSClient.java Fri Sep  2 21:41:48 2011
@@ -88,7 +88,13 @@ public class DFSClient implements FSCons
   final int writePacketSize;
   private final FileSystem.Statistics stats;
   private int maxBlockAcquireFailures;
-    
+
+  /**
+   * We assume we're talking to another CDH server, which supports
+   * HDFS-630's addBlock method. If we get a RemoteException indicating
+   * it doesn't, we'll set this to false and stop trying.
+   */
+  private volatile boolean serverSupportsHdfs630 = true;
  
   public static ClientProtocol createNamenode(Configuration conf) throws IOException {
     return createNamenode(NameNode.getAddress(conf), conf);
@@ -2330,6 +2336,7 @@ public class DFSClient implements FSCons
     private int packetSize = 0; // write packet size, including the header.
     private int chunksPerPacket = 0;
     private DatanodeInfo[] nodes = null; // list of targets for current block
+    private ArrayList<DatanodeInfo> excludedNodes = new ArrayList<DatanodeInfo>();
     private volatile boolean hasError = false;
     private volatile int errorIndex = 0;
     private volatile IOException lastException = null;
@@ -3030,7 +3037,9 @@ public class DFSClient implements FSCons
         success = false;
                 
         long startTime = System.currentTimeMillis();
-        lb = locateFollowingBlock(startTime);
+
+        DatanodeInfo[] excluded = excludedNodes.toArray(new DatanodeInfo[0]);
+        lb = locateFollowingBlock(startTime, excluded.length > 0 ? excluded : null);
         block = lb.getBlock();
         accessToken = lb.getBlockToken();
         nodes = lb.getLocations();
@@ -3044,6 +3053,11 @@ public class DFSClient implements FSCons
           LOG.info("Abandoning block " + block);
           namenode.abandonBlock(block, src, clientName);
 
+          if (errorIndex < nodes.length) {
+            LOG.debug("Excluding datanode " + nodes[errorIndex]);
+            excludedNodes.add(nodes[errorIndex]);
+          }
+
           // Connection failed.  Let's wait a little bit and retry
           retry = true;
           try {
@@ -3158,7 +3172,8 @@ public class DFSClient implements FSCons
       return result;
     }
   
-    private LocatedBlock locateFollowingBlock(long start
+    private LocatedBlock locateFollowingBlock(long start,
+                                              DatanodeInfo[] excludedNodes
                                               ) throws IOException {     
       int retries = conf.getInt("dfs.client.block.write.locateFollowingBlock.retries", 5);
       long sleeptime = 400;
@@ -3166,7 +3181,11 @@ public class DFSClient implements FSCons
         long localstart = System.currentTimeMillis();
         while (true) {
           try {
-            return namenode.addBlock(src, clientName);
+            if (serverSupportsHdfs630) {
+              return namenode.addBlock(src, clientName, excludedNodes);
+            } else {
+              return namenode.addBlock(src, clientName);
+            }
           } catch (RemoteException e) {
             IOException ue = 
               e.unwrapRemoteException(FileNotFoundException.class,
@@ -3176,7 +3195,18 @@ public class DFSClient implements FSCons
             if (ue != e) { 
               throw ue; // no need to retry these exceptions
             }
-            
+
+            if (e.getMessage().startsWith(
+                  "java.io.IOException: java.lang.NoSuchMethodException: " +
+                  "org.apache.hadoop.hdfs.protocol.ClientProtocol.addBlock(" +
+                  "java.lang.String, java.lang.String, " +
+                  "[Lorg.apache.hadoop.hdfs.protocol.DatanodeInfo;)")) {
+              // We're talking to a server that doesn't implement HDFS-630.
+              // Mark that and retry with the two-argument method
+              serverSupportsHdfs630 = false;
+              continue;
+            }
+
             if (NotReplicatedYetException.class.getName().
                 equals(e.getClassName())) {
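
The fallback above is the interesting part: ClientProtocol offers no per-method capability negotiation, so the client optimistically calls the three-argument addBlock and, when the server's RPC layer answers with a wrapped NoSuchMethodException, latches serverSupportsHdfs630 off for the life of the client. Condensed to its essentials (every name here except RemoteException, LocatedBlock, DatanodeInfo, and ClientProtocol is an assumption):

    private volatile boolean serverSupportsNewRpc = true;  // latched once

    LocatedBlock addBlockCompat(ClientProtocol nn, String src, String client,
                                DatanodeInfo[] excluded) throws IOException {
      while (true) {
        try {
          return serverSupportsNewRpc
              ? nn.addBlock(src, client, excluded)  // HDFS-630 signature
              : nn.addBlock(src, client);           // legacy signature
        } catch (RemoteException e) {
          // Older servers wrap the reflective dispatch failure; sniff the
          // message once, then stick with the legacy call from here on.
          if (serverSupportsNewRpc &&
              e.getMessage().contains("java.lang.NoSuchMethodException")) {
            serverSupportsNewRpc = false;
            continue;
          }
          throw e;
        }
      }
    }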
 

Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=1164720&r1=1164719&r2=1164720&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Fri Sep  2 21:41:48 2011
@@ -187,11 +187,27 @@ public interface ClientProtocol extends 
   * addBlock() allocates a new block and the set of datanodes the block
   * data should be replicated to.
    * 
+   * @deprecated use the 3-arg form below
    * @return LocatedBlock allocated block information.
    */
   public LocatedBlock addBlock(String src, String clientName) throws IOException;
 
   /**
+   * A client that wants to write an additional block to the 
+   * indicated filename (which must currently be open for writing)
+   * should call addBlock().  
+   *
+   * addBlock() allocates a new block and the set of datanodes the block
+   * data should be replicated to.
+   *
+   * @param excludedNodes a list of datanodes that should not be allocated as targets
+   * 
+   * @return LocatedBlock allocated block information.
+   */
+  public LocatedBlock addBlock(String src, String clientName,
+                               DatanodeInfo[] excludedNodes) throws IOException;
+
+  /**
    * The client is done writing data to the given filename, and would 
    * like to complete it.  
    *
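
A null exclusion array means "no exclusions" on the server side, so callers holding a collection can funnel both cases through a small convenience wrapper; this is purely illustrative and not part of the patch:

    // Hypothetical helper: an empty exclusion set degrades to null, which
    // getAdditionalBlock (below) passes straight through to the chooser.
    static LocatedBlock addBlockExcluding(ClientProtocol nn, String src,
                                          String clientName,
                                          Collection<DatanodeInfo> excluded)
        throws IOException {
      DatanodeInfo[] arr = (excluded == null || excluded.isEmpty())
          ? null : excluded.toArray(new DatanodeInfo[0]);
      return nn.addBlock(src, clientName, arr);
    }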

Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=1164720&r1=1164719&r2=1164720&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Fri Sep  2 21:41:48 2011
@@ -109,6 +109,7 @@ import org.apache.hadoop.metrics2.util.M
 import org.apache.hadoop.net.CachedDNSToSwitchMapping;
 import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.Node;
 import org.apache.hadoop.net.ScriptBasedMapping;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -1407,6 +1408,15 @@ public class FSNamesystem implements FSC
   }
 
   /**
+   * Stub for old callers pre-HDFS-630
+   */
+  public LocatedBlock getAdditionalBlock(String src, 
+                                         String clientName
+                                         ) throws IOException {
+    return getAdditionalBlock(src, clientName, null);
+  }
+
+  /**
    * The client would like to obtain an additional block for the indicated
    * filename (which is being written-to).  Return an array that consists
    * of the block, plus a set of machines.  The first on this list should
@@ -1418,7 +1428,8 @@ public class FSNamesystem implements FSC
    * client to "try again later".
    */
   public LocatedBlock getAdditionalBlock(String src, 
-                                         String clientName
+                                         String clientName,
+                                         List<Node> excludedNodes
                                          ) throws IOException {
     long fileLength, blockSize;
     int replication;
@@ -1449,7 +1460,7 @@ public class FSNamesystem implements FSC
     // choose targets for the new block to be allocated.
     DatanodeDescriptor targets[] = replicator.chooseTarget(replication,
                                                            clientNode,
-                                                           null,
+                                                           excludedNodes,
                                                            blockSize);
     if (targets.length < this.minReplication) {
       throw new IOException("File " + src + " could only be replicated to " +
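
ReplicationTargetChooser itself is outside this diff; for intuition, exclusion support amounts to filtering candidates before the usual rack-aware placement. A deliberately simplified, non-rack-aware sketch:

    // Simplified illustration only; the real chooser also honors rack
    // topology, load, and free space.
    List<DatanodeDescriptor> chooseTargets(int replication,
                                           List<DatanodeDescriptor> live,
                                           List<Node> excludedNodes) {
      List<DatanodeDescriptor> targets = new ArrayList<DatanodeDescriptor>();
      for (DatanodeDescriptor dn : live) {
        if (excludedNodes != null && excludedNodes.contains(dn)) {
          continue;  // the client reported this node as bad for its pipeline
        }
        targets.add(dn);
        if (targets.size() == replication) {
          break;
        }
      }
      return targets;  // may be shorter than 'replication'; the caller checks
    }

If too few candidates survive the filter, targets.length falls below minReplication and the IOException visible at the end of the hunk above is thrown.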

Modified: hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=1164720&r1=1164719&r2=1164720&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/hdfs/org/apache/hadoop/hdfs/server/namenode/NameNode.java Fri Sep  2 21:41:48 2011
@@ -49,6 +49,8 @@ import org.apache.hadoop.util.StringUtil
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
+import org.apache.hadoop.net.Node;
+import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AuthorizationException;
 import org.apache.hadoop.security.authorize.ProxyUsers;
@@ -65,7 +67,11 @@ import java.io.*;
 import java.net.*;
 import java.security.PrivilegedExceptionAction;
 import java.util.Collection;
+import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
+
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
 
 /**********************************************************
@@ -588,12 +594,30 @@ public class NameNode implements ClientP
   }
 
   /**
+   * Stub for 0.20 clients that don't support HDFS-630
    */
   public LocatedBlock addBlock(String src, 
                                String clientName) throws IOException {
+    return addBlock(src, clientName, null);
+  }
+
+  public LocatedBlock addBlock(String src,
+                               String clientName,
+                               DatanodeInfo[] excludedNodes)
+    throws IOException {
+
+    List<Node> excludedNodeList = null;
+    if (excludedNodes != null) {
+      // We must copy here, since this list gets modified later on
+      // in ReplicationTargetChooser
+      excludedNodeList = new ArrayList<Node>(
+        Arrays.<Node>asList(excludedNodes));
+    }
+
     stateChangeLog.debug("*BLOCK* NameNode.addBlock: file "
                          +src+" for "+clientName);
-    LocatedBlock locatedBlock = namesystem.getAdditionalBlock(src, clientName);
+    LocatedBlock locatedBlock = namesystem.getAdditionalBlock(
+      src, clientName, excludedNodeList);
     if (locatedBlock != null)
       myMetrics.incrNumAddBlockOps();
     return locatedBlock;
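
The "We must copy here" comment above is load-bearing: Arrays.asList returns a fixed-size list backed by the incoming array, so the add() calls that ReplicationTargetChooser later performs would fail without the copy. A short demonstration (extraNode is an assumed Node variable):

    List<Node> fixed = Arrays.<Node>asList(excludedNodes);
    // fixed.add(extraNode);              // throws UnsupportedOperationException
    List<Node> growable = new ArrayList<Node>(fixed);
    growable.add(extraNode);              // fine: the copy is genuinely mutable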

Modified: hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java?rev=1164720&r1=1164719&r2=1164720&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/test/org/apache/hadoop/hdfs/TestDFSClientRetries.java Fri Sep  2 21:41:48 2011
@@ -205,6 +205,13 @@ public class TestDFSClientRetries extend
     public LocatedBlock addBlock(String src, String clientName)
     throws IOException
     {
+      return addBlock(src, clientName, null);
+    }
+
+
+    public LocatedBlock addBlock(String src, String clientName,
+                                 DatanodeInfo[] excludedNodes)
+      throws IOException {
       num_calls++;
       if (num_calls > num_calls_allowed) { 
         throw new IOException("addBlock called more times than "
@@ -218,7 +225,6 @@ public class TestDFSClientRetries extend
     
     
     // The following methods are stub methods that are not needed by this mock class
-
     public LocatedBlocks  getBlockLocations(String src, long offset, long length) throws IOException { return null; }
 
     public void create(String src, FsPermission masked, String clientName, boolean overwrite, short replication, long blockSize) throws IOException {}
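
Since the two-argument mock delegates straight to the three-argument one, every existing retry assertion now covers both signatures. Roughly, from a test's point of view (the mock's class name is cut off in this excerpt, so mockNN below is assumed):

    mockNN.addBlock("/test/file", "DFSClient_1");        // legacy path
    mockNN.addBlock("/test/file", "DFSClient_1", null);  // HDFS-630 path
    // Both calls increment num_calls, so the "called more times than
    // allowed" check above guards clients on either protocol signature.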