You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by ki...@apache.org on 2014/05/12 21:08:07 UTC

svn commit: r1594055 - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/server/datanode/ src/test/java/org/apache/hadoop/hdfs/server/datanode/

Author: kihwal
Date: Mon May 12 19:08:06 2014
New Revision: 1594055

URL: http://svn.apache.org/r1594055
Log:
HDFS-5522. Datanode disk error check may be incorrectly skipped. Contributed by Rushabh Shah.

Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1594055&r1=1594054&r2=1594055&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Mon May 12 19:08:06 2014
@@ -452,6 +452,9 @@ Release 2.5.0 - UNRELEASED
     HDFS-6351. Command hdfs dfs -rm -r can't remove empty directory.
     (Yongjun Zhang via wang)
 
+    HDFS-5522. Datanode disk error check may be incorrectly skipped.
+    (Rushabh S Shah via kihwal)
+
 Release 2.4.1 - UNRELEASED
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1594055&r1=1594054&r2=1594055&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Mon May 12 19:08:06 2014
@@ -248,7 +248,7 @@ class BlockReceiver implements Closeable
       
       if (cause != null) { // possible disk error
         ioe = cause;
-        datanode.checkDiskError(ioe); // may throw an exception here
+        datanode.checkDiskError();
       }
       
       throw ioe;
@@ -324,7 +324,7 @@ class BlockReceiver implements Closeable
     }
     // disk check
     if(ioe != null) {
-      datanode.checkDiskError(ioe);
+      datanode.checkDiskError();
       throw ioe;
     }
   }
@@ -615,7 +615,7 @@ class BlockReceiver implements Closeable
           manageWriterOsCache(offsetInBlock);
         }
       } catch (IOException iex) {
-        datanode.checkDiskError(iex);
+        datanode.checkDiskError();
         throw iex;
       }
     }
