Posted to hdfs-commits@hadoop.apache.org by st...@apache.org on 2009/11/24 06:56:27 UTC

svn commit: r883596 - in /hadoop/hdfs/trunk: ./ src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/ src/java/org/apache/hadoop/hdfs/server/namenode/ src/test/hdfs/org/apache/hadoop/hdfs/

Author: stack
Date: Tue Nov 24 05:56:24 2009
New Revision: 883596

URL: http://svn.apache.org/viewvc?rev=883596&view=rev
Log:
HDFS-630. In DFSOutputStream.nextBlockOutputStream(), the client can exclude specific datanodes when locating the next block

Added:
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientExcludedNodes.java
Modified:
    hadoop/hdfs/trunk/CHANGES.txt
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
    hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
    hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java

Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=883596&r1=883595&r2=883596&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Tue Nov 24 05:56:24 2009
@@ -28,6 +28,10 @@
 
     HDFS-699. Add unit tests framework (Mockito) (cos, Eli Collins)
 
+    HDFS-630. In DFSOutputStream.nextBlockOutputStream(), the client can
+              exclude specific datanodes when locating the next block
+              (Cosmin Lehene via Stack)
+
   OPTIMIZATIONS
 
   BUG FIXES

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=883596&r1=883595&r2=883596&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java Tue Nov 24 05:56:24 2009
@@ -2585,6 +2585,7 @@
       private DataInputStream blockReplyStream;
       private ResponseProcessor response = null;
       private volatile DatanodeInfo[] nodes = null; // list of targets for current block
+      private ArrayList<DatanodeInfo> excludedNodes = new ArrayList<DatanodeInfo>();
       volatile boolean hasError = false;
       volatile int errorIndex = -1;
       private BlockConstructionStage stage;  // block construction stage
@@ -3109,7 +3110,9 @@
           success = false;
 
           long startTime = System.currentTimeMillis();
-          lb = locateFollowingBlock(startTime);
+          DatanodeInfo[] w = excludedNodes.toArray(
+              new DatanodeInfo[excludedNodes.size()]);
+          lb = locateFollowingBlock(startTime, w.length > 0 ? w : null);
           block = lb.getBlock();
           block.setNumBytes(0);
           accessToken = lb.getAccessToken();
@@ -3125,12 +3128,16 @@
             namenode.abandonBlock(block, src, clientName);
             block = null;
 
+            LOG.info("Excluding datanode " + nodes[errorIndex]);
+            excludedNodes.add(nodes[errorIndex]);
+
             // Connection failed.  Let's wait a little bit and retry
             retry = true;
             try {
               if (System.currentTimeMillis() - startTime > 5000) {
                 LOG.info("Waiting to find target node: " + nodes[0].getName());
               }
+              // TODO: fix this timeout; extract it to a constant, maybe make it available from conf
               Thread.sleep(6000);
             } catch (InterruptedException iex) {
             }
@@ -3228,14 +3235,15 @@
         }
       }
 
-      private LocatedBlock locateFollowingBlock(long start) throws IOException {
+      private LocatedBlock locateFollowingBlock(long start,
+          DatanodeInfo[] excludedNodes) throws IOException {
         int retries = conf.getInt("dfs.client.block.write.locateFollowingBlock.retries", 5);
         long sleeptime = 400;
         while (true) {
           long localstart = System.currentTimeMillis();
           while (true) {
             try {
-              return namenode.addBlock(src, clientName, block);
+              return namenode.addBlock(src, clientName, block, excludedNodes);
             } catch (RemoteException e) {
               IOException ue = 
                 e.unwrapRemoteException(FileNotFoundException.class,

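Client side, the change reduces to a simple pattern: remember every
datanode that fails during pipeline setup, and hand the accumulated list
to the namenode on each retry so block placement avoids them. A minimal
sketch of that loop, assuming a hypothetical setupPipeline() helper (the
real DFSOutputStream above carries far more state):

    private final ArrayList<DatanodeInfo> excludedNodes =
        new ArrayList<DatanodeInfo>();

    private LocatedBlock nextBlockSketch() throws IOException {
      while (true) {
        // Pass the accumulated exclusions, or null when there are none.
        DatanodeInfo[] excluded = excludedNodes.isEmpty() ? null
            : excludedNodes.toArray(new DatanodeInfo[excludedNodes.size()]);
        LocatedBlock lb = locateFollowingBlock(
            System.currentTimeMillis(), excluded);
        int badIndex = setupPipeline(lb);  // hypothetical; < 0 on success
        if (badIndex < 0) {
          return lb;                       // pipeline established
        }
        // Remember the bad node so the namenode skips it next time.
        excludedNodes.add(lb.getLocations()[badIndex]);
        namenode.abandonBlock(lb.getBlock(), src, clientName);
      }
    }
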
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=883596&r1=883595&r2=883596&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Tue Nov 24 05:56:24 2009
@@ -198,6 +198,9 @@
   public LocatedBlock addBlock(String src, String clientName,
                                Block previous) throws IOException;
 
+  public LocatedBlock addBlock(String src, String clientName,
+      Block previous, DatanodeInfo[] excludedNode) throws IOException;
+
   /**
    * The client is done writing data to the given filename, and would 
    * like to complete it.  

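For callers the new overload is a strict superset of the old contract:
passing null for the excluded array behaves exactly like the
three-argument form (NameNode delegates accordingly, below). A usage
sketch, in which the path, clientName, previousBlock, and deadNode values
are all hypothetical:

    DatanodeInfo[] avoid = new DatanodeInfo[] { deadNode };
    LocatedBlock lb = namenode.addBlock(
        "/user/example/part-0", clientName, previousBlock, avoid);
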
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java?rev=883596&r1=883595&r2=883596&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java Tue Nov 24 05:56:24 2009
@@ -21,6 +21,7 @@
 import org.apache.hadoop.hdfs.protocol.Block;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.Node;
 import org.apache.hadoop.util.ReflectionUtils;
 import java.util.*;
 
@@ -60,6 +61,26 @@
    * choose <i>numOfReplicas</i> data nodes for <i>writer</i> 
    * to re-replicate a block with size <i>blocksize</i> 
    * If not, return as many as we can.
+   * 
+   * @param srcPath the file to which this chooseTargets is being invoked.
+   * @param numOfReplicas additional number of replicas wanted.
+   * @param writer the writer's machine, null if not in the cluster.
+   * @param chosenNodes datanodes that have been chosen as targets.
+   * @param excludedNodes datanodes that should not be considered as targets.
+   * @param blocksize size of the data to be written.
+   * @return array of DatanodeDescriptor instances chosen as target
+   * and sorted as a pipeline.
+   */
+  abstract DatanodeDescriptor[] chooseTarget(String srcPath,
+                                             int numOfReplicas,
+                                             DatanodeDescriptor writer,
+                                             List<DatanodeDescriptor> chosenNodes,
+                                             HashMap<Node, Node> excludedNodes,
+                                             long blocksize);
+
+  /**
+   * choose <i>numOfReplicas</i> data nodes for <i>writer</i>;
+   * if not possible, return as many as we can.
    * The base implementation extracts the pathname of the file from the
    * specified srcInode, but this could be a costly operation depending on the
    * file system implementation. Concrete implementations of this class should
@@ -167,4 +188,29 @@
                         new ArrayList<DatanodeDescriptor>(),
                         blocksize);
   }
+
+  /**
+   * choose <i>numOfReplicas</i> nodes for <i>writer</i> to replicate
+   * a block with size <i>blocksize</i>;
+   * if not possible, return as many as we can.
+   * 
+   * @param srcPath a string representation of the file for which chooseTarget is invoked
+   * @param numOfReplicas number of replicas wanted.
+   * @param writer the writer's machine, null if not in the cluster.
+   * @param blocksize size of the data to be written.
+   * @param excludedNodes datanodes that should not be considered as targets.
+   * @return array of DatanodeDescriptor instances chosen as targets
+   * and sorted as a pipeline.
+   */
+  DatanodeDescriptor[] chooseTarget(String srcPath,
+                                    int numOfReplicas,
+                                    DatanodeDescriptor writer,
+                                    HashMap<Node, Node> excludedNodes,
+                                    long blocksize) {
+    return chooseTarget(srcPath, numOfReplicas, writer,
+                        new ArrayList<DatanodeDescriptor>(),
+                        excludedNodes,
+                        blocksize);
+  }
+
 }

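Note that exclusions travel as a HashMap<Node, Node> used as a set (each
node maps to itself) rather than as a java.util.Set. A policy
implementation can screen candidates roughly like this sketch (the
isCandidate helper is illustrative, not part of the patch):

    // Skip any candidate the client asked to exclude; a null map
    // means no exclusions were requested.
    private boolean isCandidate(DatanodeDescriptor node,
                                HashMap<Node, Node> excludedNodes) {
      return excludedNodes == null || !excludedNodes.containsKey(node);
    }
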
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java?rev=883596&r1=883595&r2=883596&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java Tue Nov 24 05:56:24 2009
@@ -68,6 +68,17 @@
   }
 
   /** {@inheritDoc} */
+  public DatanodeDescriptor[] chooseTarget(String srcPath,
+                                    int numOfReplicas,
+                                    DatanodeDescriptor writer,
+                                    List<DatanodeDescriptor> chosenNodes,
+                                    HashMap<Node, Node> excludedNodes,
+                                    long blocksize) {
+    return chooseTarget(numOfReplicas, writer, chosenNodes, excludedNodes, blocksize);
+  }
+
+
+  /** {@inheritDoc} */
   @Override
   public DatanodeDescriptor[] chooseTarget(FSInodeInfo srcInode,
                                     int numOfReplicas,

Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=883596&r1=883595&r2=883596&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Tue Nov 24 05:56:24 2009
@@ -40,6 +40,7 @@
 import org.apache.hadoop.net.CachedDNSToSwitchMapping;
 import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.Node;
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.net.ScriptBasedMapping;
 import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
@@ -1317,10 +1318,18 @@
    * are replicated.  Will return an empty 2-elt array if we want the
    * client to "try again later".
    */
-  public LocatedBlock getAdditionalBlock(String src, 
+  public LocatedBlock getAdditionalBlock(String src,
                                          String clientName,
                                          Block previous
                                          ) throws IOException {
+    return getAdditionalBlock(src, clientName, previous, null);
+  }
+
+  public LocatedBlock getAdditionalBlock(String src,
+                                         String clientName,
+                                         Block previous,
+                                         HashMap<Node, Node> excludedNodes
+                                         ) throws IOException {
     long fileLength, blockSize;
     int replication;
     DatanodeDescriptor clientNode = null;
@@ -1356,7 +1365,7 @@
 
     // choose targets for the new block to be allocated.
     DatanodeDescriptor targets[] = blockManager.replicator.chooseTarget(
-        src, replication, clientNode, blockSize);
+        src, replication, clientNode, excludedNodes, blockSize);
     if (targets.length < blockManager.minReplication) {
       throw new IOException("File " + src + " could only be replicated to " +
                             targets.length + " nodes, instead of " +

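One interaction worth noting: exclusions shrink the candidate set, and if
chooseTarget can then find fewer than minReplication targets, the
pre-existing guard above rejects the allocation, so the client sees an
IOException from addBlock rather than a silently under-replicated
pipeline. A condensed sketch of that guard, mirroring the context lines
above (the final operand is truncated in the hunk and assumed here to be
blockManager.minReplication):

    DatanodeDescriptor targets[] = blockManager.replicator.chooseTarget(
        src, replication, clientNode, excludedNodes, blockSize);
    if (targets.length < blockManager.minReplication) {
      throw new IOException("File " + src + " could only be replicated to " +
                            targets.length + " nodes, instead of " +
                            blockManager.minReplication);
    }
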
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=883596&r1=883595&r2=883596&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Tue Nov 24 05:56:24 2009
@@ -22,6 +22,7 @@
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 
@@ -73,6 +74,7 @@
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.Node;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -612,14 +614,30 @@
     namesystem.setOwner(src, username, groupname);
   }
 
-  /**
-   */
+
+  @Override
   public LocatedBlock addBlock(String src, String clientName,
                                Block previous) throws IOException {
+    return addBlock(src, clientName, previous, null);
+  }
+
+  @Override
+  public LocatedBlock addBlock(String src,
+                               String clientName,
+                               Block previous,
+                               DatanodeInfo[] excludedNodes
+                               ) throws IOException {
     stateChangeLog.debug("*BLOCK* NameNode.addBlock: file "
                          +src+" for "+clientName);
+    HashMap<Node, Node> excludedNodesSet = null;
+    if (excludedNodes != null) {
+      excludedNodesSet = new HashMap<Node, Node>(excludedNodes.length);
+      for (Node node:excludedNodes) {
+        excludedNodesSet.put(node, node);
+      }
+    }
     LocatedBlock locatedBlock = 
-      namesystem.getAdditionalBlock(src, clientName, previous);
+      namesystem.getAdditionalBlock(src, clientName, previous, excludedNodesSet);
     if (locatedBlock != null)
       myMetrics.numAddBlockOps.inc();
     return locatedBlock;

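At the RPC boundary the namenode converts the wire-level DatanodeInfo[]
into the HashMap<Node, Node> form the placement policy expects; the
for-each over the array compiles because DatanodeInfo implements Node.
The conversion is the usual array-to-identity-set idiom, extracted here
as an illustrative helper (not part of the patch):

    // Each element maps to itself, so containsKey() answers membership
    // and get() recovers the stored node.
    static HashMap<Node, Node> toNodeSet(DatanodeInfo[] nodes) {
      if (nodes == null) {
        return null;  // null means "no exclusions"
      }
      HashMap<Node, Node> set = new HashMap<Node, Node>(nodes.length);
      for (Node node : nodes) {
        set.put(node, node);
      }
      return set;
    }
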
Added: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientExcludedNodes.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientExcludedNodes.java?rev=883596&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientExcludedNodes.java (added)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientExcludedNodes.java Tue Nov 24 05:56:24 2009
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+/**
+ * These tests make sure that DFSClient excludes datanodes that fail
+ * during block allocation, so that writes survive datanode failures.
+ */
+public class TestDFSClientExcludedNodes extends TestCase {
+  public void testExcludedNodes() throws IOException {
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = new MiniDFSCluster(conf, 3, true, null);
+    FileSystem fs = cluster.getFileSystem();
+    Path filePath = new Path("/testExcludedNodes");
+
+    // kill a datanode
+    cluster.stopDataNode(AppendTestUtil.nextInt(3));
+    OutputStream out = fs.create(filePath, true, 4096);
+    out.write(20);
+    
+    try {
+      out.close();
+    } catch (Exception e) {
+      fail("DataNode failure should not result in a block abort: \n" + e.getMessage());
+    }
+  }
+  
+}

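The new test boots a three-datanode MiniDFSCluster, kills one datanode at
random, then writes and closes a small file. Without the exclusion logic
the client could be handed the dead node on every retry until its retries
ran out; with it, the node that fails pipeline setup is excluded on the
next addBlock call and the write completes. To run the test in isolation,
something like the following should work (the exact ant target is an
assumption about the build files at this revision):

    ant test -Dtestcase=TestDFSClientExcludedNodes
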
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java?rev=883596&r1=883595&r2=883596&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java Tue Nov 24 05:56:24 2009
@@ -137,8 +137,17 @@
       return versionID;
     }
 
-    public LocatedBlock addBlock(String src, String clientName, Block previous)
-    throws IOException
+    public LocatedBlock addBlock(String src, String clientName,
+                                 Block previous) throws IOException {
+
+      return addBlock(src, clientName, previous, null);
+    }
+
+    public LocatedBlock addBlock(String src,
+                                 String clientName,
+                                 Block previous,
+                                 DatanodeInfo[] excludedNode
+                                 ) throws IOException
     {
       num_calls++;
       if (num_calls > num_calls_allowed) {