You are viewing a plain text version of this content. The canonical (hyperlinked) version is available in the mailing-list archive; the link was lost in this plain-text conversion.
Posted to hdfs-commits@hadoop.apache.org by ki...@apache.org on 2014/05/12 21:08:07 UTC
svn commit: r1594055 - in
/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./
src/main/java/org/apache/hadoop/hdfs/server/datanode/
src/test/java/org/apache/hadoop/hdfs/server/datanode/
Author: kihwal
Date: Mon May 12 19:08:06 2014
New Revision: 1594055
URL: http://svn.apache.org/r1594055
Log:
HDFS-5522. Datanode disk error check may be incorrectly skipped. Contributed by Rushabh Shah.
Modified:
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1594055&r1=1594054&r2=1594055&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Mon May 12 19:08:06 2014
@@ -452,6 +452,9 @@ Release 2.5.0 - UNRELEASED
HDFS-6351. Command hdfs dfs -rm -r can't remove empty directory.
(Yongjun Zhang via wang)
+ HDFS-5522. Datanode disk error check may be incorrectly skipped.
+ (Rushabh S Shah via kihwal)
+
Release 2.4.1 - UNRELEASED
INCOMPATIBLE CHANGES
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1594055&r1=1594054&r2=1594055&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Mon May 12 19:08:06 2014
@@ -248,7 +248,7 @@ class BlockReceiver implements Closeable
if (cause != null) { // possible disk error
ioe = cause;
- datanode.checkDiskError(ioe); // may throw an exception here
+ datanode.checkDiskError();
}
throw ioe;
@@ -324,7 +324,7 @@ class BlockReceiver implements Closeable
}
// disk check
if(ioe != null) {
- datanode.checkDiskError(ioe);
+ datanode.checkDiskError();
throw ioe;
}
}
@@ -615,7 +615,7 @@ class BlockReceiver implements Closeable
manageWriterOsCache(offsetInBlock);
}
} catch (IOException iex) {
- datanode.checkDiskError(iex);
+ datanode.checkDiskError();
throw iex;
}
}
@@ -1171,11 +1171,7 @@ class BlockReceiver implements Closeable
} catch (IOException e) {
LOG.warn("IOException in BlockReceiver.run(): ", e);
if (running) {
- try {
- datanode.checkDiskError(e); // may throw an exception here
- } catch (IOException ioe) {
- LOG.warn("DataNode.checkDiskError failed in run() with: ", ioe);
- }
+ datanode.checkDiskError();
LOG.info(myString, e);
running = false;
if (!Thread.interrupted()) { // failure not caused by interruption
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1594055&r1=1594054&r2=1594055&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Mon May 12 19:08:06 2014
@@ -84,7 +84,6 @@ import org.apache.hadoop.security.token.
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.*;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
-import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.mortbay.util.ajax.JSON;
import javax.management.ObjectName;
@@ -92,8 +91,6 @@ import javax.management.ObjectName;
import java.io.*;
import java.lang.management.ManagementFactory;
import java.net.*;
-import java.nio.channels.ClosedByInterruptException;
-import java.nio.channels.ClosedChannelException;
import java.nio.channels.SocketChannel;
import java.security.PrivilegedExceptionAction;
import java.util.*;
@@ -229,6 +226,11 @@ public class DataNode extends Configured
ReadaheadPool readaheadPool;
private final boolean getHdfsBlockLocationsEnabled;
private ObjectName dataNodeInfoBeanName;
+ private Thread checkDiskErrorThread = null;
+ protected final int checkDiskErrorInterval = 5*1000;
+ private boolean checkDiskErrorFlag = false;
+ private Object checkDiskErrorMutex = new Object();
+ private long lastDiskErrorCheck;
/**
* Create the DataNode given a configuration, an array of dataDirs,
@@ -238,6 +240,7 @@ public class DataNode extends Configured
final List<StorageLocation> dataDirs,
final SecureResources resources) throws IOException {
super(conf);
+ this.lastDiskErrorCheck = 0;
this.maxNumberOfBlocksToLog = conf.getLong(DFS_MAX_NUM_BLOCKS_TO_LOG_KEY,
DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT);
@@ -1212,6 +1215,11 @@ public class DataNode extends Configured
this.dataXceiverServer.interrupt();
}
+ // Interrupt the checkDiskErrorThread and terminate it.
+ if(this.checkDiskErrorThread != null) {
+ this.checkDiskErrorThread.interrupt();
+ }
+
// Record the time of initial notification
long timeNotified = Time.now();
@@ -1321,55 +1329,17 @@ public class DataNode extends Configured
}
- /** Check if there is no space in disk
- * @param e that caused this checkDiskError call
- **/
- protected void checkDiskError(Exception e ) throws IOException {
-
- LOG.warn("checkDiskError: exception: ", e);
- if (isNetworkRelatedException(e)) {
- LOG.info("Not checking disk as checkDiskError was called on a network" +
- " related exception");
- return;
- }
- if (e.getMessage() != null &&
- e.getMessage().startsWith("No space left on device")) {
- throw new DiskOutOfSpaceException("No space left on device");
- } else {
- checkDiskError();
- }
- }
-
- /**
- * Check if the provided exception looks like it's from a network error
- * @param e the exception from a checkDiskError call
- * @return true if this exception is network related, false otherwise
- */
- protected boolean isNetworkRelatedException(Exception e) {
- if (e instanceof SocketException
- || e instanceof SocketTimeoutException
- || e instanceof ClosedChannelException
- || e instanceof ClosedByInterruptException) {
- return true;
- }
-
- String msg = e.getMessage();
-
- return null != msg
- && (msg.startsWith("An established connection was aborted")
- || msg.startsWith("Broken pipe")
- || msg.startsWith("Connection reset")
- || msg.contains("java.nio.channels.SocketChannel"));
- }
-
/**
* Check if there is a disk failure and if so, handle the error
*/
public void checkDiskError() {
- try {
- data.checkDataDir();
- } catch (DiskErrorException de) {
- handleDiskError(de.getMessage());
+ synchronized(checkDiskErrorMutex) {
+ checkDiskErrorFlag = true;
+ if(checkDiskErrorThread == null) {
+ startCheckDiskErrorThread();
+ checkDiskErrorThread.start();
+ LOG.info("Starting CheckDiskError Thread");
+ }
}
}
@@ -1669,13 +1639,8 @@ public class DataNode extends Configured
} catch (IOException ie) {
LOG.warn(bpReg + ":Failed to transfer " + b + " to " +
targets[0] + " got ", ie);
- // check if there are any disk problem
- try{
- checkDiskError(ie);
- } catch(IOException e) {
- LOG.warn("DataNode.checkDiskError failed in run() with: ", e);
- }
-
+ // check if there are any disk problem
+ checkDiskError();
} finally {
xmitsInProgress.getAndDecrement();
IOUtils.closeStream(blockSender);
@@ -2590,4 +2555,50 @@ public class DataNode extends Configured
public ShortCircuitRegistry getShortCircuitRegistry() {
return shortCircuitRegistry;
}
-}
+
+ /**
+ * Starts a new thread which will check for disk error check request
+ * every 5 sec
+ */
+ private void startCheckDiskErrorThread() {
+ checkDiskErrorThread = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ while(shouldRun) {
+ boolean tempFlag ;
+ synchronized(checkDiskErrorMutex) {
+ tempFlag = checkDiskErrorFlag;
+ checkDiskErrorFlag = false;
+ }
+ if(tempFlag) {
+ try {
+ data.checkDataDir();
+ } catch (DiskErrorException de) {
+ handleDiskError(de.getMessage());
+ } catch (Exception e) {
+ LOG.warn("Unexpected exception occurred while checking disk error " + e);
+ checkDiskErrorThread = null;
+ return;
+ }
+ synchronized(checkDiskErrorMutex) {
+ lastDiskErrorCheck = System.currentTimeMillis();
+ }
+ }
+ try {
+ Thread.sleep(checkDiskErrorInterval);
+ } catch (InterruptedException e) {
+ LOG.debug("InterruptedException in check disk error thread", e);
+ checkDiskErrorThread = null;
+ return;
+ }
+ }
+ }
+ });
+ }
+
+ public long getLastDiskErrorCheck() {
+ synchronized(checkDiskErrorMutex) {
+ return lastDiskErrorCheck;
+ }
+ }
+}
\ No newline at end of file
Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java?rev=1594055&r1=1594054&r2=1594055&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java Mon May 12 19:08:06 2014
@@ -18,16 +18,13 @@
package org.apache.hadoop.hdfs.server.datanode;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.DataOutputStream;
import java.io.File;
+import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
-import java.net.SocketException;
-import java.net.SocketTimeoutException;
-import java.nio.channels.ClosedChannelException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -201,15 +198,23 @@ public class TestDiskError {
}
}
+ /**
+ * Checks whether {@link DataNode#checkDiskError()} is being called or not.
+ * Before refactoring the code the above function was not getting called
+ * @throws IOException, InterruptedException
+ */
@Test
- public void testNetworkErrorsIgnored() {
- DataNode dn = cluster.getDataNodes().iterator().next();
-
- assertTrue(dn.isNetworkRelatedException(new SocketException()));
- assertTrue(dn.isNetworkRelatedException(new SocketTimeoutException()));
- assertTrue(dn.isNetworkRelatedException(new ClosedChannelException()));
- assertTrue(dn.isNetworkRelatedException(new Exception("Broken pipe foo bar")));
- assertFalse(dn.isNetworkRelatedException(new Exception()));
- assertFalse(dn.isNetworkRelatedException(new Exception("random problem")));
+ public void testcheckDiskError() throws IOException, InterruptedException {
+ if(cluster.getDataNodes().size() <= 0) {
+ cluster.startDataNodes(conf, 1, true, null, null);
+ cluster.waitActive();
+ }
+ DataNode dataNode = cluster.getDataNodes().get(0);
+ long slackTime = dataNode.checkDiskErrorInterval/2;
+ //checking for disk error
+ dataNode.checkDiskError();
+ Thread.sleep(dataNode.checkDiskErrorInterval);
+ long lastDiskErrorCheck = dataNode.getLastDiskErrorCheck();
+ assertTrue("Disk Error check is not performed within " + dataNode.checkDiskErrorInterval + " ms", ((System.currentTimeMillis()-lastDiskErrorCheck) < (dataNode.checkDiskErrorInterval + slackTime)));
}
}