You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/05/31 07:02:34 UTC
[36/36] hbase git commit: HBASE-20637 Polish the WAL switching when
transiting from A to S
HBASE-20637 Polish the WAL switching when transiting from A to S
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/1f3e50fc
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/1f3e50fc
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/1f3e50fc
Branch: refs/heads/HBASE-19064
Commit: 1f3e50fcefd049290e5b102d04b568e40e6dc91e
Parents: 46866d9
Author: zhangduo <zh...@apache.org>
Authored: Tue May 29 20:38:20 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Thu May 31 14:41:40 2018 +0800
----------------------------------------------------------------------
.../hbase/regionserver/wal/AsyncFSWAL.java | 52 +++++++++++++-
.../hbase/regionserver/wal/DualAsyncFSWAL.java | 71 ++++++++++++++------
.../apache/hadoop/hbase/util/FSHDFSUtils.java | 16 +++--
.../hbase/wal/SyncReplicationWALProvider.java | 2 +-
.../replication/DualAsyncFSWALForTest.java | 4 +-
.../replication/SyncReplicationTestBase.java | 26 +++++--
.../replication/TestSyncReplicationActive.java | 42 ++++++++++--
7 files changed, 176 insertions(+), 37 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/1f3e50fc/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
index 9b4ce9c..7f3e30b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AsyncFSWAL.java
@@ -52,12 +52,12 @@ import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
-import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.htrace.core.TraceScope;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoop;
@@ -470,6 +470,44 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
// whether to issue a sync in the caller method.
}
+ private void drainNonMarkerEditsAndFailSyncs() {
+ if (toWriteAppends.isEmpty()) {
+ return;
+ }
+ boolean hasNonMarkerEdits = false;
+ Iterator<FSWALEntry> iter = toWriteAppends.descendingIterator();
+ while (iter.hasNext()) {
+ FSWALEntry entry = iter.next();
+ if (!entry.getEdit().isMetaEdit()) {
+ hasNonMarkerEdits = true;
+ break;
+ }
+ }
+ if (hasNonMarkerEdits) {
+ for (;;) {
+ iter.remove();
+ if (!iter.hasNext()) {
+ break;
+ }
+ iter.next();
+ }
+ unackedAppends.clear();
+ // fail the sync futures which are under the txid of the first remaining edit, if none, fail
+ // all the sync futures.
+ long txid = toWriteAppends.isEmpty() ? Long.MAX_VALUE : toWriteAppends.peek().getTxid();
+ IOException error = new IOException("WAL is closing, only marker edit is allowed");
+ for (Iterator<SyncFuture> syncIter = syncFutures.iterator(); syncIter.hasNext();) {
+ SyncFuture future = syncIter.next();
+ if (future.getTxid() < txid) {
+ future.done(future.getTxid(), error);
+ syncIter.remove();
+ } else {
+ break;
+ }
+ }
+ }
+ }
+
private void consume() {
consumeLock.lock();
try {
@@ -512,6 +550,9 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
}
waitingConsumePayloadsGatingSequence.set(nextCursor);
}
+ if (markerEditOnly()) {
+ drainNonMarkerEditsAndFailSyncs();
+ }
appendAndSync();
if (hasConsumerTask.get()) {
return;
@@ -553,9 +594,18 @@ public class AsyncFSWAL extends AbstractFSWAL<AsyncWriter> {
return consumerScheduled.compareAndSet(false, true);
}
+ // This is used by sync replication, where we are going to close the wal soon after we reopen all
+ // the regions. Will be overridden by sub classes.
+ protected boolean markerEditOnly() {
+ return false;
+ }
+
@Override
public long append(RegionInfo hri, WALKeyImpl key, WALEdit edits, boolean inMemstore)
throws IOException {
+ if (markerEditOnly() && !edits.isMetaEdit()) {
+ throw new IOException("WAL is closing, only marker edit is allowed");
+ }
long txid =
stampSequenceIdAndPublishToRingBuffer(hri, key, edits, inMemstore, waitingConsumePayloads);
if (shouldScheduleConsumer()) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/1f3e50fc/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java
index 3967e78..bf5b96d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/DualAsyncFSWAL.java
@@ -18,14 +18,19 @@
package org.apache.hadoop.hbase.regionserver.wal;
import java.io.IOException;
+import java.io.InterruptedIOException;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.ConnectionUtils;
import org.apache.hadoop.hbase.wal.WALProvider.AsyncWriter;
import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
import org.apache.hbase.thirdparty.io.netty.channel.Channel;
import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
@@ -35,20 +40,24 @@ import org.apache.hbase.thirdparty.io.netty.channel.EventLoopGroup;
@InterfaceAudience.Private
public class DualAsyncFSWAL extends AsyncFSWAL {
+ private static final Logger LOG = LoggerFactory.getLogger(DualAsyncFSWAL.class);
+
private final FileSystem remoteFs;
- private final Path remoteWalDir;
+ private final Path remoteWALDir;
+
+ private volatile boolean skipRemoteWAL = false;
- private volatile boolean skipRemoteWal = false;
+ private volatile boolean markerEditOnly = false;
- public DualAsyncFSWAL(FileSystem fs, FileSystem remoteFs, Path rootDir, Path remoteWalDir,
+ public DualAsyncFSWAL(FileSystem fs, FileSystem remoteFs, Path rootDir, Path remoteWALDir,
String logDir, String archiveDir, Configuration conf, List<WALActionsListener> listeners,
boolean failIfWALExists, String prefix, String suffix, EventLoopGroup eventLoopGroup,
Class<? extends Channel> channelClass) throws FailedLogCloseException, IOException {
super(fs, rootDir, logDir, archiveDir, conf, listeners, failIfWALExists, prefix, suffix,
- eventLoopGroup, channelClass);
+ eventLoopGroup, channelClass);
this.remoteFs = remoteFs;
- this.remoteWalDir = remoteWalDir;
+ this.remoteWALDir = remoteWALDir;
}
// will be overridden in testcase
@@ -61,20 +70,37 @@ public class DualAsyncFSWAL extends AsyncFSWAL {
@Override
protected AsyncWriter createWriterInstance(Path path) throws IOException {
AsyncWriter localWriter = super.createWriterInstance(path);
- if (skipRemoteWal) {
- return localWriter;
- }
- AsyncWriter remoteWriter;
- boolean succ = false;
- try {
- remoteWriter = createAsyncWriter(remoteFs, new Path(remoteWalDir, path.getName()));
- succ = true;
- } finally {
- if (!succ) {
- closeWriter(localWriter);
+ // retry forever if we can not create the remote writer to prevent aborting the RS due to log
+ // rolling error, unless the skipRemoteWal is set to true.
+ // TODO: since for now we only have one thread doing log rolling, this may block the rolling for
+ // other wals
+ Path remoteWAL = new Path(remoteWALDir, path.getName());
+ for (int retry = 0;; retry++) {
+ if (skipRemoteWAL) {
+ return localWriter;
+ }
+ AsyncWriter remoteWriter;
+ try {
+ remoteWriter = createAsyncWriter(remoteFs, remoteWAL);
+ } catch (IOException e) {
+ LOG.warn("create remote writer {} failed, retry = {}", remoteWAL, retry, e);
+ try {
+ Thread.sleep(ConnectionUtils.getPauseTime(100, retry));
+ } catch (InterruptedException ie) {
+ // restore the interrupt state
+ Thread.currentThread().interrupt();
+ Closeables.close(localWriter, true);
+ throw (IOException) new InterruptedIOException().initCause(ie);
+ }
+ continue;
}
+ return createCombinedAsyncWriter(localWriter, remoteWriter);
}
- return createCombinedAsyncWriter(localWriter, remoteWriter);
+ }
+
+ @Override
+ protected boolean markerEditOnly() {
+ return markerEditOnly;
}
// Allow temporarily skipping the creation of remote writer. When failing to write to the remote
@@ -82,7 +108,14 @@ public class DualAsyncFSWAL extends AsyncFSWAL {
// need to write a close marker when closing a region, and if it fails, the whole rs will abort.
// So here we need to skip the creation of remote writer and make it possible to write the region
// close marker.
- public void skipRemoteWal() {
- this.skipRemoteWal = true;
+ // Setting markerEdit only to true is for transiting from A to S, where we need to give up writing
+ // any pending wal entries as they will be discarded. The remote cluster will replicated the
+ // correct data back later. We still need to allow writing marker edits such as close region event
+ // to allow closing a region.
+ public void skipRemoteWAL(boolean markerEditOnly) {
+ if (markerEditOnly) {
+ this.markerEditOnly = true;
+ }
+ this.skipRemoteWAL = true;
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/1f3e50fc/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java
index 301d158..a49ee02 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/FSHDFSUtils.java
@@ -28,9 +28,9 @@ import java.util.Collection;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FilterFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException;
@@ -155,12 +155,16 @@ public class FSHDFSUtils extends FSUtils {
* Recover the lease from HDFS, retrying multiple times.
*/
@Override
- public void recoverFileLease(final FileSystem fs, final Path p,
- Configuration conf, CancelableProgressable reporter)
- throws IOException {
+ public void recoverFileLease(FileSystem fs, Path p, Configuration conf,
+ CancelableProgressable reporter) throws IOException {
+ if (fs instanceof FilterFileSystem) {
+ fs = ((FilterFileSystem) fs).getRawFileSystem();
+ }
// lease recovery not needed for local file system case.
- if (!(fs instanceof DistributedFileSystem)) return;
- recoverDFSFileLease((DistributedFileSystem)fs, p, conf, reporter);
+ if (!(fs instanceof DistributedFileSystem)) {
+ return;
+ }
+ recoverDFSFileLease((DistributedFileSystem) fs, p, conf, reporter);
}
/*
http://git-wip-us.apache.org/repos/asf/hbase/blob/1f3e50fc/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
index 82f8a89..b9fffcf 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/SyncReplicationWALProvider.java
@@ -291,7 +291,7 @@ public class SyncReplicationWALProvider implements WALProvider, PeerActionListen
try {
Optional<DualAsyncFSWAL> opt = peerId2WAL.get(peerId);
if (opt != null) {
- opt.ifPresent(DualAsyncFSWAL::skipRemoteWal);
+ opt.ifPresent(w -> w.skipRemoteWAL(to == SyncReplicationState.STANDBY));
} else {
// add a place holder to tell the getWAL caller do not use DualAsyncFSWAL any more.
peerId2WAL.put(peerId, Optional.empty());
http://git-wip-us.apache.org/repos/asf/hbase/blob/1f3e50fc/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DualAsyncFSWALForTest.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DualAsyncFSWALForTest.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DualAsyncFSWALForTest.java
index fb3daf2..62000b4 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DualAsyncFSWALForTest.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/DualAsyncFSWALForTest.java
@@ -97,11 +97,11 @@ class DualAsyncFSWALForTest extends DualAsyncFSWAL {
}
}
- public DualAsyncFSWALForTest(FileSystem fs, FileSystem remoteFs, Path rootDir, Path remoteWalDir,
+ public DualAsyncFSWALForTest(FileSystem fs, FileSystem remoteFs, Path rootDir, Path remoteWALDir,
String logDir, String archiveDir, Configuration conf, List<WALActionsListener> listeners,
boolean failIfWALExists, String prefix, String suffix, EventLoopGroup eventLoopGroup,
Class<? extends Channel> channelClass) throws FailedLogCloseException, IOException {
- super(fs, remoteFs, rootDir, remoteWalDir, logDir, archiveDir, conf, listeners, failIfWALExists,
+ super(fs, remoteFs, rootDir, remoteWALDir, logDir, archiveDir, conf, listeners, failIfWALExists,
prefix, suffix, eventLoopGroup, channelClass);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/1f3e50fc/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
index 095be90..a20edd3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/SyncReplicationTestBase.java
@@ -23,6 +23,7 @@ import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.regex.Pattern;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -32,6 +33,7 @@ import org.apache.hadoop.hbase.HBaseZKTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter.ExplainingPredicate;
+import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.Get;
@@ -127,10 +129,26 @@ public class SyncReplicationTestBase {
.setRemoteWALDir(REMOTE_WAL_DIR1.toUri().toString()).build());
}
+ private static void shutdown(HBaseTestingUtility util) throws Exception {
+ if (util.getHBaseCluster() == null) {
+ return;
+ }
+ Admin admin = util.getAdmin();
+ if (!admin.listReplicationPeers(Pattern.compile(PEER_ID)).isEmpty()) {
+ if (admin
+ .getReplicationPeerSyncReplicationState(PEER_ID) != SyncReplicationState.DOWNGRADE_ACTIVE) {
+ admin.transitReplicationPeerSyncReplicationState(PEER_ID,
+ SyncReplicationState.DOWNGRADE_ACTIVE);
+ }
+ admin.removeReplicationPeer(PEER_ID);
+ }
+ util.shutdownMiniCluster();
+ }
+
@AfterClass
public static void tearDown() throws Exception {
- UTIL1.shutdownMiniCluster();
- UTIL2.shutdownMiniCluster();
+ shutdown(UTIL1);
+ shutdown(UTIL2);
ZK_UTIL.shutdownMiniZKCluster();
}
@@ -207,7 +225,7 @@ public class SyncReplicationTestBase {
protected void verifyRemovedPeer(String peerId, Path remoteWALDir, HBaseTestingUtility utility)
throws Exception {
ReplicationPeerStorage rps = ReplicationStorageFactory
- .getReplicationPeerStorage(utility.getZooKeeperWatcher(), utility.getConfiguration());
+ .getReplicationPeerStorage(utility.getZooKeeperWatcher(), utility.getConfiguration());
try {
rps.getPeerSyncReplicationState(peerId);
fail("Should throw exception when get the sync replication state of a removed peer.");
@@ -233,7 +251,7 @@ public class SyncReplicationTestBase {
Entry[] entries = new Entry[10];
for (int i = 0; i < entries.length; i++) {
entries[i] =
- new Entry(new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TABLE_NAME, 0), new WALEdit());
+ new Entry(new WALKeyImpl(HConstants.EMPTY_BYTE_ARRAY, TABLE_NAME, 0), new WALEdit());
}
if (!expectedRejection) {
ReplicationProtbufUtil.replicateWALEntry(connection.getAdmin(regionServer.getServerName()),
http://git-wip-us.apache.org/repos/asf/hbase/blob/1f3e50fc/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java
index fce0cdf..42adab6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSyncReplicationActive.java
@@ -17,13 +17,28 @@
*/
package org.apache.hadoop.hbase.replication;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.client.AsyncConnection;
+import org.apache.hadoop.hbase.client.AsyncTable;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WAL.Reader;
import org.apache.hadoop.hbase.wal.WALFactory;
@@ -66,8 +81,27 @@ public class TestSyncReplicationActive extends SyncReplicationTestBase {
// confirm that the data is there after we convert the peer to DA
verify(UTIL2, 0, 100);
- UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
- SyncReplicationState.STANDBY);
+ try (AsyncConnection conn =
+ ConnectionFactory.createAsyncConnection(UTIL1.getConfiguration()).get()) {
+ AsyncTable<?> table = conn.getTableBuilder(TABLE_NAME).setMaxAttempts(1).build();
+ CompletableFuture<Void> future =
+ table.put(new Put(Bytes.toBytes(1000)).addColumn(CF, CQ, Bytes.toBytes(1000)));
+ Thread.sleep(2000);
+ // should hang on rolling
+ assertFalse(future.isDone());
+ UTIL1.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
+ SyncReplicationState.STANDBY);
+ try {
+ future.get();
+ fail("should fail because of the wal is closing");
+ } catch (ExecutionException e) {
+ // expected
+ assertThat(e.getCause().getMessage(), containsString("only marker edit is allowed"));
+ }
+ }
+ // confirm that the data has not been persisted
+ HRegion region = UTIL1.getMiniHBaseCluster().getRegions(TABLE_NAME).get(0);
+ assertTrue(region.get(new Get(Bytes.toBytes(1000))).isEmpty());
UTIL2.getAdmin().transitReplicationPeerSyncReplicationState(PEER_ID,
SyncReplicationState.ACTIVE);
@@ -89,8 +123,8 @@ public class TestSyncReplicationActive extends SyncReplicationTestBase {
FileStatus[] files = fs2.listStatus(new Path(remoteDir, peerId));
Assert.assertTrue(files.length > 0);
for (FileStatus file : files) {
- try (Reader reader =
- WALFactory.createReader(fs2, file.getPath(), utility.getConfiguration())) {
+ try (
+ Reader reader = WALFactory.createReader(fs2, file.getPath(), utility.getConfiguration())) {
Entry entry = reader.next();
Assert.assertTrue(entry != null);
while (entry != null) {