You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by he...@apache.org on 2020/09/09 12:09:48 UTC
[hadoop] branch trunk updated: HDFS-14694. Call recoverLease on
DFSOutputStream close exception. Contributed by Lisheng Sun.
This is an automated email from the ASF dual-hosted git repository.
hexiaoqiao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 1d6d0d8 HDFS-14694. Call recoverLease on DFSOutputStream close exception. Contributed by Lisheng Sun.
1d6d0d8 is described below
commit 1d6d0d820786151ea4565ce8b3f23d7b38e83a8a
Author: He Xiaoqiao <he...@apache.org>
AuthorDate: Wed Sep 9 19:50:06 2020 +0800
HDFS-14694. Call recoverLease on DFSOutputStream close exception. Contributed by Lisheng Sun.
Co-authored-by: Chen Zhang <ch...@gmail.com>
Signed-off-by: He Xiaoqiao <he...@apache.org>
Reviewed-by: Ayush Saxena <ay...@apache.org>
---
.../org/apache/hadoop/hdfs/DFSOutputStream.java | 36 ++++++++++-
.../hadoop/hdfs/client/HdfsClientConfigKeys.java | 3 +
.../apache/hadoop/hdfs/TestDFSOutputStream.java | 69 ++++++++++++++++++++++
3 files changed, 107 insertions(+), 1 deletion(-)
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index a9e44cd..42c1c85 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -75,6 +75,8 @@ import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Write.RECOVER_LEASE_ON_CLOSE_EXCEPTION_DEFAULT;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Write.RECOVER_LEASE_ON_CLOSE_EXCEPTION_KEY;
/****************************************************************
* DFSOutputStream creates files from a stream of bytes.
@@ -126,6 +128,7 @@ public class DFSOutputStream extends FSOutputSummer
protected final AtomicReference<CachingStrategy> cachingStrategy;
private FileEncryptionInfo fileEncryptionInfo;
private int writePacketSize;
+ private boolean leaseRecovered = false;
/** Use {@link ByteArrayManager} to create buffer for non-heartbeat packets.*/
protected DFSPacket createPacket(int packetSize, int chunksPerPkt,
@@ -861,7 +864,14 @@ public class DFSOutputStream extends FSOutputSummer
}
protected synchronized void closeImpl() throws IOException {
+ boolean recoverLeaseOnCloseException = dfsClient.getConfiguration()
+ .getBoolean(RECOVER_LEASE_ON_CLOSE_EXCEPTION_KEY,
+ RECOVER_LEASE_ON_CLOSE_EXCEPTION_DEFAULT);
if (isClosed()) {
+ if (!leaseRecovered) {
+ recoverLease(recoverLeaseOnCloseException);
+ }
+
LOG.debug("Closing an already closed stream. [Stream:{}, streamer:{}]",
closed, getStreamer().streamerClosed());
try {
@@ -896,6 +906,9 @@ public class DFSOutputStream extends FSOutputSummer
}
completeFile();
} catch (ClosedChannelException ignored) {
+ } catch (IOException ioe) {
+ recoverLease(recoverLeaseOnCloseException);
+ throw ioe;
} finally {
// Failures may happen when flushing data.
// Streamers may keep waiting for the new block information.
@@ -906,7 +919,23 @@ public class DFSOutputStream extends FSOutputSummer
}
}
- private void completeFile() throws IOException {
+ /**
+  * Best-effort lease recovery, invoked from the close path.
+  * <p>
+  * Does nothing when {@code recoverLeaseOnCloseException} is false.
+  * Otherwise calls {@code dfsClient.endFileLease(fileId)} followed by
+  * {@code dfsClient.recoverLease(src)} and, if neither call throws, marks
+  * {@code leaseRecovered} as true. Any exception raised by those calls is
+  * logged at WARN level and not propagated to the caller.
+  *
+  * @param recoverLeaseOnCloseException whether lease recovery is enabled
+  */
+ private void recoverLease(boolean recoverLeaseOnCloseException) {
+   if (!recoverLeaseOnCloseException) {
+     return;
+   }
+   try {
+     dfsClient.endFileLease(fileId);
+     dfsClient.recoverLease(src);
+     // Reached only when both client calls returned without throwing.
+     leaseRecovered = true;
+   } catch (Exception e) {
+     LOG.warn("Fail to recover lease for {}", src, e);
+   }
+ }
+
+ void completeFile() throws IOException {
// get last block before destroying the streamer
ExtendedBlock lastBlock = getStreamer().getBlock();
try (TraceScope ignored =
@@ -1076,6 +1105,11 @@ public class DFSOutputStream extends FSOutputSummer
return getClass().getSimpleName() + ":" + streamer;
}
+ @VisibleForTesting
+ boolean isLeaseRecovered() {
+ return leaseRecovered; // set to true by recoverLease() once both client calls succeed
+ }
+
static LocatedBlock addBlock(DatanodeInfo[] excludedNodes,
DFSClient dfsClient, String src, ExtendedBlock prevBlock, long fileId,
String[] favoredNodes, EnumSet<AddBlockFlag> allocFlags)
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index e8b5402..8561cab 100755
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -362,6 +362,9 @@ public interface HdfsClientConfigKeys {
String EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_KEY =
PREFIX + "exclude.nodes.cache.expiry.interval.millis";
long EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT = 10*MINUTE;
+ String RECOVER_LEASE_ON_CLOSE_EXCEPTION_KEY =
+ PREFIX + "recover.lease.on.close.exception";
+ boolean RECOVER_LEASE_ON_CLOSE_EXCEPTION_DEFAULT = false; // opt-in: off unless the key is set to true
interface ByteArrayManager {
String PREFIX = Write.PREFIX + "byte-array-manager.";
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
index 1891956..e263a8e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
@@ -30,6 +30,7 @@ import java.util.EnumSet;
import java.util.LinkedList;
import java.util.Map;
import java.util.Random;
+import java.util.concurrent.TimeoutException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
@@ -62,7 +63,10 @@ import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
+import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.Write.RECOVER_LEASE_ON_CLOSE_EXCEPTION_KEY;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyLong;
import org.mockito.Mockito;
@@ -371,10 +375,75 @@ public class TestDFSOutputStream {
os.close();
}
+ /**
+  * With lease recovery on close exception enabled, a failing
+  * {@code completeFile()} must still surface the IOException from
+  * {@code close()}, mark the stream as lease-recovered, and leave the file
+  * closed once {@code waitForFileClosed} returns.
+  */
+ @Test
+ public void testExceptionInCloseWithRecoverLease() throws Exception {
+   final String file = "/testExceptionInCloseWithRecoverLease";
+   Configuration conf = new Configuration();
+   conf.setBoolean(RECOVER_LEASE_ON_CLOSE_EXCEPTION_KEY, true);
+   DFSClient spyClient = Mockito.spy(
+       new DFSClient(cluster.getNameNode(0).getNameNodeAddress(), conf));
+   DFSOutputStream out = Mockito.spy(spyClient.create(file,
+       FsPermission.getFileDefault(), EnumSet.of(CreateFlag.CREATE),
+       (short) 3, 1024, null, 1024, null));
+   // Force the final step of close() to fail.
+   doThrow(new IOException("Emulated IOException in close"))
+       .when(out).completeFile();
+   try {
+     out.close();
+     fail();
+   } catch (IOException expected) {
+     assertTrue(out.isLeaseRecovered());
+     waitForFileClosed(file);
+     assertTrue(isFileClosed(file));
+   }
+ }
+
+ @Test
+ public void testExceptionInCloseWithoutRecoverLease() throws Exception {
+ Configuration conf = new Configuration();
+ DFSClient client =
+ new DFSClient(cluster.getNameNode(0).getNameNodeAddress(), conf);
+ DFSClient spyClient = Mockito.spy(client);
+ DFSOutputStream dfsOutputStream =
+ spyClient.create("/testExceptionInCloseWithoutRecoverLease",
+ FsPermission.getFileDefault(), EnumSet.of(CreateFlag.CREATE),
+ (short) 3, 1024, null, 1024, null);
+ DFSOutputStream spyDFSOutputStream = Mockito.spy(dfsOutputStream);
+ doThrow(new IOException("Emulated IOException in close"))
+ .when(spyDFSOutputStream).completeFile();
+ try {
+ spyDFSOutputStream.close();
+ fail();
+ } catch (IOException ioe) {
+ assertFalse(spyDFSOutputStream.isLeaseRecovered());
+ try {
+ waitForFileClosed("/testExceptionInCloseWithoutRecoverLease");
+ } catch (TimeoutException e) {
+ assertFalse(isFileClosed("/testExceptionInCloseWithoutRecoverLease"));
+ }
+ }
+ }
+
/** Shuts down the shared mini cluster, if one was started. */
@AfterClass
public static void tearDown() {
  if (cluster == null) {
    return;
  }
  cluster.shutdown();
}
+
+ private boolean isFileClosed(String path) throws IOException {
+ return cluster.getFileSystem().isFileClosed(new Path(path));
+ }
+
+ /**
+  * Polls {@link #isFileClosed(String)} through
+  * {@code GenericTestUtils.waitFor} with arguments 1000 and 5000 until it
+  * reports true. An IOException raised by a single check is treated as
+  * "not closed yet" so that polling continues.
+  *
+  * @param path path of the file to wait for
+  * @throws Exception whatever {@code GenericTestUtils.waitFor} throws
+  */
+ private void waitForFileClosed(String path) throws Exception {
+   GenericTestUtils.waitFor(() -> {
+     try {
+       return isFileClosed(path);
+     } catch (IOException ignored) {
+       // A failed check counts as "still open"; keep polling.
+       return false;
+     }
+   }, 1000, 5000);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org