You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2021/03/29 19:20:41 UTC
[hbase] branch branch-2.4 updated: HBASE-25692 Always try to close
the WAL reader when we catch any exception (#3090)
This is an automated email from the ASF dual-hosted git repository.
stack pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.4 by this push:
new 92f7be5 HBASE-25692 Always try to close the WAL reader when we catch any exception (#3090)
92f7be5 is described below
commit 92f7be5da7ea7dbd03572144d68d9697d10f5bb7
Author: Josh Elser <el...@apache.org>
AuthorDate: Mon Mar 29 15:15:58 2021 -0400
HBASE-25692 Always try to close the WAL reader when we catch any exception (#3090)
There are code paths in which we throw non-IOExceptions when
initializing a WAL reader. However, we only close the InputStream to the
WAL filesystem when the exception is an IOException. Close it if it is
open in all cases.
Co-authored-by: Josh Elser <je...@cloudera.com>
Signed-off-by: Andrew Purtell <ap...@apache.org>
---
.../org/apache/hadoop/hbase/wal/WALFactory.java | 58 ++++-----
.../apache/hadoop/hbase/wal/FileSystemProxy.java | 105 +++++++++++++++++
.../apache/hadoop/hbase/wal/TestWALFactory.java | 129 +++++++++++++++++++++
3 files changed, 266 insertions(+), 26 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
index b528662..b84f1be 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
@@ -329,7 +329,9 @@ public class WALFactory {
reader = lrClass.getDeclaredConstructor().newInstance();
reader.init(fs, path, conf, null);
return reader;
- } catch (IOException e) {
+ } catch (Exception e) {
+ // catch Exception so that we close reader for all exceptions. If we don't
+ // close the reader, we leak a socket.
if (reader != null) {
try {
reader.close();
@@ -339,34 +341,38 @@ public class WALFactory {
}
}
- String msg = e.getMessage();
- if (msg != null
- && (msg.contains("Cannot obtain block length")
- || msg.contains("Could not obtain the last block") || msg
- .matches("Blocklist for [^ ]* has changed.*"))) {
- if (++nbAttempt == 1) {
- LOG.warn("Lease should have recovered. This is not expected. Will retry", e);
- }
- if (reporter != null && !reporter.progress()) {
- throw new InterruptedIOException("Operation is cancelled");
- }
- if (nbAttempt > 2 && openTimeout < EnvironmentEdgeManager.currentTime()) {
- LOG.error("Can't open after " + nbAttempt + " attempts and "
- + (EnvironmentEdgeManager.currentTime() - startWaiting) + "ms " + " for " + path);
- } else {
- try {
- Thread.sleep(nbAttempt < 3 ? 500 : 1000);
- continue; // retry
- } catch (InterruptedException ie) {
- InterruptedIOException iioe = new InterruptedIOException();
- iioe.initCause(ie);
- throw iioe;
+ // Only inspect the Exception to consider retry when it's an IOException
+ if (e instanceof IOException) {
+ String msg = e.getMessage();
+ if (msg != null
+ && (msg.contains("Cannot obtain block length")
+ || msg.contains("Could not obtain the last block") || msg
+ .matches("Blocklist for [^ ]* has changed.*"))) {
+ if (++nbAttempt == 1) {
+ LOG.warn("Lease should have recovered. This is not expected. Will retry", e);
+ }
+ if (reporter != null && !reporter.progress()) {
+ throw new InterruptedIOException("Operation is cancelled");
}
+ if (nbAttempt > 2 && openTimeout < EnvironmentEdgeManager.currentTime()) {
+ LOG.error("Can't open after " + nbAttempt + " attempts and "
+ + (EnvironmentEdgeManager.currentTime() - startWaiting) + "ms " + " for " + path);
+ } else {
+ try {
+ Thread.sleep(nbAttempt < 3 ? 500 : 1000);
+ continue; // retry
+ } catch (InterruptedException ie) {
+ InterruptedIOException iioe = new InterruptedIOException();
+ iioe.initCause(ie);
+ throw iioe;
+ }
+ }
+ throw new LeaseNotRecoveredException(e);
}
- throw new LeaseNotRecoveredException(e);
- } else {
- throw e;
}
+
+ // Rethrow the original exception if we are not retrying due to HDFS-isms.
+ throw e;
}
}
} catch (IOException ie) {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/FileSystemProxy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/FileSystemProxy.java
new file mode 100644
index 0000000..fb729f5
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/FileSystemProxy.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * A concrete "proxy" for FileSystem. FileSystem is an abstract class rather than
+ * an interface, and only interfaces can be used with the Java Proxy class to
+ * override functionality via an InvocationHandler, so this class delegates by hand.
+ * Every method overridden here forwards its arguments unchanged to the wrapped instance.
+ */
+public class FileSystemProxy extends FileSystem {
+ private final FileSystem real;
+
+ public FileSystemProxy(FileSystem real) {
+ this.real = real;
+ }
+
+ @Override
+ public FSDataInputStream open(Path p) throws IOException {
+ return real.open(p);
+ }
+
+ @Override
+ public URI getUri() {
+ return real.getUri();
+ }
+
+ @Override
+ public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+ return real.open(f, bufferSize);
+ }
+
+ @Override
+ public FSDataOutputStream create(Path f, FsPermission permission, boolean overwrite, int bufferSize,
+ short replication, long blockSize, Progressable progress) throws IOException {
+ return real.create(f, permission, overwrite, bufferSize, replication, blockSize, progress);
+ }
+
+ @Override
+ public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException {
+ return real.append(f, bufferSize, progress);
+ }
+
+ @Override
+ public boolean rename(Path src, Path dst) throws IOException {
+ return real.rename(src, dst);
+ }
+
+ @Override
+ public boolean delete(Path f, boolean recursive) throws IOException {
+ return real.delete(f, recursive);
+ }
+
+ @Override
+ public FileStatus[] listStatus(Path f) throws FileNotFoundException, IOException {
+ return real.listStatus(f);
+ }
+
+ @Override
+ public void setWorkingDirectory(Path new_dir) {
+ real.setWorkingDirectory(new_dir);
+ }
+
+ @Override
+ public Path getWorkingDirectory() {
+ return real.getWorkingDirectory();
+ }
+
+ @Override
+ public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+ return real.mkdirs(f, permission);
+ }
+
+ @Override
+ public FileStatus getFileStatus(Path f) throws IOException {
+ return real.getFileStatus(f);
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
index 74ab840..f1ac464 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
@@ -26,11 +26,16 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.IOException;
+import java.io.InputStream;
import java.lang.reflect.Method;
import java.net.BindException;
+import java.util.ArrayList;
import java.util.List;
import java.util.NavigableMap;
import java.util.TreeMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -51,10 +56,13 @@ import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.codec.Codec;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.SampleRegionWALCoprocessor;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
+import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
@@ -749,4 +757,125 @@ public class TestWALFactory {
WALProvider metaWALProvider = walFactory.getMetaProvider();
assertEquals(IOTestProvider.class, metaWALProvider.getClass());
}
+
+ @Test
+ public void testReaderClosedOnBadCodec() throws IOException {
+ // Use a private Configuration and WALFactory so the broken codec cannot affect other test methods
+ Configuration confWithCodec = new Configuration(conf);
+ confWithCodec.setClass(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, BrokenWALCellCodec.class, Codec.class);
+ WALFactory customFactory = new WALFactory(confWithCodec, this.currentServername.toString());
+
+ // Wrap the FileSystem so that every InputStream returned by open() is recorded in
+ // openedReaders and we can later check whether close() was called on each of them.
+ List<InputStreamProxy> openedReaders = new ArrayList<>();
+ FileSystemProxy proxyFs = new FileSystemProxy(fs) {
+ @Override
+ public FSDataInputStream open(Path p) throws IOException {
+ InputStreamProxy is = new InputStreamProxy(super.open(p));
+ openedReaders.add(is);
+ return is;
+ }
+
+ @Override
+ public FSDataInputStream open(Path p, int blockSize) throws IOException {
+ InputStreamProxy is = new InputStreamProxy(super.open(p, blockSize));
+ openedReaders.add(is);
+ return is;
+ }
+ };
+
+ final TableDescriptor htd =
+ TableDescriptorBuilder.newBuilder(TableName.valueOf(currentTest.getMethodName()))
+ .setColumnFamily(ColumnFamilyDescriptorBuilder.of("column")).build();
+ final RegionInfo hri = RegionInfoBuilder.newBuilder(htd.getTableName()).build();
+
+ NavigableMap<byte[], Integer> scopes = new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR);
+ for (byte[] fam : htd.getColumnFamilyNames()) {
+ scopes.put(fam, 0);
+ }
+ byte[] row = Bytes.toBytes("row");
+ WAL.Reader reader = null;
+ final MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl(1);
+ try {
+ // Write one column in one edit.
+ WALEdit cols = new WALEdit();
+ cols.add(new KeyValue(row, Bytes.toBytes("column"),
+ Bytes.toBytes("0"), System.currentTimeMillis(), new byte[] { 0 }));
+ final WAL log = customFactory.getWAL(hri);
+ final long txid = log.appendData(hri, new WALKeyImpl(hri.getEncodedNameAsBytes(),
+ htd.getTableName(), System.currentTimeMillis(), mvcc, scopes), cols);
+ // Sync the edit to the WAL
+ log.sync(txid);
+ log.startCacheFlush(hri.getEncodedNameAsBytes(), htd.getColumnFamilyNames());
+ log.completeCacheFlush(hri.getEncodedNameAsBytes(), HConstants.NO_SEQNUM);
+ log.shutdown();
+
+ // Inject our failure (codec is constructed via reflection). NOTE(review): flag is not reset below.
+ BrokenWALCellCodec.THROW_FAILURE_ON_INIT.set(true);
+
+ // Now open a reader on the log through the proxy FileSystem; this is expected to
+ // throw an exception when the reader tries to instantiate the custom Codec.
+ Path filename = AbstractFSWALProvider.getCurrentFileName(log);
+ try {
+ reader = customFactory.createReader(proxyFs, filename);
+ fail("Expected to see an exception when creating WAL reader");
+ } catch (Exception e) {
+ // Expected: createReader should fail while the codec failure is armed
+ }
+ // Exactly one InputStream should have been opened through the proxy
+ assertEquals(1, openedReaders.size());
+ // And that stream must have had close() called on it.
+ long unclosedReaders = openedReaders.stream()
+ .filter((r) -> !r.isClosed.get())
+ .collect(Collectors.counting());
+ assertEquals("Should not find any open readers", 0, (int) unclosedReaders);
+ } finally {
+ if (reader != null) {
+ reader.close();
+ }
+ }
+ }
+
+ /**
+ * An FSDataInputStream wrapper that records in isClosed whether close() was called.
+ */
+ private static class InputStreamProxy extends FSDataInputStream {
+ private final InputStream real;
+ private final AtomicBoolean isClosed = new AtomicBoolean(false);
+
+ public InputStreamProxy(InputStream real) {
+ super(real);
+ this.real = real;
+ }
+
+ @Override
+ public void close() throws IOException {
+ isClosed.set(true);
+ real.close();
+ }
+ }
+
+ /**
+ * A WALCellCodec whose constructors throw a RuntimeException while THROW_FAILURE_ON_INIT is true.
+ */
+ @SuppressWarnings("unused")
+ private static class BrokenWALCellCodec extends WALCellCodec {
+ static final AtomicBoolean THROW_FAILURE_ON_INIT = new AtomicBoolean(false);
+
+ static void maybeInjectFailure() {
+ if (THROW_FAILURE_ON_INIT.get()) {
+ throw new RuntimeException("Injected instantiation exception");
+ }
+ }
+
+ public BrokenWALCellCodec() {
+ super();
+ maybeInjectFailure();
+ }
+
+ public BrokenWALCellCodec(Configuration conf, CompressionContext compression) {
+ super(conf, compression);
+ maybeInjectFailure();
+ }
+ }
}