You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by xi...@apache.org on 2018/02/23 21:49:11 UTC
hadoop git commit: HDFS-13164. File not closed if streamer fail with
DSQuotaExceededException.
Repository: hadoop
Updated Branches:
refs/heads/trunk 8e728f39c -> 51088d323
HDFS-13164. File not closed if streamer fail with DSQuotaExceededException.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/51088d32
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/51088d32
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/51088d32
Branch: refs/heads/trunk
Commit: 51088d323359587dca7831f74c9d065c2fccc60d
Parents: 8e728f3
Author: Xiao Chen <xi...@apache.org>
Authored: Fri Feb 23 13:47:39 2018 -0800
Committer: Xiao Chen <xi...@apache.org>
Committed: Fri Feb 23 13:49:09 2018 -0800
----------------------------------------------------------------------
.../org/apache/hadoop/hdfs/DFSOutputStream.java | 63 +++++++++--
.../hadoop/hdfs/client/impl/LeaseRenewer.java | 2 +-
.../java/org/apache/hadoop/hdfs/TestQuota.java | 107 +++++++++++++++++++
3 files changed, 163 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/51088d32/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
index 7849796..9734752 100755
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java
@@ -852,7 +852,19 @@ public class DFSOutputStream extends FSOutputSummer
protected synchronized void closeImpl() throws IOException {
if (isClosed()) {
- getStreamer().getLastException().check(true);
+ LOG.debug("Closing an already closed stream. [Stream:{}, streamer:{}]",
+ closed, getStreamer().streamerClosed());
+ try {
+ getStreamer().getLastException().check(true);
+ } catch (IOException ioe) {
+ cleanupAndRethrowIOException(ioe);
+ } finally {
+ if (!closed) {
+ // If the stream is not closed but the streamer is closed, clean up the stream.
+ // Most importantly, end the file lease.
+ closeThreads(true);
+ }
+ }
return;
}
@@ -867,14 +879,12 @@ public class DFSOutputStream extends FSOutputSummer
setCurrentPacketToEmpty();
}
- flushInternal(); // flush all data to Datanodes
- // get last block before destroying the streamer
- ExtendedBlock lastBlock = getStreamer().getBlock();
-
- try (TraceScope ignored =
- dfsClient.getTracer().newScope("completeFile")) {
- completeFile(lastBlock);
+ try {
+ flushInternal(); // flush all data to Datanodes
+ } catch (IOException ioe) {
+ cleanupAndRethrowIOException(ioe);
}
+ completeFile();
} catch (ClosedChannelException ignored) {
} finally {
// Failures may happen when flushing data.
@@ -886,6 +896,43 @@ public class DFSOutputStream extends FSOutputSummer
}
}
+ private void completeFile() throws IOException {
+ // get last block before destroying the streamer
+ ExtendedBlock lastBlock = getStreamer().getBlock();
+ try (TraceScope ignored =
+ dfsClient.getTracer().newScope("completeFile")) {
+ completeFile(lastBlock);
+ }
+ }
+
+ /**
+ * Determines whether an IOException thrown needs extra cleanup on the stream.
+ * Space quota exceptions will be thrown when getting new blocks, so the
+ * open HDFS file needs to be closed.
+ *
+ * @param ioe the IOException
+ * @return whether the stream needs cleanup for the given IOException
+ */
+ private boolean exceptionNeedsCleanup(IOException ioe) {
+ return ioe instanceof DSQuotaExceededException
+ || ioe instanceof QuotaByStorageTypeExceededException;
+ }
+
+ private void cleanupAndRethrowIOException(IOException ioe)
+ throws IOException {
+ if (exceptionNeedsCleanup(ioe)) {
+ final MultipleIOException.Builder b = new MultipleIOException.Builder();
+ b.add(ioe);
+ try {
+ completeFile();
+ } catch (IOException e) {
+ b.add(e);
+ throw b.build();
+ }
+ }
+ throw ioe;
+ }
+
// should be called holding (this) lock since setTestFilename() may
// be called during unit tests
protected void completeFile(ExtendedBlock last) throws IOException {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/51088d32/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java
index e33d024..957c0a9 100644
--- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/client/impl/LeaseRenewer.java
@@ -74,7 +74,7 @@ import org.slf4j.LoggerFactory;
*/
@InterfaceAudience.Private
public class LeaseRenewer {
- static final Logger LOG = LoggerFactory.getLogger(LeaseRenewer.class);
+ public static final Logger LOG = LoggerFactory.getLogger(LeaseRenewer.class);
private static long leaseRenewerGraceDefault = 60*1000L;
static final long LEASE_RENEWER_SLEEP_DEFAULT = 1000L;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/51088d32/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestQuota.java
----------------------------------------------------------------------
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestQuota.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestQuota.java
index 88889f3..2924a74 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestQuota.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestQuota.java
@@ -35,6 +35,7 @@ import java.security.PrivilegedExceptionAction;
import java.util.List;
import java.util.Scanner;
+import com.google.common.base.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -42,6 +43,7 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.QuotaUsage;
import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.client.impl.LeaseRenewer;
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.NSQuotaExceededException;
@@ -58,13 +60,20 @@ import org.apache.hadoop.util.ToolRunner;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
+import org.junit.Rule;
import org.junit.Test;
import com.google.common.base.Charsets;
import com.google.common.collect.Lists;
+import org.junit.rules.Timeout;
+import org.slf4j.Logger;
+import org.slf4j.event.Level;
+import org.slf4j.LoggerFactory;
/** A class for testing quota-related commands */
public class TestQuota {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestQuota.class);
private static Configuration conf = null;
private static final ByteArrayOutputStream OUT_STREAM = new ByteArrayOutputStream();
@@ -77,6 +86,9 @@ public class TestQuota {
/* set a smaller block size so that we can test with smaller space quotas */
private static final int DEFAULT_BLOCK_SIZE = 512;
+ @Rule
+ public final Timeout testTestout = new Timeout(120000);
+
@BeforeClass
public static void setUpClass() throws Exception {
conf = new HdfsConfiguration();
@@ -1479,6 +1491,101 @@ public class TestQuota {
"clrSpaceQuota");
}
+ @Test
+ public void testSpaceQuotaExceptionOnClose() throws Exception {
+ GenericTestUtils.setLogLevel(DFSOutputStream.LOG, Level.TRACE);
+ GenericTestUtils.setLogLevel(DataStreamer.LOG, Level.TRACE);
+ final DFSAdmin dfsAdmin = new DFSAdmin(conf);
+ final Path dir = new Path(PathUtils.getTestPath(getClass()),
+ GenericTestUtils.getMethodName());
+ assertTrue(dfs.mkdirs(dir));
+ final String[] args = new String[] {"-setSpaceQuota", "1", dir.toString()};
+ assertEquals(0, ToolRunner.run(dfsAdmin, args));
+
+ final Path testFile = new Path(dir, "file");
+ final FSDataOutputStream stream = dfs.create(testFile);
+ stream.write("whatever".getBytes());
+ try {
+ stream.close();
+ fail("close should fail");
+ } catch (DSQuotaExceededException expected) {
+ }
+
+ assertEquals(0, cluster.getNamesystem().getNumFilesUnderConstruction());
+ }
+
+ @Test
+ public void testSpaceQuotaExceptionOnFlush() throws Exception {
+ GenericTestUtils.setLogLevel(DFSOutputStream.LOG, Level.TRACE);
+ GenericTestUtils.setLogLevel(DataStreamer.LOG, Level.TRACE);
+ GenericTestUtils.setLogLevel(DFSClient.LOG, Level.TRACE);
+ final DFSAdmin dfsAdmin = new DFSAdmin(conf);
+ final Path dir = new Path(PathUtils.getTestPath(getClass()),
+ GenericTestUtils.getMethodName());
+ assertTrue(dfs.mkdirs(dir));
+ final String[] args = new String[] {"-setSpaceQuota", "1", dir.toString()};
+ assertEquals(0, ToolRunner.run(dfsAdmin, args));
+
+ Path testFile = new Path(dir, "file");
+ FSDataOutputStream stream = dfs.create(testFile);
+ // get the lease renewer now so we can verify it later without calling
+ // getLeaseRenewer, which will automatically add the client into it.
+ final LeaseRenewer leaseRenewer = dfs.getClient().getLeaseRenewer();
+ stream.write("whatever".getBytes());
+ try {
+ stream.hflush();
+ fail("flush should fail");
+ } catch (DSQuotaExceededException expected) {
+ }
+ // even if we close the stream in finally, it won't help.
+ try {
+ stream.close();
+ fail("close should fail too");
+ } catch (DSQuotaExceededException expected) {
+ }
+
+ GenericTestUtils.setLogLevel(LeaseRenewer.LOG, Level.TRACE);
+ GenericTestUtils.waitFor(new Supplier<Boolean>() {
+ @Override
+ public Boolean get() {
+ LOG.info("LeaseRenewer: {}", leaseRenewer);
+ return leaseRenewer.isEmpty();
+ }
+ }, 100, 10000);
+ assertEquals(0, cluster.getNamesystem().getNumFilesUnderConstruction());
+ }
+
+ @Test
+ public void testSpaceQuotaExceptionOnAppend() throws Exception {
+ GenericTestUtils.setLogLevel(DFSOutputStream.LOG, Level.TRACE);
+ GenericTestUtils.setLogLevel(DataStreamer.LOG, Level.TRACE);
+ final DFSAdmin dfsAdmin = new DFSAdmin(conf);
+ final Path dir = new Path(PathUtils.getTestPath(getClass()),
+ GenericTestUtils.getMethodName());
+ dfs.delete(dir, true);
+ assertTrue(dfs.mkdirs(dir));
+ final String[] args =
+ new String[] {"-setSpaceQuota", "4000", dir.toString()};
+ ToolRunner.run(dfsAdmin, args);
+
+ final Path testFile = new Path(dir, "file");
+ OutputStream stream = dfs.create(testFile);
+ stream.write("whatever".getBytes());
+ stream.close();
+
+ assertEquals(0, cluster.getNamesystem().getNumFilesUnderConstruction());
+
+ stream = dfs.append(testFile);
+ byte[] buf = AppendTestUtil.initBuffer(4096);
+ stream.write(buf);
+ try {
+ stream.close();
+ fail("close after append should fail");
+ } catch (DSQuotaExceededException expected) {
+ }
+ assertEquals(0, cluster.getNamesystem().getNumFilesUnderConstruction());
+ }
+
private void testSetAndClearSpaceQuotaNoAccessInternal(
final String[] args,
final int cmdRet,
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org