You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that did not survive the plain-text conversion.
Posted to common-commits@hadoop.apache.org by he...@apache.org on 2020/09/09 12:09:48 UTC

[hadoop] branch trunk updated: HDFS-14694. Call recoverLease on DFSOutputStream close exception. Contributed by Lisheng Sun.

This is an automated email from the ASF dual-hosted git repository.

hexiaoqiao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 1d6d0d8  HDFS-14694. Call recoverLease on DFSOutputStream close exception. Contributed by Lisheng Sun.
1d6d0d8 is described below

commit 1d6d0d820786151ea4565ce8b3f23d7b38e83a8a
Author: He Xiaoqiao <he...@apache.org>
AuthorDate: Wed Sep 9 19:50:06 2020 +0800

    HDFS-14694. Call recoverLease on DFSOutputStream close exception. Contributed by Lisheng Sun.
    
    Co-authored-by: Chen Zhang <ch...@gmail.com>
    
    Signed-off-by: He Xiaoqiao <he...@apache.org>
    Reviewed-by: Ayush Saxena <ay...@apache.org>
---
 .../org/apache/hadoop/hdfs/DFSOutputStream.java    | 36 ++++++++++-
 .../hadoop/hdfs/client/HdfsClientConfigKeys.java   |  3 +
 .../apache/hadoop/hdfs/TestDFSOutputStream.java    | 69 ++++++++++++++++++++++
 3 files changed, 107 insertions(+), 1 deletion(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index a9e44cd..42c1c85 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -75,6 +75,8 @@ import org.slf4j.LoggerFactory;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Write.RECOVER_LEASE_ON_CLOSE_EXCEPTION_DEFAULT;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Write.RECOVER_LEASE_ON_CLOSE_EXCEPTION_KEY;
 
 /****************************************************************
  * DFSOutputStream creates files from a stream of bytes.
@@ -126,6 +128,7 @@ public class DFSOutputStream extends FSOutputSummer
   protected final AtomicReference<CachingStrategy> cachingStrategy;
   private FileEncryptionInfo fileEncryptionInfo;
   private int writePacketSize;
+  private boolean leaseRecovered = false;
 
   /** Use {@link ByteArrayManager} to create buffer for non-heartbeat packets.*/
   protected DFSPacket createPacket(int packetSize, int chunksPerPkt,
@@ -861,7 +864,14 @@ public class DFSOutputStream extends FSOutputSummer
   }
 
   protected synchronized void closeImpl() throws IOException {
+    boolean recoverLeaseOnCloseException = dfsClient.getConfiguration()
+        .getBoolean(RECOVER_LEASE_ON_CLOSE_EXCEPTION_KEY,
+            RECOVER_LEASE_ON_CLOSE_EXCEPTION_DEFAULT);
     if (isClosed()) {
+      if (!leaseRecovered) {
+        recoverLease(recoverLeaseOnCloseException);
+      }
+
       LOG.debug("Closing an already closed stream. [Stream:{}, streamer:{}]",
           closed, getStreamer().streamerClosed());
       try {
@@ -896,6 +906,9 @@ public class DFSOutputStream extends FSOutputSummer
       }
       completeFile();
     } catch (ClosedChannelException ignored) {
+    } catch (IOException ioe) {
+      recoverLease(recoverLeaseOnCloseException);
+      throw ioe;
     } finally {
       // Failures may happen when flushing data.
       // Streamers may keep waiting for the new block information.
@@ -906,7 +919,23 @@ public class DFSOutputStream extends FSOutputSummer
     }
   }
 
-  private void completeFile() throws IOException {
+  /**
+   * If recoverLeaseOnCloseException is true and an exception occurs when
+   * closing a file, recover lease.
+   */
+  private void recoverLease(boolean recoverLeaseOnCloseException) {
+    if (recoverLeaseOnCloseException) {
+      try {
+        dfsClient.endFileLease(fileId);
+        dfsClient.recoverLease(src);
+        leaseRecovered = true;
+      } catch (Exception e) {
+        LOG.warn("Fail to recover lease for {}", src, e);
+      }
+    }
+  }
+
+  void completeFile() throws IOException {
     // get last block before destroying the streamer
     ExtendedBlock lastBlock = getStreamer().getBlock();
     try (TraceScope ignored =
@@ -1076,6 +1105,11 @@ public class DFSOutputStream extends FSOutputSummer
     return getClass().getSimpleName() + ":" + streamer;
   }
 
+  @VisibleForTesting
+  boolean isLeaseRecovered() {
+    return leaseRecovered;
+  }
+
   static LocatedBlock addBlock(DatanodeInfo[] excludedNodes,
       DFSClient dfsClient, String src, ExtendedBlock prevBlock, long fileId,
       String[] favoredNodes, EnumSet<AddBlockFlag> allocFlags)
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index e8b5402..8561cab 100755
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -362,6 +362,9 @@ public interface HdfsClientConfigKeys {
     String  EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_KEY =
         PREFIX + "exclude.nodes.cache.expiry.interval.millis";
     long    EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT = 10*MINUTE;
+    String RECOVER_LEASE_ON_CLOSE_EXCEPTION_KEY =
+        PREFIX + "recover.lease.on.close.exception";
+    boolean RECOVER_LEASE_ON_CLOSE_EXCEPTION_DEFAULT = false;
 
     interface ByteArrayManager {
       String PREFIX = Write.PREFIX + "byte-array-manager.";
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
index 1891956..e263a8e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
@@ -30,6 +30,7 @@ import java.util.EnumSet;
 import java.util.LinkedList;
 import java.util.Map;
 import java.util.Random;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CreateFlag;
@@ -62,7 +63,10 @@ import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Write.RECOVER_LEASE_ON_CLOSE_EXCEPTION_KEY;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.anyBoolean;
 import static org.mockito.ArgumentMatchers.anyLong;
 import org.mockito.Mockito;
@@ -371,10 +375,75 @@ public class TestDFSOutputStream {
     os.close();
   }
 
+  @Test
+  public void testExceptionInCloseWithRecoverLease() throws Exception {
+    Configuration conf = new Configuration();
+    conf.setBoolean(RECOVER_LEASE_ON_CLOSE_EXCEPTION_KEY, true);
+    DFSClient client =
+        new DFSClient(cluster.getNameNode(0).getNameNodeAddress(), conf);
+    DFSClient spyClient = Mockito.spy(client);
+    DFSOutputStream dfsOutputStream = spyClient.create(
+        "/testExceptionInCloseWithRecoverLease", FsPermission.getFileDefault(),
+        EnumSet.of(CreateFlag.CREATE), (short) 3, 1024, null, 1024, null);
+    DFSOutputStream spyDFSOutputStream = Mockito.spy(dfsOutputStream);
+    doThrow(new IOException("Emulated IOException in close"))
+        .when(spyDFSOutputStream).completeFile();
+    try {
+      spyDFSOutputStream.close();
+      fail();
+    } catch (IOException ioe) {
+      assertTrue(spyDFSOutputStream.isLeaseRecovered());
+      waitForFileClosed("/testExceptionInCloseWithRecoverLease");
+      assertTrue(isFileClosed("/testExceptionInCloseWithRecoverLease"));
+    }
+  }
+
+  @Test
+  public void testExceptionInCloseWithoutRecoverLease() throws Exception {
+    Configuration conf = new Configuration();
+    DFSClient client =
+        new DFSClient(cluster.getNameNode(0).getNameNodeAddress(), conf);
+    DFSClient spyClient = Mockito.spy(client);
+    DFSOutputStream dfsOutputStream =
+        spyClient.create("/testExceptionInCloseWithoutRecoverLease",
+            FsPermission.getFileDefault(), EnumSet.of(CreateFlag.CREATE),
+            (short) 3, 1024, null, 1024, null);
+    DFSOutputStream spyDFSOutputStream = Mockito.spy(dfsOutputStream);
+    doThrow(new IOException("Emulated IOException in close"))
+        .when(spyDFSOutputStream).completeFile();
+    try {
+      spyDFSOutputStream.close();
+      fail();
+    } catch (IOException ioe) {
+      assertFalse(spyDFSOutputStream.isLeaseRecovered());
+      try {
+        waitForFileClosed("/testExceptionInCloseWithoutRecoverLease");
+      } catch (TimeoutException e) {
+        assertFalse(isFileClosed("/testExceptionInCloseWithoutRecoverLease"));
+      }
+    }
+  }
+
   @AfterClass
   public static void tearDown() {
     if (cluster != null) {
       cluster.shutdown();
     }
   }
+
+  private boolean isFileClosed(String path) throws IOException {
+    return cluster.getFileSystem().isFileClosed(new Path(path));
+  }
+
+  private void waitForFileClosed(String path) throws Exception {
+    GenericTestUtils.waitFor(() -> {
+      boolean closed;
+      try {
+        closed = isFileClosed(path);
+      } catch (IOException e) {
+        return false;
+      }
+      return closed;
+    }, 1000, 5000);
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org