You are viewing a plain text version of this content. The canonical link for it is here.
Posted to hdfs-commits@hadoop.apache.org by um...@apache.org on 2012/07/08 19:42:19 UTC

svn commit: r1358794 - in /hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs: ./ src/main/java/org/apache/hadoop/hdfs/server/datanode/ src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/ src/test/java/org/apache/hadoop/hdfs/server/datanode/

Author: umamahesh
Date: Sun Jul  8 17:42:19 2012
New Revision: 1358794

URL: http://svn.apache.org/viewvc?rev=1358794&view=rev
Log:
HDFS-3541. Deadlock between recovery, xceiver and packet responder. Contributed by Vinay.

Submitted by:	Vinay
Reviewed by:	Uma Maheswara Rao G

Modified:
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
    hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt?rev=1358794&r1=1358793&r2=1358794&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt Sun Jul  8 17:42:19 2012
@@ -175,6 +175,8 @@ Trunk (unreleased changes)
 
     HDFS-3549. Fix dist tar build fails in hadoop-hdfs-raid project. (Jason Lowe via daryn)
 
+    HDFS-3541. Deadlock between recovery, xceiver and packet responder (Vinay via umamahesh)
+
 Branch-2 ( Unreleased changes )
  
   INCOMPATIBLE CHANGES

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java?rev=1358794&r1=1358793&r2=1358794&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java Sun Jul  8 17:42:19 2012
@@ -844,6 +844,7 @@ class BlockReceiver implements Closeable
         try {
           responder.join();
         } catch (InterruptedException e) {
+          responder.interrupt();
           throw new IOException("Interrupted receiveBlock");
         }
         responder = null;
@@ -1018,6 +1019,7 @@ class BlockReceiver implements Closeable
           wait();
         } catch (InterruptedException e) {
           running = false;
+          Thread.currentThread().interrupt();
         }
       }
       if(LOG.isDebugEnabled()) {

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java?rev=1358794&r1=1358793&r2=1358794&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java Sun Jul  8 17:42:19 2012
@@ -838,6 +838,10 @@ class FsDatasetImpl implements FsDataset
    */
   @Override // FsDatasetSpi
   public synchronized void finalizeBlock(ExtendedBlock b) throws IOException {
+    if (Thread.interrupted()) {
+      // Don't allow data modifications from interrupted threads
+      throw new IOException("Cannot finalize block from Interrupted Thread");
+    }
     ReplicaInfo replicaInfo = getReplicaInfo(b);
     if (replicaInfo.getState() == ReplicaState.FINALIZED) {
       // this is legal, when recovery happens on a file that has

Modified: hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java?rev=1358794&r1=1358793&r2=1358794&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java (original)
+++ hadoop/common/trunk/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java Sun Jul  8 17:42:19 2012
@@ -38,21 +38,27 @@ import java.net.InetSocketAddress;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.MiniDFSNNTopology;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
 import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
@@ -561,4 +567,68 @@ public class TestBlockRecovery {
       streams.close();
     }
   }
+  
+  /**
+   * Test to verify the race between finalizeBlock and lease recovery: a thread that
+   * holds the dataNode.data monitor initiates replica recovery while out.close() runs.
+   * @throws Exception on any unexpected failure while running the test
+   */
+  @Test(timeout = 20000)
+  public void testRaceBetweenReplicaRecoveryAndFinalizeBlock() throws Exception {
+    tearDown();// Stop the Mocked DN started in startup(); a MiniDFSCluster is built below instead
+
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .nnTopology(MiniDFSNNTopology.simpleSingleNN(8020, 50070)) // NOTE(review): fixed ports - may clash with a running NN; confirm
+        .numDataNodes(1).build();
+    try {
+      cluster.waitClusterUp();
+      DistributedFileSystem fs = cluster.getFileSystem();
+      Path path = new Path("/test");
+      FSDataOutputStream out = fs.create(path);
+      out.writeBytes("data");
+      out.hsync(); // the stream stays open; it is closed further down, after recoveryThread is started
+      
+      List<LocatedBlock> blocks = DFSTestUtil.getAllBlocks(fs.open(path));
+      final LocatedBlock block = blocks.get(0); // first block of the file just written
+      final DataNode dataNode = cluster.getDataNodes().get(0); // the only DN (numDataNodes(1))
+      
+      final AtomicBoolean recoveryInitResult = new AtomicBoolean(true); // flipped to false if recovery init throws
+      Thread recoveryThread = new Thread() {
+        public void run() {
+          try {
+            DatanodeInfo[] locations = block.getLocations();
+            final RecoveringBlock recoveringBlock = new RecoveringBlock(
+                block.getBlock(), locations, block.getBlock()
+                    .getGenerationStamp() + 1); // recovery uses the current generation stamp + 1
+            synchronized (dataNode.data) { // presumably the FsDatasetImpl whose finalizeBlock is synchronized - confirm
+              Thread.sleep(2000); // keep the monitor held while the main thread calls out.close()
+              dataNode.initReplicaRecovery(recoveringBlock);
+            }
+          } catch (Exception e) {
+            recoveryInitResult.set(false); // NOTE(review): the exception itself is dropped; only the flag records failure
+          }
+        }
+      };
+      recoveryThread.start();
+      try {
+        out.close();
+      } catch (IOException e) {
+        // An IOException from close() is accepted only if it carries the expected message
+        Assert.assertTrue("Writing should fail",
+            e.getMessage().contains("are bad. Aborting..."));
+      } finally {
+        recoveryThread.join(); // wait for the recovery attempt before checking its result
+      }
+      Assert.assertTrue("Recovery should be initiated successfully",
+          recoveryInitResult.get());
+      
+      dataNode.updateReplicaUnderRecovery(block.getBlock(), block.getBlock()
+          .getGenerationStamp() + 1, block.getBlockSize()); // same stamp (+1) as the RecoveringBlock above
+    } finally {
+      if (null != cluster) {
+        cluster.shutdown();
+        cluster = null;
+      }
+    }
+  }
 }