You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by st...@apache.org on 2009/11/24 06:56:27 UTC
svn commit: r883596 - in /hadoop/hdfs/trunk: ./
src/java/org/apache/hadoop/hdfs/ src/java/org/apache/hadoop/hdfs/protocol/
src/java/org/apache/hadoop/hdfs/server/namenode/
src/test/hdfs/org/apache/hadoop/hdfs/
Author: stack
Date: Tue Nov 24 05:56:24 2009
New Revision: 883596
URL: http://svn.apache.org/viewvc?rev=883596&view=rev
Log:
HBASE-630 In DFSOutputStream.nextBlockOutputStream(), the client can exclude specific datanodes when locating the next block
Added:
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientExcludedNodes.java
Modified:
hadoop/hdfs/trunk/CHANGES.txt
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java
Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=883596&r1=883595&r2=883596&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Tue Nov 24 05:56:24 2009
@@ -28,6 +28,10 @@
HDFS-699. Add unit tests framework (Mockito) (cos, Eli Collins)
+ HDFS-630 In DFSOutputStream.nextBlockOutputStream(), the client can
+ exclude specific datanodes when locating the next block
+ (Cosmin Lehene via Stack)
+
OPTIMIZATIONS
BUG FIXES
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java?rev=883596&r1=883595&r2=883596&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/DFSClient.java Tue Nov 24 05:56:24 2009
@@ -2585,6 +2585,7 @@
private DataInputStream blockReplyStream;
private ResponseProcessor response = null;
private volatile DatanodeInfo[] nodes = null; // list of targets for current block
+ private ArrayList<DatanodeInfo> excludedNodes = new ArrayList<DatanodeInfo>();
volatile boolean hasError = false;
volatile int errorIndex = -1;
private BlockConstructionStage stage; // block construction stage
@@ -3109,7 +3110,9 @@
success = false;
long startTime = System.currentTimeMillis();
- lb = locateFollowingBlock(startTime);
+ DatanodeInfo[] w = excludedNodes.toArray(
+ new DatanodeInfo[excludedNodes.size()]);
+ lb = locateFollowingBlock(startTime, w.length > 0 ? w : null);
block = lb.getBlock();
block.setNumBytes(0);
accessToken = lb.getAccessToken();
@@ -3125,12 +3128,16 @@
namenode.abandonBlock(block, src, clientName);
block = null;
+ LOG.info("Excluding datanode " + nodes[errorIndex]);
+ excludedNodes.add(nodes[errorIndex]);
+
// Connection failed. Let's wait a little bit and retry
retry = true;
try {
if (System.currentTimeMillis() - startTime > 5000) {
LOG.info("Waiting to find target node: " + nodes[0].getName());
}
+ //TODO fix this timeout. Extract it to a constant, maybe make it available from conf
Thread.sleep(6000);
} catch (InterruptedException iex) {
}
@@ -3228,14 +3235,15 @@
}
}
- private LocatedBlock locateFollowingBlock(long start) throws IOException {
+ private LocatedBlock locateFollowingBlock(long start,
+ DatanodeInfo[] excludedNodes) throws IOException {
int retries = conf.getInt("dfs.client.block.write.locateFollowingBlock.retries", 5);
long sleeptime = 400;
while (true) {
long localstart = System.currentTimeMillis();
while (true) {
try {
- return namenode.addBlock(src, clientName, block);
+ return namenode.addBlock(src, clientName, block, excludedNodes);
} catch (RemoteException e) {
IOException ue =
e.unwrapRemoteException(FileNotFoundException.class,
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java?rev=883596&r1=883595&r2=883596&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java Tue Nov 24 05:56:24 2009
@@ -198,6 +198,9 @@
public LocatedBlock addBlock(String src, String clientName,
Block previous) throws IOException;
+ public LocatedBlock addBlock(String src, String clientName,
+ Block previous, DatanodeInfo[] excludedNode) throws IOException;
+
/**
* The client is done writing data to the given filename, and would
* like to complete it.
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java?rev=883596&r1=883595&r2=883596&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicy.java Tue Nov 24 05:56:24 2009
@@ -21,6 +21,7 @@
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.Node;
import org.apache.hadoop.util.ReflectionUtils;
import java.util.*;
@@ -60,6 +61,26 @@
* choose <i>numOfReplicas</i> data nodes for <i>writer</i>
* to re-replicate a block with size <i>blocksize</i>
* If not, return as many as we can.
+ *
+ * @param srcPath the file to which this chooseTargets is being invoked.
+ * @param numOfReplicas additional number of replicas wanted.
+ * @param writer the writer's machine, null if not in the cluster.
+ * @param chosenNodes datanodes that have been chosen as targets.
+ * @param excludedNodes datanodes that should not be considered as targets.
+ * @param blocksize size of the data to be written.
+ * @return array of DatanodeDescriptor instances chosen as target
+ * and sorted as a pipeline.
+ */
+ abstract DatanodeDescriptor[] chooseTarget(String srcPath,
+ int numOfReplicas,
+ DatanodeDescriptor writer,
+ List<DatanodeDescriptor> chosenNodes,
+ HashMap<Node, Node> excludedNodes,
+ long blocksize);
+
+ /**
+ * choose <i>numOfReplicas</i> data nodes for <i>writer</i>
+ * If not, return as many as we can.
The base implementation extracts the pathname of the file from the
* specified srcInode, but this could be a costly operation depending on the
* file system implementation. Concrete implementations of this class should
@@ -167,4 +188,29 @@
new ArrayList<DatanodeDescriptor>(),
blocksize);
}
+
+ /**
+ * choose <i>numOfReplicas</i> nodes for <i>writer</i> to replicate
+ * a block with size <i>blocksize</i>
+ * If not, return as many as we can.
+ *
+ * @param srcPath a string representation of the file for which chooseTarget is invoked
+ * @param numOfReplicas number of replicas wanted.
+ * @param writer the writer's machine, null if not in the cluster.
+ * @param blocksize size of the data to be written.
+ * @param excludedNodes datanodes that should not be considered as targets.
+ * @return array of DatanodeDescriptor instances chosen as targets
+ * and sorted as a pipeline.
+ */
+ DatanodeDescriptor[] chooseTarget(String srcPath,
+ int numOfReplicas,
+ DatanodeDescriptor writer,
+ HashMap<Node, Node> excludedNodes,
+ long blocksize) {
+ return chooseTarget(srcPath, numOfReplicas, writer,
+ new ArrayList<DatanodeDescriptor>(),
+ excludedNodes,
+ blocksize);
+ }
+
}
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java?rev=883596&r1=883595&r2=883596&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/BlockPlacementPolicyDefault.java Tue Nov 24 05:56:24 2009
@@ -68,6 +68,17 @@
}
/** {@inheritDoc} */
+ public DatanodeDescriptor[] chooseTarget(String srcPath,
+ int numOfReplicas,
+ DatanodeDescriptor writer,
+ List<DatanodeDescriptor> chosenNodes,
+ HashMap<Node, Node> excludedNodes,
+ long blocksize) {
+ return chooseTarget(numOfReplicas, writer, chosenNodes, excludedNodes, blocksize);
+ }
+
+
+ /** {@inheritDoc} */
@Override
public DatanodeDescriptor[] chooseTarget(FSInodeInfo srcInode,
int numOfReplicas,
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java?rev=883596&r1=883595&r2=883596&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java Tue Nov 24 05:56:24 2009
@@ -40,6 +40,7 @@
import org.apache.hadoop.net.CachedDNSToSwitchMapping;
import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.Node;
import org.apache.hadoop.net.NodeBase;
import org.apache.hadoop.net.ScriptBasedMapping;
import org.apache.hadoop.hdfs.server.namenode.LeaseManager.Lease;
@@ -1317,10 +1318,18 @@
* are replicated. Will return an empty 2-elt array if we want the
* client to "try again later".
*/
- public LocatedBlock getAdditionalBlock(String src,
+ public LocatedBlock getAdditionalBlock(String src,
String clientName,
Block previous
) throws IOException {
+ return getAdditionalBlock(src, clientName, previous, null);
+ }
+
+ public LocatedBlock getAdditionalBlock(String src,
+ String clientName,
+ Block previous,
+ HashMap<Node, Node> excludedNodes
+ ) throws IOException {
long fileLength, blockSize;
int replication;
DatanodeDescriptor clientNode = null;
@@ -1356,7 +1365,7 @@
// choose targets for the new block to be allocated.
DatanodeDescriptor targets[] = blockManager.replicator.chooseTarget(
- src, replication, clientNode, blockSize);
+ src, replication, clientNode, excludedNodes, blockSize);
if (targets.length < blockManager.minReplication) {
throw new IOException("File " + src + " could only be replicated to " +
targets.length + " nodes, instead of " +
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=883596&r1=883595&r2=883596&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Tue Nov 24 05:56:24 2009
@@ -22,6 +22,7 @@
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Collection;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
@@ -73,6 +74,7 @@
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology;
+import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
@@ -612,14 +614,30 @@
namesystem.setOwner(src, username, groupname);
}
- /**
- */
+
+ @Override
public LocatedBlock addBlock(String src, String clientName,
Block previous) throws IOException {
+ return addBlock(src, clientName, previous, null);
+ }
+
+ @Override
+ public LocatedBlock addBlock(String src,
+ String clientName,
+ Block previous,
+ DatanodeInfo[] excludedNodes
+ ) throws IOException {
stateChangeLog.debug("*BLOCK* NameNode.addBlock: file "
+src+" for "+clientName);
+ HashMap<Node, Node> excludedNodesSet = null;
+ if (excludedNodes != null) {
+ excludedNodesSet = new HashMap<Node, Node>(excludedNodes.length);
+ for (Node node:excludedNodes) {
+ excludedNodesSet.put(node, node);
+ }
+ }
LocatedBlock locatedBlock =
- namesystem.getAdditionalBlock(src, clientName, previous);
+ namesystem.getAdditionalBlock(src, clientName, previous, excludedNodesSet);
if (locatedBlock != null)
myMetrics.numAddBlockOps.inc();
return locatedBlock;
Added: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientExcludedNodes.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientExcludedNodes.java?rev=883596&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientExcludedNodes.java (added)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientExcludedNodes.java Tue Nov 24 05:56:24 2009
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import junit.framework.TestCase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+
+/**
+ * These tests make sure that DFSClient excludes datanodes that fail
+ * during pipeline setup when requesting the next block, so the write
+ * still completes on the remaining nodes.
+ */
+public class TestDFSClientExcludedNodes extends TestCase {
+ public void testExcludedNodes() throws IOException
+ {
+ Configuration conf = new HdfsConfiguration();
+ MiniDFSCluster cluster = new MiniDFSCluster(conf, 3, true, null);
+ FileSystem fs = cluster.getFileSystem();
+ Path filePath = new Path("/testExcludedNodes");
+
+ // kill a datanode
+ cluster.stopDataNode(AppendTestUtil.nextInt(3));
+ OutputStream out = fs.create(filePath, true, 4096);
+ out.write(20);
+
+ try {
+ out.close();
+ } catch (Exception e) {
+ fail("DataNode failure should not result in a block abort: \n" + e.getMessage());
+ }
+ }
+
+}
+
+
+
\ No newline at end of file
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java?rev=883596&r1=883595&r2=883596&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/TestDFSClientRetries.java Tue Nov 24 05:56:24 2009
@@ -137,8 +137,17 @@
return versionID;
}
- public LocatedBlock addBlock(String src, String clientName, Block previous)
- throws IOException
+ public LocatedBlock addBlock(String src, String clientName,
+ Block previous) throws IOException {
+
+ return addBlock(src, clientName, previous, null);
+ }
+
+ public LocatedBlock addBlock(String src,
+ String clientName,
+ Block previous,
+ DatanodeInfo[] excludedNode
+ ) throws IOException
{
num_calls++;
if (num_calls > num_calls_allowed) {