Posted to hdfs-commits@hadoop.apache.org by sz...@apache.org on 2009/08/18 01:06:21 UTC
svn commit: r805203 - in /hadoop/hdfs/trunk: ./
src/java/org/apache/hadoop/hdfs/protocol/
src/java/org/apache/hadoop/hdfs/server/datanode/
src/java/org/apache/hadoop/hdfs/server/namenode/
src/java/org/apache/hadoop/hdfs/server/protocol/ src/test/hdfs/o...
Author: szetszwo
Date: Mon Aug 17 23:06:21 2009
New Revision: 805203
URL: http://svn.apache.org/viewvc?rev=805203&view=rev
Log:
HDFS-457. Do not shut down datanode if some, but not all, volumes fail. Contributed by Boris Shkolnik
Added:
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
Modified:
hadoop/hdfs/trunk/CHANGES.txt
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
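The change summarized in the log above works as follows: on a disk error the datanode re-checks its volumes, drops the ones that fail, and keeps running as long as at least MIN_NUM_OF_VALID_VOLUMES volumes remain, reporting a non-fatal DISK_ERROR to the namenode; only when no valid volume is left does it report FATAL_DISK_ERROR and shut down. A minimal, self-contained Java sketch of that decision follows; the Volume and VolumeFailureSketch classes and the checkDirs/handleDiskError helpers here are simplified illustrations, not the code from the patch.

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class VolumeFailureSketch {

  static final int MIN_NUM_OF_VALID_VOLUMES = 1; // mirrors the new FSConstants value
  static final int DISK_ERROR = 1;               // some volumes failed, DN keeps running
  static final int FATAL_DISK_ERROR = 3;         // no valid volumes left, DN shuts down

  static class Volume {
    final String dir;
    final boolean healthy;
    Volume(String dir, boolean healthy) { this.dir = dir; this.healthy = healthy; }
    void checkDirs() throws Exception {
      if (!healthy) throw new Exception("volume failed: " + dir);
    }
  }

  // Roughly what FSVolumeSet.checkDirs does: drop failed volumes, return the removed ones.
  static List<Volume> checkDirs(List<Volume> volumes) {
    List<Volume> removed = new ArrayList<Volume>();
    for (Iterator<Volume> it = volumes.iterator(); it.hasNext();) {
      Volume v = it.next();
      try {
        v.checkDirs();
      } catch (Exception e) {
        System.err.println("Removing failed volume " + v.dir + ": " + e.getMessage());
        removed.add(v);
        it.remove();
      }
    }
    return removed;
  }

  // Roughly what DataNode.handleDiskError does: pick a non-fatal or fatal error code.
  static boolean handleDiskError(List<Volume> volumes) {
    boolean hasEnoughResource = volumes.size() >= MIN_NUM_OF_VALID_VOLUMES;
    int code = hasEnoughResource ? DISK_ERROR : FATAL_DISK_ERROR;
    System.err.println("errorReport to the NameNode with code " + code);
    return hasEnoughResource; // true: keep running and just resend a block report
  }

  public static void main(String[] args) {
    List<Volume> volumes = new ArrayList<Volume>();
    volumes.add(new Volume("/data1", true));
    volumes.add(new Volume("/data2", false)); // simulate one failed volume

    if (!checkDirs(volumes).isEmpty()) {
      System.out.println("DataNode keeps running: " + handleDiskError(volumes)); // true
    }
  }
}

In the patch itself the same decision is spread across FSVolumeSet.checkDirs(), FSDataset.hasEnoughResource(), DataNode.handleDiskError(), and the NameNode's errorReport handler, as the diffs below show.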
Modified: hadoop/hdfs/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/CHANGES.txt?rev=805203&r1=805202&r2=805203&view=diff
==============================================================================
--- hadoop/hdfs/trunk/CHANGES.txt (original)
+++ hadoop/hdfs/trunk/CHANGES.txt Mon Aug 17 23:06:21 2009
@@ -81,8 +81,6 @@
HDFS-524. Further DataTransferProtocol code refactoring. (szetszwo)
- HDFS-527. Remove/deprecate unnecessary DFSClient constructors. (szetszwo)
-
HDFS-529. Use BlockInfo instead of Block to avoid redundant block searches
in BlockManager. (shv)
@@ -96,6 +94,9 @@
HDFS-546. DatanodeDescriptor iterator blocks as BlockInfo. (shv)
+ HDFS-457. Do not shut down datanode if some, but not all, volumes fail.
+ (Boris Shkolnik via szetszwo)
+
BUG FIXES
HDFS-76. Better error message to users when commands fail because of
@@ -147,9 +148,6 @@
HDFS-119. Fix a bug in logSync(), which causes NameNode block forever.
(Suresh Srinivas via shv)
- HDFS-167. Fix a bug in DFSClient that caused infinite retries on write.
- (Bill Zeller via szetszwo)
-
HDFS-534. Include avro in ivy. (szetszwo)
HDFS-532. Allow applications to know that a read request failed
@@ -165,3 +163,8 @@
HDFS-525. The SimpleDateFormat object in ListPathsServlet is not thread
safe. (Suresh Srinivas via szetszwo)
+
+ HDFS-167. Fix a bug in DFSClient that caused infinite retries on write.
+ (Bill Zeller via szetszwo)
+
+ HDFS-527. Remove/deprecate unnecessary DFSClient constructors. (szetszwo)
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java?rev=805203&r1=805202&r2=805203&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/protocol/FSConstants.java Mon Aug 17 23:06:21 2009
@@ -56,6 +56,7 @@
public static final int DEFAULT_DATA_SOCKET_SIZE = 128 * 1024;
public static final int SIZE_OF_INTEGER = Integer.SIZE / Byte.SIZE;
+ public static final int MIN_NUM_OF_VALID_VOLUMES = 1;// for a DN to run
// SafeMode actions
public enum SafeModeAction{ SAFEMODE_LEAVE, SAFEMODE_ENTER, SAFEMODE_GET; }
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=805203&r1=805202&r2=805203&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Mon Aug 17 23:06:21 2009
@@ -119,6 +119,9 @@
// check if there is a disk error
IOException cause = FSDataset.getCauseIfDiskError(ioe);
+ DataNode.LOG.warn("IOException in BlockReceiver constructor. Cause is ",
+ cause);
+
if (cause != null) { // possible disk error
ioe = cause;
datanode.checkDiskError(ioe); // may throw an exception here
@@ -833,7 +836,14 @@
SUCCESS.write(replyOut);
replyOut.flush();
} catch (Exception e) {
+ LOG.warn("IOException in BlockReceiver.lastNodeRun: ", e);
if (running) {
+ try {
+ datanode.checkDiskError(e); // may throw an exception here
+ } catch (IOException ioe) {
+ LOG.warn("DataNode.chekDiskError failed in lastDataNodeRun with: ",
+ ioe);
+ }
LOG.info("PacketResponder " + block + " " + numTargets +
" Exception " + StringUtils.stringifyException(e));
running = false;
@@ -993,7 +1003,13 @@
running = false;
}
} catch (IOException e) {
+ LOG.warn("IOException in BlockReceiver.run(): ", e);
if (running) {
+ try {
+ datanode.checkDiskError(e); // may throw an exception here
+ } catch (IOException ioe) {
+ LOG.warn("DataNode.chekDiskError failed in run() with: ", ioe);
+ }
LOG.info("PacketResponder " + block + " " + numTargets +
" Exception " + StringUtils.stringifyException(e));
running = false;
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=805203&r1=805202&r2=805203&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Mon Aug 17 23:06:21 2009
@@ -694,11 +694,14 @@
}
- /* Check if there is no space in disk or the disk is read-only
- * when IOException occurs.
- * If so, handle the error */
- protected void checkDiskError( IOException e ) throws IOException {
- if (e.getMessage() != null &&
+ /** Check if there is no space left on disk
+ * @param e the exception that caused this checkDiskError call
+ **/
+ protected void checkDiskError(Exception e ) throws IOException {
+
+ LOG.warn("checkDiskError: exception: ", e);
+
+ if (e.getMessage() != null &&
e.getMessage().startsWith("No space left on device")) {
throw new DiskOutOfSpaceException("No space left on device");
} else {
@@ -706,8 +709,11 @@
}
}
- /* Check if there is no disk space and if so, handle the error*/
- protected void checkDiskError( ) throws IOException {
+ /**
+ * Check if there is a disk failure and if so, handle the error
+ *
+ **/
+ protected void checkDiskError( ) {
try {
data.checkDataDir();
} catch(DiskErrorException de) {
@@ -716,13 +722,31 @@
}
private void handleDiskError(String errMsgr) {
- LOG.warn("DataNode is shutting down.\n" + errMsgr);
- shouldRun = false;
+ boolean hasEnoughResource = data.hasEnoughResource();
+ LOG.warn("DataNode.handleDiskError: Keep Running: " + hasEnoughResource);
+
+ // if hasEnoughResource is true, more volumes are available, so we don't want
+ // to shut down the DN completely and don't want the NN to remove it.
+ int dp_error = DatanodeProtocol.DISK_ERROR;
+ if(hasEnoughResource == false) {
+ // DN will be shutdown and NN should remove it
+ dp_error = DatanodeProtocol.FATAL_DISK_ERROR;
+ }
+ //inform NameNode
try {
namenode.errorReport(
- dnRegistration, DatanodeProtocol.DISK_ERROR, errMsgr);
+ dnRegistration, dp_error, errMsgr);
} catch(IOException ignored) {
}
+
+
+ if(hasEnoughResource) {
+ scheduleBlockReport(0);
+ return; // do not shutdown
+ }
+
+ LOG.warn("DataNode is shutting down.\n" + errMsgr);
+ shouldRun = false;
}
/** Number of concurrent xceivers per node. */
@@ -1238,6 +1262,9 @@
} catch (IOException ie) {
LOG.warn(dnRegistration + ":Failed to transfer " + b + " to " + targets[0].getName()
+ " got " + StringUtils.stringifyException(ie));
+ // check if there are any disk problems
+ datanode.checkDiskError();
+
} finally {
xmitsInProgress.getAndDecrement();
IOUtils.closeStream(blockSender);
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java?rev=805203&r1=805202&r2=805203&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java Mon Aug 17 23:06:21 2009
@@ -17,27 +17,42 @@
*/
package org.apache.hadoop.hdfs.server.datanode;
-import java.io.*;
-import java.util.*;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.RandomAccessFile;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
import javax.management.StandardMBean;
-import org.apache.hadoop.fs.*;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.DF;
+import org.apache.hadoop.fs.DU;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.FSConstants;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
+import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
import org.apache.hadoop.metrics.util.MBeanUtil;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
-import org.apache.hadoop.conf.*;
-import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
-import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
+import org.mortbay.log.Log;
/**************************************************
* FSDataset manages a set of data blocks. Each block
@@ -459,9 +474,25 @@
FSVolumeSet(FSVolume[] volumes) {
this.volumes = volumes;
}
+
+ private int numberOfVolumes() {
+ return volumes.length;
+ }
synchronized FSVolume getNextVolume(long blockSize) throws IOException {
+
+ if(volumes.length < 1) {
+ throw new DiskOutOfSpaceException("No more available volumes");
+ }
+
+ // since volumes could've been removed because of the failure
+ // make sure we are not out of bounds
+ if(curVolume >= volumes.length) {
+ curVolume = 0;
+ }
+
int startVolume = curVolume;
+
while (true) {
FSVolume volume = volumes[curVolume];
curVolume = (curVolume + 1) % volumes.length;
@@ -502,10 +533,46 @@
}
}
- synchronized void checkDirs() throws DiskErrorException {
+ /**
+ * Goes over all the volumes and calls checkDirs on each of them;
+ * if one throws DiskErrorException, it is removed from the list of active
+ * volumes.
+ * @return list of all the removed volumes
+ */
+ synchronized List<FSVolume> checkDirs() {
+
+ ArrayList<FSVolume> removed_vols = null;
+
for (int idx = 0; idx < volumes.length; idx++) {
- volumes[idx].checkDirs();
+ FSVolume fsv = volumes[idx];
+ try {
+ fsv.checkDirs();
+ } catch (DiskErrorException e) {
+ DataNode.LOG.warn("Removing failed volume " + fsv + ": ",e);
+ if(removed_vols == null) {
+ removed_vols = new ArrayList<FSVolume>(1);
+ }
+ removed_vols.add(volumes[idx]);
+ volumes[idx] = null; //remove the volume
+ }
+ }
+
+ // repair array - copy non null elements
+ int removed_size = (removed_vols==null)? 0 : removed_vols.size();
+ if(removed_size > 0) {
+ FSVolume fsvs[] = new FSVolume [volumes.length-removed_size];
+ for(int idx=0,idy=0; idx<volumes.length; idx++) {
+ if(volumes[idx] != null) {
+ fsvs[idy] = volumes[idx];
+ idy++;
+ }
+ }
+ volumes = fsvs; // replace array of volumes
}
+ Log.info("Completed FSVolumeSet.checkDirs. Removed=" + removed_size +
+ "volumes. List of current volumes: " + toString());
+
+ return removed_vols;
}
public String toString() {
@@ -682,7 +749,14 @@
return volumes.getDfsUsed();
}
}
-
+ /**
+ * Return true if there are still valid volumes
+ * on the DataNode
+ */
+ public boolean hasEnoughResource(){
+ return volumes.numberOfVolumes() >= MIN_NUM_OF_VALID_VOLUMES;
+ }
+
/**
* Return total capacity, used and unused
*/
@@ -1229,17 +1303,32 @@
* Check whether the given block is a valid one.
*/
public boolean isValidBlock(Block b) {
- return validateBlockFile(b) != null;
+ File f = null;
+ try {
+ f = validateBlockFile(b);
+ } catch(IOException e) {
+ Log.warn("Block " + b + " is not valid:",e);
+ }
+
+ return f != null;
}
/**
* Find the file corresponding to the block and return it if it exists.
*/
- File validateBlockFile(Block b) {
+ File validateBlockFile(Block b) throws IOException {
//Should we check for metadata file too?
File f = getFile(b);
- if(f != null && f.exists())
- return f;
+
+ if(f != null ) {
+ if(f.exists())
+ return f;
+
+ // if file is not null, but doesn't exist - possibly disk failed
+ DataNode datanode = DataNode.getDataNode();
+ datanode.checkDiskError();
+ }
+
if (InterDatanodeProtocol.LOG.isDebugEnabled()) {
InterDatanodeProtocol.LOG.debug("b=" + b + ", f=" + f);
}
@@ -1378,10 +1467,51 @@
/**
* check if a data directory is healthy
+ * if some volumes have failed, make sure to remove all the blocks that belong
+ * to those volumes
* @throws DiskErrorException
*/
public void checkDataDir() throws DiskErrorException {
- volumes.checkDirs();
+ long total_blocks=0, removed_blocks=0;
+ List<FSVolume> failed_vols = volumes.checkDirs();
+
+ // if there are no failed volumes, return
+ if(failed_vols == null)
+ return;
+
+ // else
+ // remove related blocks
+ long mlsec = System.currentTimeMillis();
+ synchronized (this) {
+ Iterator<Block> ib = volumeMap.keySet().iterator();
+ while(ib.hasNext()) {
+ Block b = ib.next();
+ total_blocks ++;
+ // check if the volume this block belongs to is still valid
+ FSVolume vol = volumeMap.get(b).getVolume();
+ for(FSVolume fv: failed_vols) {
+ if(vol == fv) {
+ DataNode.LOG.warn("removing block " + b.getBlockId() + " from vol "
+ + vol.dataDir.dir.getAbsolutePath());
+ ib.remove();
+ removed_blocks++;
+ break;
+ }
+ }
+ }
+ } // end of sync
+ mlsec = System.currentTimeMillis() - mlsec;
+ DataNode.LOG.warn(">>>>>>>>>>>>Removed " + removed_blocks + " out of " + total_blocks +
+ "(took " + mlsec + " millisecs)");
+
+ // report the error
+ StringBuilder sb = new StringBuilder();
+ for(FSVolume fv : failed_vols) {
+ sb.append(fv.dataDir.dir.getAbsolutePath() + ";");
+ }
+
+ throw new DiskErrorException("DataNode failed volumes:" + sb);
+
}
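One detail worth calling out from the FSDataset.java hunks above: because checkDirs() can shrink the volumes array, getNextVolume() now resets its round-robin index when it is out of bounds and fails when no volumes remain. Below is a minimal standalone sketch of that idea, assuming a plain String[] of volume directories; RoundRobinVolumes, remove, and pickVolume are illustrative names, not the patch's API, and a plain IOException stands in for Hadoop's DiskOutOfSpaceException.

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

class RoundRobinVolumes {
  private String[] volumes;  // surviving volume directories
  private int curVolume = 0;

  RoundRobinVolumes(String... volumes) {
    this.volumes = volumes;
  }

  // Drop a failed volume, shrinking the array (as the checkDirs hunk above does).
  synchronized void remove(String failed) {
    List<String> kept = new ArrayList<String>();
    for (String v : volumes) {
      if (!v.equals(failed)) {
        kept.add(v);
      }
    }
    volumes = kept.toArray(new String[0]);
  }

  // Pick the next volume round-robin; tolerate an array that shrank since the last call.
  synchronized String pickVolume() throws IOException {
    if (volumes.length < 1) {
      throw new IOException("No more available volumes");
    }
    if (curVolume >= volumes.length) {
      curVolume = 0; // a failed volume may have been removed; stay in bounds
    }
    String v = volumes[curVolume];
    curVolume = (curVolume + 1) % volumes.length;
    return v;
  }

  public static void main(String[] args) throws IOException {
    RoundRobinVolumes vols = new RoundRobinVolumes("/data1", "/data2", "/data3");
    vols.pickVolume();          // /data1
    vols.pickVolume();          // /data2
    vols.remove("/data3");      // a volume fails and is dropped
    System.out.println(vols.pickVolume()); // index reset keeps this in bounds: /data1
  }
}

The real getNextVolume(long blockSize) also takes the requested block size into account when choosing a volume; that part of the loop is unchanged by the patch and omitted here.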
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java?rev=805203&r1=805202&r2=805203&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java Mon Aug 17 23:06:21 2009
@@ -264,4 +264,10 @@
* @throws IOException
*/
public void validateBlockMetadata(Block b) throws IOException;
+
+ /**
+ * Checks how many valid storage volumes there are in the DataNode.
+ * @return true if more than the minimum number of valid volumes are left in the FSDataSet
+ */
+ public boolean hasEnoughResource();
}
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java?rev=805203&r1=805202&r2=805203&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java Mon Aug 17 23:06:21 2009
@@ -942,6 +942,8 @@
}
verifyRequest(nodeReg);
if (errorCode == DatanodeProtocol.DISK_ERROR) {
+ LOG.warn("Volume failed on " + dnName);
+ } else if (errorCode == DatanodeProtocol.FATAL_DISK_ERROR) {
namesystem.removeDatanode(nodeReg);
}
}
Modified: hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java?rev=805203&r1=805202&r2=805203&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java (original)
+++ hadoop/hdfs/trunk/src/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java Mon Aug 17 23:06:21 2009
@@ -42,8 +42,9 @@
// error code
final static int NOTIFY = 0;
- final static int DISK_ERROR = 1;
+ final static int DISK_ERROR = 1; // there are still valid volumes on DN
final static int INVALID_BLOCK = 2;
+ final static int FATAL_DISK_ERROR = 3; // no valid volumes left on DN
/**
* Determines actions that data node should perform
Modified: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java?rev=805203&r1=805202&r2=805203&view=diff
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java (original)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java Mon Aug 17 23:06:21 2009
@@ -655,4 +655,8 @@
public String getStorageInfo() {
return "Simulated FSDataset-" + storageId;
}
+
+ public boolean hasEnoughResource() {
+ return true;
+ }
}
Added: hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java
URL: http://svn.apache.org/viewvc/hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java?rev=805203&view=auto
==============================================================================
--- hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java (added)
+++ hadoop/hdfs/trunk/src/test/hdfs/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java Mon Aug 17 23:06:21 2009
@@ -0,0 +1,369 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.datanode;
+
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSClient;
+import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.protocol.Block;
+import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
+import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.common.HdfsConstants;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.net.NetUtils;
+import org.junit.After;
+import org.junit.Before;
+
+public class TestDataNodeVolumeFailure extends TestCase{
+ final private int block_size = 512;
+ MiniDFSCluster cluster = null;
+ int dn_num = 2;
+ int blocks_num = 30;
+ short repl=2;
+ File dataDir = null;
+ File data_fail = null;
+ File failedDir = null;
+
+ // mapping blocks to Meta files(physical files) and locs(NameNode locations)
+ private class BlockLocs {
+ public int num_files = 0;
+ public int num_locs = 0;
+ }
+ // block id to BlockLocs
+ Map<String, BlockLocs> block_map = new HashMap<String, BlockLocs> ();
+
+ @Before
+ public void setUp() throws Exception {
+
+ // bring up a cluster of 2
+ Configuration conf = new Configuration();
+ conf.setLong("dfs.block.size", block_size);
+ cluster = new MiniDFSCluster(conf, dn_num, true, null);
+ cluster.waitActive();
+ }
+
+
+
+ public void testVolumeFailure() throws IOException {
+ FileSystem fs = cluster.getFileSystem();
+ dataDir = new File(cluster.getDataDirectory());
+ System.out.println("Data dir: is " + dataDir.getPath());
+
+
+ // Data dir structure is dataDir/data[1-4]/[current,tmp...]
+ // data1,2 is for datanode 1, data2,3 - datanode2
+ String filename = "/test.txt";
+ Path filePath = new Path(filename);
+
+ // we use only a small number of blocks to avoid creating subdirs in the data dir
+ int filesize = block_size*blocks_num;
+ DFSTestUtil.createFile(fs, filePath, filesize, repl, 1L);
+ DFSTestUtil.waitReplication(fs, filePath, repl);
+ System.out.println("file " + filename + "(size " +
+ filesize + ") is created and replicated");
+
+ // fail the volume
+ // delete/make non-writable one of the directories (failed volume)
+ data_fail = new File(dataDir, "data3");
+ failedDir = new File(data_fail, "current");
+ if (failedDir.exists() &&
+ //!FileUtil.fullyDelete(failedDir)
+ !deteteBlocks(failedDir)
+ ) {
+ throw new IOException("Could not delete hdfs directory '" + failedDir + "'");
+ }
+ data_fail.setReadOnly();
+ failedDir.setReadOnly();
+ System.out.println("Deleteing " + failedDir.getPath() + "; exist=" + failedDir.exists());
+
+ // access all the blocks on the "failed" DataNode,
+ // we need to make sure that the "failed" volume is being accessed -
+ // and that will cause failure, blocks removal, "emergency" block report
+ triggerFailure(filename, filesize);
+
+ // make sure a block report is sent
+ DataNode dn = cluster.getDataNodes().get(1); //corresponds to dir data3
+ cluster.getNameNode().blockReport(dn.dnRegistration,
+ BlockListAsLongs.convertToArrayLongs(cluster.getBlockReport(1)));
+
+ // verify number of blocks and files...
+ verify(filename, filesize);
+
+ // create another file (with one volume failed).
+ System.out.println("creating file test1.txt");
+ Path fileName1 = new Path("/test1.txt");
+ DFSTestUtil.createFile(fs, fileName1, filesize, repl, 1L);
+
+
+ // should be able to replicate to both nodes (2 DN, repl=2)
+ DFSTestUtil.waitReplication(fs, fileName1, repl);
+ System.out.println("file " + fileName1.getName() +
+ " is created and replicated");
+
+ }
+
+ /**
+ * verifies two things:
+ * 1. number of locations of each block in the name node
+ * matches number of actual files
+ * 2. block files + pending blocks equal the total number of blocks that a file has,
+ * including replication (the HDFS file has 30 blocks, repl=2, so 60 in total)
+ * @param fn - file name
+ * @param fs - file size
+ * @throws IOException
+ */
+ private void verify(String fn, int fs) throws IOException{
+ // now count how many physical blocks are there
+ int totalReal = countRealBlocks(block_map);
+ System.out.println("countRealBlocks counted " + totalReal + " blocks");
+
+ // count how many blocks are stored in NN structures.
+ int totalNN = countNNBlocks(block_map, fn, fs);
+ System.out.println("countNNBlocks counted " + totalNN + " blocks");
+
+ for(String bid : block_map.keySet()) {
+ BlockLocs bl = block_map.get(bid);
+ // System.out.println(bid + "->" + bl.num_files + "vs." + bl.num_locs);
+ // number of physical files (1 or 2) should be same as number of datanodes
+ // in the list of the block locations
+ assertEquals(bl.num_files, bl.num_locs);
+ }
+ // verify we have the same number of physical blocks and stored in NN
+ assertEquals(totalReal, totalNN);
+
+ // now check the number of under-replicated blocks
+ FSNamesystem fsn = cluster.getNamesystem();
+ // force update of all the metric counts by calling computeDatanodeWork
+ fsn.computeDatanodeWork();
+ // get all the counts
+ long underRepl = fsn.getUnderReplicatedBlocks();
+ long pendRepl = fsn.getPendingReplicationBlocks();
+ long totalRepl = underRepl + pendRepl;
+ System.out.println("underreplicated after = "+ underRepl +
+ " and pending repl =" + pendRepl + "; total underRepl = " + totalRepl);
+
+ System.out.println("total blocks (real and replicating):" +
+ (totalReal + totalRepl) + " vs. all files blocks " + blocks_num*2);
+
+ // together all the blocks should be equal to all real + all underreplicated
+ assertEquals(totalReal + totalRepl, blocks_num*repl);
+ }
+
+ /**
+ * go to each block on the 2nd DataNode until it fails...
+ * @param path
+ * @param size
+ * @throws IOException
+ */
+ private void triggerFailure(String path, long size) throws IOException {
+ NameNode nn = cluster.getNameNode();
+ List<LocatedBlock> locatedBlocks = nn.getBlockLocations(path, 0, size).getLocatedBlocks();
+// System.out.println("Number of blocks: " + locatedBlocks.size());
+
+ for(LocatedBlock lb : locatedBlocks) {
+ DatanodeInfo dinfo = lb.getLocations()[1];
+ Block b = lb.getBlock();
+ // System.out.println(i++ + ". " + b.getBlockName());
+ try {
+ accessBlock(dinfo, lb);
+ } catch (IOException e) {
+ System.out.println("Failure triggered, on block: " + b.getBlockId() +
+ "; corresponding volume should be removed by now");
+ break;
+ }
+ }
+ }
+
+ /**
+ * simulate a failure by deleting all the block files
+ * @param dir
+ * @throws IOException
+ */
+ private boolean deteteBlocks(File dir) {
+
+ File [] fileList = dir.listFiles();
+ for(File f : fileList) {
+ if(f.getName().startsWith("blk_")) {
+ if(!f.delete())
+ return false;
+
+ }
+ }
+
+ return true;
+ }
+
+ /**
+ * try to access a block on a data node. If it fails, an exception is thrown
+ * @param datanode
+ * @param lblock
+ * @throws IOException
+ */
+ private void accessBlock(DatanodeInfo datanode, LocatedBlock lblock)
+ throws IOException {
+ InetSocketAddress targetAddr = null;
+ Socket s = null;
+ DFSClient.BlockReader blockReader = null;
+ Block block = lblock.getBlock();
+
+ targetAddr = NetUtils.createSocketAddr(datanode.getName());
+
+ s = new Socket();
+ s.connect(targetAddr, HdfsConstants.READ_TIMEOUT);
+ s.setSoTimeout(HdfsConstants.READ_TIMEOUT);
+
+ blockReader =
+ DFSClient.BlockReader.newBlockReader(s, targetAddr.toString() + ":" +
+ block.getBlockId(),
+ block.getBlockId(),
+ lblock.getAccessToken(),
+ block.getGenerationStamp(),
+ 0, -1, 4096);
+
+ // nothing - if it fails, it will throw an exception
+ }
+
+ /**
+ * Count the datanodes that have copies of the blocks for a file
+ * and put the counts into the map
+ * @param map
+ * @param path
+ * @param size
+ * @return
+ * @throws IOException
+ */
+ private int countNNBlocks(Map<String, BlockLocs> map, String path, long size)
+ throws IOException {
+ int total = 0;
+
+ NameNode nn = cluster.getNameNode();
+ List<LocatedBlock> locatedBlocks =
+ nn.getBlockLocations(path, 0, size).getLocatedBlocks();
+ //System.out.println("Number of blocks: " + locatedBlocks.size());
+
+ for(LocatedBlock lb : locatedBlocks) {
+ String blockId = ""+lb.getBlock().getBlockId();
+ //System.out.print(blockId + ": ");
+ DatanodeInfo[] dn_locs = lb.getLocations();
+ BlockLocs bl = map.get(blockId);
+ if(bl == null) {
+ bl = new BlockLocs();
+ }
+ //System.out.print(dn_info.name+",");
+ total += dn_locs.length;
+ bl.num_locs += dn_locs.length;
+ map.put(blockId, bl);
+ //System.out.println();
+ }
+ return total;
+ }
+
+ /**
+ * look for real blocks
+ * by counting *.meta files in all the storage dirs
+ * @param map
+ * @return
+ */
+
+ private int countRealBlocks(Map<String, BlockLocs> map) {
+ int total = 0;
+ for(int i=0; i<dn_num; i++) {
+ for(int j=1; j<=2; j++) {
+ File dir = new File(new File(dataDir, "data"+(2*i+j)), "current");
+ if(dir == null) {
+ System.out.println("dir is null for dn=" + i + " and data_dir=" + j);
+ continue;
+ }
+
+ String [] res = metaFilesInDir(dir);
+ if(res == null) {
+ System.out.println("res is null for dir = " + dir + " i=" + i + " and j=" + j);
+ continue;
+ }
+ //System.out.println("for dn" + i + "." + j + ": " + dir + "=" + res.length+ " files");
+
+ //int ii = 0;
+ for(String s: res) {
+ // cut off "blk_-" at the beginning and ".meta" at the end
+ assertNotNull(s);
+ String bid = s.substring(s.indexOf("_")+1, s.lastIndexOf("_"));
+ //System.out.println(ii++ + ". block " + s + "; id=" + bid);
+ BlockLocs val = map.get(bid);
+ if(val == null) {
+ val = new BlockLocs();
+ }
+ val.num_files ++; // one more file for the block
+ map.put(bid, val);
+
+ }
+ //System.out.println("dir1="+dir.getPath() + "blocks=" + res.length);
+ //System.out.println("dir2="+dir2.getPath() + "blocks=" + res2.length);
+
+ total += res.length;
+ }
+ }
+ return total;
+ }
+
+ /*
+ * count how many files *.meta are in the dir
+ */
+ private String [] metaFilesInDir(File dir) {
+ String [] res = dir.list(
+ new FilenameFilter() {
+ public boolean accept(File dir, String name) {
+ return name.startsWith("blk_") &&
+ name.endsWith(FSDataset.METADATA_EXTENSION);
+ }
+ }
+ );
+ return res;
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if(data_fail != null) {
+ data_fail.setWritable(true);
+ }
+ if(failedDir != null) {
+ failedDir.setWritable(true);
+ }
+ if(cluster != null) {
+ cluster.shutdown();
+ }
+ }
+
+}