@@ -1171,11 +1171,7 @@ class BlockReceiver implements Closeable
         } catch (IOException e) {
           LOG.warn("IOException in BlockReceiver.run(): ", e);
           if (running) {
-            try {
-              datanode.checkDiskError(e); // may throw an exception here
-            } catch (IOException ioe) {
-              LOG.warn("DataNode.checkDiskError failed in run() with: ", ioe);
-            }
+            datanode.checkDiskError();
             LOG.info(myString, e);
             running = false;
             if (!Thread.interrupted()) { // failure not caused by interruption

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java?rev=1594055&r1=1594054&r2=1594055&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java Mon May 12 19:08:06 2014
@@ -84,7 +84,6 @@ import org.apache.hadoop.security.token.
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.*;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
-import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
 import org.mortbay.util.ajax.JSON;
 
 import javax.management.ObjectName;
@@ -92,8 +91,6 @@ import javax.management.ObjectName;
 import java.io.*;
 import java.lang.management.ManagementFactory;
 import java.net.*;
-import java.nio.channels.ClosedByInterruptException;
-import java.nio.channels.ClosedChannelException;
 import java.nio.channels.SocketChannel;
 import java.security.PrivilegedExceptionAction;
 import java.util.*;
@@ -229,6 +226,11 @@ public class DataNode extends Configured
   ReadaheadPool readaheadPool;
   private final boolean getHdfsBlockLocationsEnabled;
   private ObjectName dataNodeInfoBeanName;
+  private Thread checkDiskErrorThread = null;
+  protected final int checkDiskErrorInterval = 5*1000;
+  private boolean checkDiskErrorFlag = false;
+  private Object checkDiskErrorMutex = new Object();
+  private long lastDiskErrorCheck;
 
   /**
    * Create the DataNode given a configuration, an array of dataDirs,
@@ -238,6 +240,7 @@ public class DataNode extends Configured
            final List<StorageLocation> dataDirs,
            final SecureResources resources) throws IOException {
     super(conf);
+    this.lastDiskErrorCheck = 0;
     this.maxNumberOfBlocksToLog = conf.getLong(DFS_MAX_NUM_BLOCKS_TO_LOG_KEY,
         DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT);
 
@@ -1212,6 +1215,11 @@ public class DataNode extends Configured
       this.dataXceiverServer.interrupt();
     }
 
+    // Interrupt the checkDiskErrorThread and terminate it.
+    if(this.checkDiskErrorThread != null) {
+      this.checkDiskErrorThread.interrupt();
+    }
+    
     // Record the time of initial notification
     long timeNotified = Time.now();
 
@@ -1321,55 +1329,17 @@ public class DataNode extends Configured
   }
   
   
-  /** Check if there is no space in disk 
-   *  @param e that caused this checkDiskError call
-   **/
-  protected void checkDiskError(Exception e ) throws IOException {
-    
-    LOG.warn("checkDiskError: exception: ", e);
-    if (isNetworkRelatedException(e)) {
-      LOG.info("Not checking disk as checkDiskError was called on a network" +
-      		" related exception");	
-      return;
-    }
-    if (e.getMessage() != null &&
-        e.getMessage().startsWith("No space left on device")) {
-      throw new DiskOutOfSpaceException("No space left on device");
-    } else {
-      checkDiskError();
-    }
-  }
-  
-  /**
-   * Check if the provided exception looks like it's from a network error
-   * @param e the exception from a checkDiskError call
-   * @return true if this exception is network related, false otherwise
-   */
-  protected boolean isNetworkRelatedException(Exception e) {
-    if (e instanceof SocketException 
-        || e instanceof SocketTimeoutException
-        || e instanceof ClosedChannelException 
-        || e instanceof ClosedByInterruptException) {
-      return true;
-    }
-    
-    String msg = e.getMessage();
-    
-    return null != msg 
-        && (msg.startsWith("An established connection was aborted")
-            || msg.startsWith("Broken pipe")
-            || msg.startsWith("Connection reset")
-            || msg.contains("java.nio.channels.SocketChannel"));
-  }
-  
   /**
    *  Check if there is a disk failure and if so, handle the error
    */
   public void checkDiskError() {
-    try {
-      data.checkDataDir();
-    } catch (DiskErrorException de) {
-      handleDiskError(de.getMessage());
+    synchronized(checkDiskErrorMutex) {
+      checkDiskErrorFlag = true;
+      if(checkDiskErrorThread == null) {
+        startCheckDiskErrorThread();
+        checkDiskErrorThread.start();
+        LOG.info("Starting CheckDiskError Thread");
+      }
     }
   }
   
@@ -1669,13 +1639,8 @@ public class DataNode extends Configured
       } catch (IOException ie) {
         LOG.warn(bpReg + ":Failed to transfer " + b + " to " +
             targets[0] + " got ", ie);
-          // check if there are any disk problem
-        try{
-          checkDiskError(ie);
-        } catch(IOException e) {
-            LOG.warn("DataNode.checkDiskError failed in run() with: ", e);
-        }
-        
+        // check if there are any disk problem
+        checkDiskError();
       } finally {
         xmitsInProgress.getAndDecrement();
         IOUtils.closeStream(blockSender);
@@ -2590,4 +2555,50 @@ public class DataNode extends Configured
   public ShortCircuitRegistry getShortCircuitRegistry() {
     return shortCircuitRegistry;
   }
-}
+  
+  /**
+   * Starts a new thread which will check for disk error check request 
+   * every 5 sec
+   */
+  private void startCheckDiskErrorThread() {
+    checkDiskErrorThread = new Thread(new Runnable() {
+          @Override
+          public void run() {
+            while(shouldRun) {
+              boolean tempFlag ;
+              synchronized(checkDiskErrorMutex) {
+                tempFlag = checkDiskErrorFlag;
+                checkDiskErrorFlag = false;
+              }
+              if(tempFlag) {
+                try {
+                  data.checkDataDir();
+                } catch (DiskErrorException de) {
+                  handleDiskError(de.getMessage());
+                } catch (Exception e) {
+                  LOG.warn("Unexpected exception occurred while checking disk error  " + e);
+                  checkDiskErrorThread = null;
+                  return;
+                }
+                synchronized(checkDiskErrorMutex) {
+                  lastDiskErrorCheck = System.currentTimeMillis();
+                }
+              }
+              try {
+                Thread.sleep(checkDiskErrorInterval);
+              } catch (InterruptedException e) {
+                LOG.debug("InterruptedException in check disk error thread", e);
+                checkDiskErrorThread = null;
+                return;
+              }
+            }
+          }
+    });
+  }
+  
+  public long getLastDiskErrorCheck() {
+    synchronized(checkDiskErrorMutex) {
+      return lastDiskErrorCheck;
+    }
+  }
+}
\ No newline at end of file

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java?rev=1594055&r1=1594054&r2=1594055&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDiskError.java Mon May 12 19:08:06 2014
@@ -18,16 +18,13 @@
 package org.apache.hadoop.hdfs.server.datanode;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 
 import java.io.DataOutputStream;
 import java.io.File;
+import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.net.Socket;
-import java.net.SocketException;
-import java.net.SocketTimeoutException;
-import java.nio.channels.ClosedChannelException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -201,15 +198,23 @@ public class TestDiskError {
     }
   }
   
+  /**
+   * Checks whether {@link DataNode#checkDiskError()} is being called or not.
+   * Before refactoring the code the above function was not getting called 
+   * @throws IOException, InterruptedException
+   */
   @Test
-  public void testNetworkErrorsIgnored() {
-    DataNode dn = cluster.getDataNodes().iterator().next();
-    
-    assertTrue(dn.isNetworkRelatedException(new SocketException()));
-    assertTrue(dn.isNetworkRelatedException(new SocketTimeoutException()));
-    assertTrue(dn.isNetworkRelatedException(new ClosedChannelException()));
-    assertTrue(dn.isNetworkRelatedException(new Exception("Broken pipe foo bar")));
-    assertFalse(dn.isNetworkRelatedException(new Exception()));
-    assertFalse(dn.isNetworkRelatedException(new Exception("random problem")));
+  public void testcheckDiskError() throws IOException, InterruptedException {
+    if(cluster.getDataNodes().size() <= 0) {
+      cluster.startDataNodes(conf, 1, true, null, null);
+      cluster.waitActive();
+    }
+    DataNode dataNode = cluster.getDataNodes().get(0);
+    long slackTime = dataNode.checkDiskErrorInterval/2;
+    //checking for disk error
+    dataNode.checkDiskError();
+    Thread.sleep(dataNode.checkDiskErrorInterval);
+    long lastDiskErrorCheck = dataNode.getLastDiskErrorCheck();
+    assertTrue("Disk Error check is not performed within  " + dataNode.checkDiskErrorInterval +  "  ms", ((System.currentTimeMillis()-lastDiskErrorCheck) < (dataNode.checkDiskErrorInterval + slackTime)));
   }
 }