You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by su...@apache.org on 2021/07/09 06:56:05 UTC
[hbase] 11/12: HBASE-24737 Find a way to resolve
WALFileLengthProvider#getLogFileSizeIfBeingWritten problem (#3045)
This is an automated email from the ASF dual-hosted git repository.
sunxin pushed a commit to branch HBASE-24666
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit ff16870505d082fe5075cb31e60d6ec045cf2ab6
Author: XinSun <dd...@gmail.com>
AuthorDate: Tue Apr 27 11:13:15 2021 +0800
HBASE-24737 Find a way to resolve WALFileLengthProvider#getLogFileSizeIfBeingWritten problem (#3045)
Signed-off-by: Duo Zhang <zh...@apache.org>
---
.../src/main/protobuf/server/region/Admin.proto | 12 ++
.../hbase/client/AsyncRegionServerAdmin.java | 8 ++
.../hadoop/hbase/regionserver/HRegionServer.java | 2 +-
.../hadoop/hbase/regionserver/RSRpcServices.java | 24 ++++
.../hbase/replication/HReplicationServer.java | 11 +-
.../regionserver/WALFileLengthProvider.java | 3 +-
.../RemoteWALFileLengthProvider.java | 73 ++++++++++++
.../org/apache/hadoop/hbase/wal/WALProvider.java | 15 ++-
.../hadoop/hbase/master/MockRegionServer.java | 7 ++
.../TestRemoteWALFileLengthProvider.java | 130 +++++++++++++++++++++
10 files changed, 280 insertions(+), 5 deletions(-)
diff --git a/hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto b/hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto
index 0667292..693a809 100644
--- a/hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto
@@ -328,6 +328,15 @@ message ClearSlowLogResponses {
required bool is_cleaned = 1;
}
+message GetLogFileSizeIfBeingWrittenRequest {
+ required string wal_path = 1;
+}
+
+message GetLogFileSizeIfBeingWrittenResponse {
+ required bool is_being_written = 1;
+ optional uint64 length = 2;
+}
+
service AdminService {
rpc GetRegionInfo(GetRegionInfoRequest)
returns(GetRegionInfoResponse);
@@ -399,4 +408,7 @@ service AdminService {
rpc GetLogEntries(LogRequest)
returns(LogEntry);
+ rpc GetLogFileSizeIfBeingWritten(GetLogFileSizeIfBeingWrittenRequest)
+ returns(GetLogFileSizeIfBeingWrittenResponse);
+
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java
index 8ff869f..f18d894 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java
@@ -42,6 +42,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProc
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetLogFileSizeIfBeingWrittenRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetLogFileSizeIfBeingWrittenResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
@@ -216,4 +218,10 @@ public class AsyncRegionServerAdmin {
ExecuteProceduresRequest request) {
return call((stub, controller, done) -> stub.executeProcedures(controller, request, done));
}
+
+ public CompletableFuture<GetLogFileSizeIfBeingWrittenResponse> getLogFileSizeIfBeingWritten(
+ GetLogFileSizeIfBeingWrittenRequest request) {
+ return call((stub, controller, done) ->
+ stub.getLogFileSizeIfBeingWritten(controller, request, done));
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index c00a8b7..a5eb4e7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -2323,7 +2323,7 @@ public class HRegionServer extends Thread implements
return walRoller;
}
- WALFactory getWalFactory() {
+ public WALFactory getWalFactory() {
return walFactory;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 91bf9cb..edc33d7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -37,6 +37,7 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Optional;
+import java.util.OptionalLong;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
@@ -136,6 +137,7 @@ import org.apache.hadoop.hbase.regionserver.handler.UnassignRegionHandler;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.regionserver.RejectReplicationRequestStateChecker;
import org.apache.hadoop.hbase.replication.regionserver.RejectRequestsFromClientStateChecker;
+import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.access.AccessChecker;
@@ -189,6 +191,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProc
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetLogFileSizeIfBeingWrittenRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetLogFileSizeIfBeingWrittenResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
@@ -4055,6 +4059,26 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
throw new ServiceException("Invalid request params");
}
+ @Override
+ public GetLogFileSizeIfBeingWrittenResponse getLogFileSizeIfBeingWritten(
+ RpcController controller, GetLogFileSizeIfBeingWrittenRequest request) throws ServiceException {
+ GetLogFileSizeIfBeingWrittenResponse.Builder builder =
+ GetLogFileSizeIfBeingWrittenResponse.newBuilder();
+ try {
+ WALFileLengthProvider walLengthProvider =
+ this.regionServer.getWalFactory().getWALProvider().getWALFileLengthProvider();
+ OptionalLong lengthOptional =
+ walLengthProvider.getLogFileSizeIfBeingWritten(new Path(request.getWalPath()));
+ if (lengthOptional.isPresent()) {
+ return builder.setIsBeingWritten(true).setLength(lengthOptional.getAsLong()).build();
+ } else {
+ return builder.setIsBeingWritten(false).build();
+ }
+ } catch (Exception e) {
+ throw new ServiceException(e);
+ }
+ }
+
public RpcScheduler getRpcScheduler() {
return rpcServer.getScheduler();
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java
index 8d85b85..2654565 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java
@@ -56,6 +56,8 @@ import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSour
import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceFactory;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
+import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
+import org.apache.hadoop.hbase.replication.replicationserver.RemoteWALFileLengthProvider;
import org.apache.hadoop.hbase.security.SecurityConstants;
import org.apache.hadoop.hbase.security.Superusers;
import org.apache.hadoop.hbase.security.User;
@@ -716,7 +718,7 @@ public class HReplicationServer extends Thread implements Server, ReplicationSou
ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, queueId);
// init replication source
src.init(conf, walFs, walDir, this, queueStorage, replicationPeers.getPeer(peerId), this,
- producer, queueId, clusterId, p -> OptionalLong.empty(), metrics);
+ producer, queueId, clusterId, createWALFileLengthProvider(producer, queueId), metrics);
queueStorage.getWALsInQueue(producer, queueId)
.forEach(walName -> src.enqueueLog(new Path(walDir, walName)));
src.startup();
@@ -744,4 +746,11 @@ public class HReplicationServer extends Thread implements Server, ReplicationSou
abort("Failed to operate on replication queue", e);
}
}
+
+ private WALFileLengthProvider createWALFileLengthProvider(ServerName producer, String queueId) {
+ if (new ReplicationQueueInfo(queueId).isQueueRecovered()) {
+ return p -> OptionalLong.empty();
+ }
+ return new RemoteWALFileLengthProvider(asyncClusterConnection, producer);
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java
index c60faa9..f91dd2c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.replication.regionserver;
+import java.io.IOException;
import java.util.OptionalLong;
import org.apache.hadoop.fs.Path;
@@ -33,5 +34,5 @@ import org.apache.yetus.audience.InterfaceAudience;
@FunctionalInterface
public interface WALFileLengthProvider {
- OptionalLong getLogFileSizeIfBeingWritten(Path path);
+ OptionalLong getLogFileSizeIfBeingWritten(Path path) throws IOException;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/replicationserver/RemoteWALFileLengthProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/replicationserver/RemoteWALFileLengthProvider.java
new file mode 100644
index 0000000..07d216d
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/replicationserver/RemoteWALFileLengthProvider.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.replicationserver;
+
+import java.io.IOException;
+import java.util.OptionalLong;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
+import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetLogFileSizeIfBeingWrittenRequest;
+
+/**
+ * Used by ReplicationServer while Replication offload enabled.
+ * On ReplicationServer, we need to know the length of the wal being writing from RegionServer that
+ * holds the wal. So achieve that through RPC call.
+ */
+@InterfaceAudience.Private
+public class RemoteWALFileLengthProvider implements WALFileLengthProvider {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RemoteWALFileLengthProvider.class);
+
+ private AsyncClusterConnection conn;
+
+ private ServerName rs;
+
+ public RemoteWALFileLengthProvider(AsyncClusterConnection conn, ServerName rs) {
+ this.conn = conn;
+ this.rs = rs;
+ }
+
+ @Override
+ public OptionalLong getLogFileSizeIfBeingWritten(Path path) throws IOException {
+ AsyncRegionServerAdmin rsAdmin = conn.getRegionServerAdmin(rs);
+ GetLogFileSizeIfBeingWrittenRequest request =
+ GetLogFileSizeIfBeingWrittenRequest.newBuilder().setWalPath(path.toString()).build();
+ try {
+ AdminProtos.GetLogFileSizeIfBeingWrittenResponse response =
+ FutureUtils.get(rsAdmin.getLogFileSizeIfBeingWritten(request));
+ if (response.getIsBeingWritten()) {
+ return OptionalLong.of(response.getLength());
+ } else {
+ return OptionalLong.empty();
+ }
+ } catch (IOException e) {
+ LOG.warn("Exceptionally get the length of wal {} from RS {}", path.getName(), rs);
+ throw e;
+ }
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java
index 01c1d11..a9bd50e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java
@@ -130,7 +130,18 @@ public interface WALProvider {
void addWALActionsListener(WALActionsListener listener);
default WALFileLengthProvider getWALFileLengthProvider() {
- return path -> getWALs().stream().map(w -> w.getLogFileSizeIfBeingWritten(path))
- .filter(o -> o.isPresent()).findAny().orElse(OptionalLong.empty());
+ return path -> getWALs().stream().map(w -> {
+ try {
+ return w.getLogFileSizeIfBeingWritten(path);
+ } catch (IOException e) {
+ // Won't go here. For supporting replication offload in HBASE-24737, we introduce
+ // RemoteWALFileLengthProvider implementing WALFileLengthProvider, it is hold by
+ // ReplicationServer and gets the length of WALs from RS through RPC, it may throw an IOE.
+ // So we need declare WALFileLengthProvider.getLogFileSizeIfBeingWritten as throwing IOE.
+ // But this is safe here, WALProvider is only used by RS, getWALs returns WAL that extents
+ // WALFileLengthProvider and won't throw IOE.
+ return OptionalLong.empty();
+ }
+ }).filter(o -> o.isPresent()).findAny().orElse(OptionalLong.empty());
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
index 69a7a79..084b5af 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
@@ -690,6 +690,13 @@ class MockRegionServer implements AdminProtos.AdminService.BlockingInterface,
}
@Override
+ public AdminProtos.GetLogFileSizeIfBeingWrittenResponse getLogFileSizeIfBeingWritten(
+ RpcController controller, AdminProtos.GetLogFileSizeIfBeingWrittenRequest request)
+ throws ServiceException {
+ return null;
+ }
+
+ @Override
public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots(
RpcController controller, GetSpaceQuotaSnapshotsRequest request)
throws ServiceException {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/replicationserver/TestRemoteWALFileLengthProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/replicationserver/TestRemoteWALFileLengthProvider.java
new file mode 100644
index 0000000..a9adbec
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/replicationserver/TestRemoteWALFileLengthProvider.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.replicationserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.OptionalLong;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({ MediumTests.class})
+public class TestRemoteWALFileLengthProvider {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestRemoteWALFileLengthProvider.class);
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestRemoteWALFileLengthProvider.class);
+
+ @Rule
+ public final TestName name = new TestName();
+
+ private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+ private static final byte[] CF = Bytes.toBytes("C");
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ UTIL.startMiniCluster();
+ }
+
+ @AfterClass
+ public static void teardownAfterClass() throws Exception {
+ UTIL.shutdownMiniCluster();
+ }
+
+ @Test
+ public void test() throws Exception {
+ TableName tableName = TableName.valueOf(name.getMethodName());
+ Table table = UTIL.createTable(tableName, CF);
+ UTIL.waitUntilAllRegionsAssigned(tableName);
+ assertEquals(1, UTIL.getMiniHBaseCluster().getNumLiveRegionServers());
+
+ // Find the RS which holds test table regions.
+ HRegionServer rs =
+ UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().stream()
+ .map(JVMClusterUtil.RegionServerThread::getRegionServer)
+ .filter(s -> !s.getRegions(tableName).isEmpty())
+ .findFirst().get();
+ assertNotNull(rs);
+
+ // Put some data and request rolling log, make multiple wals.
+ table.put(new Put(Bytes.toBytes("r1")).addColumn(CF, CF, Bytes.toBytes("v")));
+ rs.getWalRoller().requestRollAll();
+ table.put(new Put(Bytes.toBytes("r2")).addColumn(CF, CF, Bytes.toBytes("v")));
+ UTIL.waitFor(60000, rs::walRollRequestFinished);
+
+ WALFileLengthProvider rsLengthProvider =
+ rs.getWalFactory().getWALProvider().getWALFileLengthProvider();
+ WALFileLengthProvider remoteLengthProvider =
+ new RemoteWALFileLengthProvider(UTIL.getAsyncConnection(), rs.getServerName());
+
+ // Check that RegionServer and ReplicationServer can get same result whether the wal is being
+ // written
+ boolean foundWalIsBeingWritten = false;
+ List<Path> wals = getRsWalsOnFs(rs);
+ assertTrue(wals.size() > 1);
+ for (Path wal : wals) {
+ Path path = new Path(rs.getWALRootDir(), wal);
+ OptionalLong rsWalLength = rsLengthProvider.getLogFileSizeIfBeingWritten(path);
+ OptionalLong remoteLength = remoteLengthProvider.getLogFileSizeIfBeingWritten(path);
+ assertEquals(rsWalLength.isPresent(), remoteLength.isPresent());
+ if (rsWalLength.isPresent() && remoteLength.isPresent()) {
+ foundWalIsBeingWritten = true;
+ assertEquals(rsWalLength.getAsLong(), remoteLength.getAsLong());
+ }
+ }
+ assertTrue(foundWalIsBeingWritten);
+ }
+
+ private List<Path> getRsWalsOnFs(HRegionServer rs) throws IOException {
+ FileSystem fs = rs.getFileSystem();
+ FileStatus[] fileStatuses = fs.listStatus(new Path(rs.getWALRootDir(),
+ AbstractFSWALProvider.getWALDirectoryName(rs.getServerName().toString())));
+ return Arrays.stream(fileStatuses).map(FileStatus::getPath).collect(Collectors.toList());
+ }
+}