Posted to common-commits@hadoop.apache.org by we...@apache.org on 2019/08/30 01:31:21 UTC
[hadoop] branch branch-3.1 updated: HDFS-14706. Checksums are not checked if block meta file is less than 7 bytes. Contributed by Stephen O'Donnell.
This is an automated email from the ASF dual-hosted git repository.
weichiu pushed a commit to branch branch-3.1
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.1 by this push:
new d42b48d HDFS-14706. Checksums are not checked if block meta file is less than 7 bytes. Contributed by Stephen O'Donnell.
d42b48d is described below
commit d42b48d4d0aa08f7570c0f3a80e7fbf58833a6b2
Author: Stephen O'Donnell <st...@gmail.com>
AuthorDate: Thu Aug 29 17:37:05 2019 -0700
HDFS-14706. Checksums are not checked if block meta file is less than 7 bytes. Contributed by Stephen O'Donnell.
Signed-off-by: Wei-Chiu Chuang <we...@apache.org>
(cherry picked from commit 7bebad61d9c3dbff81fdcf243585fd3e9ae59dde)
(cherry picked from commit 9c0d6e16573a055665318bce8f7d38a92c897393)
---
.../org/apache/hadoop/hdfs/DFSOutputStream.java | 72 +++++++++++++++++++++-
.../hadoop/hdfs/client/HdfsClientConfigKeys.java | 3 +
.../apache/hadoop/hdfs/TestDFSOutputStream.java | 37 +++++++++++
3 files changed, 110 insertions(+), 2 deletions(-)
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index e977054..12a0aa4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -23,6 +23,7 @@ import java.io.InterruptedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.EnumSet;
+import java.util.Random;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.HadoopIllegalArgumentException;
@@ -126,6 +127,8 @@ public class DFSOutputStream extends FSOutputSummer
protected final AtomicReference<CachingStrategy> cachingStrategy;
private FileEncryptionInfo fileEncryptionInfo;
private int writePacketSize;
+ private boolean leaseRecovered = false;
+ private boolean exceptionInClose = false; // for unit test
/** Use {@link ByteArrayManager} to create buffer for non-heartbeat packets.*/
protected DFSPacket createPacket(int packetSize, int chunksPerPkt,
@@ -832,6 +835,39 @@ public class DFSOutputStream extends FSOutputSummer
}
}
+ @VisibleForTesting
+ public void setExceptionInClose(boolean enable) {
+ exceptionInClose = enable;
+ }
+
+ private class EmulateExceptionInClose {
+ private Random rand = null;
+ private int kickedNum;
+
+ EmulateExceptionInClose(int callNum) {
+ if (exceptionInClose) {
+ rand = new Random();
+ }
+ kickedNum = callNum;
+ }
+
+ void kickRandomException() throws IOException {
+ if (exceptionInClose) {
+ if (kickedNum > 0) {
+ if (rand.nextInt(kickedNum) == 1) {
+ throw new IOException("Emulated random IOException in close");
+ }
+ }
+ }
+ }
+
+ void kickException() throws IOException {
+ if (exceptionInClose) {
+ throw new IOException("Emulated IOException in close");
+ }
+ }
+ }
+
/**
* Closes this output stream and releases any system
* resources associated with this stream.
@@ -854,7 +890,20 @@ public class DFSOutputStream extends FSOutputSummer
}
protected synchronized void closeImpl() throws IOException {
+ boolean recoverOnCloseException = dfsClient.getConfiguration().getBoolean(
+ HdfsClientConfigKeys.Write.RECOVER_ON_CLOSE_EXCEPTION_KEY,
+ HdfsClientConfigKeys.Write.RECOVER_ON_CLOSE_EXCEPTION_DEFAULT);
if (isClosed()) {
+ if (recoverOnCloseException && !leaseRecovered) {
+ try {
+ dfsClient.endFileLease(fileId);
+ dfsClient.recoverLease(src);
+ leaseRecovered = true;
+ } catch (Exception e) {
+ LOG.warn("Fail to recover lease for {}", src, e);
+ }
+ }
+
LOG.debug("Closing an already closed stream. [Stream:{}, streamer:{}]",
closed, getStreamer().streamerClosed());
try {
@@ -871,8 +920,11 @@ public class DFSOutputStream extends FSOutputSummer
return;
}
+ EmulateExceptionInClose eei = new EmulateExceptionInClose(5);
try {
- flushBuffer(); // flush from all upper layers
+ flushBuffer(); // flush from all upper layers
+ // for test
+ eei.kickRandomException();
if (currentPacket != null) {
enqueueCurrentPacket();
@@ -883,12 +935,28 @@ public class DFSOutputStream extends FSOutputSummer
}
try {
- flushInternal(); // flush all data to Datanodes
+ flushInternal(); // flush all data to Datanodes
} catch (IOException ioe) {
cleanupAndRethrowIOException(ioe);
}
+ // for test
+ eei.kickRandomException();
completeFile();
+ // for test
+ eei.kickException();
} catch (ClosedChannelException ignored) {
+ } catch (IOException ioe) {
+ if (recoverOnCloseException) {
+ try {
+ dfsClient.endFileLease(fileId);
+ dfsClient.recoverLease(src);
+ leaseRecovered = true;
+ } catch (Exception e) {
+ // Ignore exceptions thrown by recoverLease; the original
+ // exception is rethrown below.
+ }
+ }
+ throw ioe;
} finally {
// Failures may happen when flushing data.
// Streamers may keep waiting for the new block information.
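For readers following the three closeImpl() hunks above, the new error path boils down to the condensed sketch below. This is illustrative only, not additional committed code: flushAndComplete() is a hypothetical stand-in for the real flushBuffer()/flushInternal()/completeFile() sequence, while the other names come straight from the diff.

    // Condensed, illustrative sketch of the new closeImpl() error path.
    try {
      flushAndComplete(); // stand-in for flushBuffer/flushInternal/completeFile
    } catch (ClosedChannelException ignored) {
      // stream already closed underneath us; nothing to recover
    } catch (IOException ioe) {
      if (recoverOnCloseException) {
        try {
          dfsClient.endFileLease(fileId); // stop renewing the lease locally
          dfsClient.recoverLease(src);    // ask the NameNode to recover it
          leaseRecovered = true;
        } catch (Exception e) {
          // best effort: surface the original close failure, not this one
        }
      }
      throw ioe; // callers still see the original exception
    }

The same endFileLease()/recoverLease() pair also runs when close() is called on an already-closed stream, so a file left open by an earlier failed close still gets its lease recovered.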
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
index 6fd8955..49b1718 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/HdfsClientConfigKeys.java
@@ -299,6 +299,9 @@ public interface HdfsClientConfigKeys {
String EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_KEY =
PREFIX + "exclude.nodes.cache.expiry.interval.millis";
long EXCLUDE_NODES_CACHE_EXPIRY_INTERVAL_DEFAULT = 10*MINUTE;
+ String RECOVER_ON_CLOSE_EXCEPTION_KEY =
+ PREFIX + "recover.on.close.exception";
+ boolean RECOVER_ON_CLOSE_EXCEPTION_DEFAULT = false;
interface ByteArrayManager {
String PREFIX = Write.PREFIX + "byte-array-manager.";
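The new key lives under HdfsClientConfigKeys.Write, so with the standard prefixes it should resolve to dfs.client.write.recover.on.close.exception; referencing the constant avoids depending on that spelling. A minimal sketch of opting in from client code, using only existing public APIs:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;

    // Minimal sketch: enabling the new opt-in behavior (defaults to false).
    Configuration conf = new Configuration();
    conf.setBoolean(
        HdfsClientConfigKeys.Write.RECOVER_ON_CLOSE_EXCEPTION_KEY, true);
    FileSystem fs = FileSystem.get(conf); // clients created from conf pick it up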
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
index 1afa452..a262706 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSOutputStream.java
@@ -31,6 +31,7 @@ import java.util.LinkedList;
import java.util.Map;
import java.util.Random;
+import org.apache.commons.lang3.reflect.FieldUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -40,6 +41,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities.StreamCapability;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DataStreamer.LastExceptionInStreamer;
+import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.client.impl.DfsClientConf;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
@@ -371,6 +373,41 @@ public class TestDFSOutputStream {
os.close();
}
+ /**
+ * If HdfsClientConfigKeys.Write.RECOVER_ON_CLOSE_EXCEPTION_KEY is set and
+ * an exception happens in close, the local lease should be closed and the
+ * lease in the namenode should be recovered.
+ */
+ @Test
+ public void testExceptionInClose() throws Exception {
+ String testStr = "Test exception in close";
+ DistributedFileSystem fs = cluster.getFileSystem();
+ Path testFile = new Path("/closeexception");
+ fs.getConf().setBoolean(
+ HdfsClientConfigKeys.Write.RECOVER_ON_CLOSE_EXCEPTION_KEY, true);
+ FSDataOutputStream os = fs.create(testFile);
+ DFSOutputStream dos =
+ (DFSOutputStream) FieldUtils.readField(os, "wrappedStream", true);
+ dos.setExceptionInClose(true);
+ os.write(testStr.getBytes());
+ try {
+ dos.close();
+ // close() should have thrown; fail if it did not
+ Assert.fail("Expected an IOException from close()");
+ } catch (IOException ioe) {
+ GenericTestUtils.waitFor(() -> {
+ boolean closed;
+ try {
+ closed = fs.isFileClosed(testFile);
+ } catch (IOException e) {
+ return false;
+ }
+ return closed;
+ }, 1000, 5000);
+ Assert.assertTrue(fs.isFileClosed(testFile));
+ }
+ }
+
@AfterClass
public static void tearDown() {
if (cluster != null) {
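Mirroring the test above, here is a hedged sketch of what an application sees with the key enabled: close() still throws, but the lease has already been recovered, so the NameNode should eventually mark the file closed. The path and polling interval are illustrative; isFileClosed() and the rest are existing public APIs.

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.DistributedFileSystem;
    import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;

    public class CloseRecoveryExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setBoolean(
            HdfsClientConfigKeys.Write.RECOVER_ON_CLOSE_EXCEPTION_KEY, true);
        Path file = new Path("/tmp/close-recovery-example"); // illustrative
        DistributedFileSystem fs =
            (DistributedFileSystem) file.getFileSystem(conf);
        FSDataOutputStream out = fs.create(file);
        out.write("some data".getBytes());
        try {
          out.close();
        } catch (IOException ioe) {
          // With the key enabled, DFSOutputStream has already called
          // endFileLease()/recoverLease() before rethrowing, so the file
          // should transition to closed on the NameNode; poll for it.
          while (!fs.isFileClosed(file)) {
            Thread.sleep(1000);
          }
        }
      }
    }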
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org