You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by su...@apache.org on 2021/04/27 03:13:41 UTC

[hbase] branch HBASE-24666 updated: HBASE-24737 Find a way to resolve WALFileLengthProvider#getLogFileSizeIfBeingWritten problem (#3045)

This is an automated email from the ASF dual-hosted git repository.

sunxin pushed a commit to branch HBASE-24666
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/HBASE-24666 by this push:
     new 786c7d7  HBASE-24737 Find a way to resolve WALFileLengthProvider#getLogFileSizeIfBeingWritten problem (#3045)
786c7d7 is described below

commit 786c7d7dfc720600cd6e5ae5dd712db8e0b8fd4d
Author: XinSun <dd...@gmail.com>
AuthorDate: Tue Apr 27 11:13:15 2021 +0800

    HBASE-24737 Find a way to resolve WALFileLengthProvider#getLogFileSizeIfBeingWritten problem (#3045)
    
    Signed-off-by: Duo Zhang <zh...@apache.org>
---
 .../src/main/protobuf/server/region/Admin.proto    |  12 ++
 .../hbase/client/AsyncRegionServerAdmin.java       |   8 ++
 .../hadoop/hbase/regionserver/HRegionServer.java   |   2 +-
 .../hadoop/hbase/regionserver/RSRpcServices.java   |  24 ++++
 .../hbase/replication/HReplicationServer.java      |  11 +-
 .../regionserver/WALFileLengthProvider.java        |   3 +-
 .../RemoteWALFileLengthProvider.java               |  73 ++++++++++++
 .../org/apache/hadoop/hbase/wal/WALProvider.java   |  15 ++-
 .../hadoop/hbase/master/MockRegionServer.java      |   7 ++
 .../TestRemoteWALFileLengthProvider.java           | 130 +++++++++++++++++++++
 10 files changed, 280 insertions(+), 5 deletions(-)

diff --git a/hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto b/hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto
index 0667292..693a809 100644
--- a/hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/server/region/Admin.proto
@@ -328,6 +328,15 @@ message ClearSlowLogResponses {
   required bool is_cleaned = 1;
 }
 
+message GetLogFileSizeIfBeingWrittenRequest {
+  required string wal_path = 1;
+}
+
+message GetLogFileSizeIfBeingWrittenResponse {
+  required bool is_being_written = 1;
+  optional uint64 length = 2;
+}
+
 service AdminService {
   rpc GetRegionInfo(GetRegionInfoRequest)
     returns(GetRegionInfoResponse);
@@ -399,4 +408,7 @@ service AdminService {
   rpc GetLogEntries(LogRequest)
     returns(LogEntry);
 
+  rpc GetLogFileSizeIfBeingWritten(GetLogFileSizeIfBeingWrittenRequest)
+    returns(GetLogFileSizeIfBeingWrittenResponse);
+
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java
index cb06137..edef98a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java
@@ -42,6 +42,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProc
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetLogFileSizeIfBeingWrittenRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetLogFileSizeIfBeingWrittenResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
@@ -216,4 +218,10 @@ public class AsyncRegionServerAdmin {
       ExecuteProceduresRequest request) {
     return call((stub, controller, done) -> stub.executeProcedures(controller, request, done));
   }
+
+  public CompletableFuture<GetLogFileSizeIfBeingWrittenResponse> getLogFileSizeIfBeingWritten(
+    GetLogFileSizeIfBeingWrittenRequest request) {
+    return call((stub, controller, done) ->
+      stub.getLogFileSizeIfBeingWritten(controller, request, done));
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index f2379dd..af335a5 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -2307,7 +2307,7 @@ public class HRegionServer extends Thread implements
     return walRoller;
   }
 
-  WALFactory getWalFactory() {
+  public WALFactory getWalFactory() {
     return walFactory;
   }
 
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index ff8c7fc..b571ce0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -36,6 +36,7 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.NavigableMap;
 import java.util.Optional;
+import java.util.OptionalLong;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.ConcurrentHashMap;
@@ -135,6 +136,7 @@ import org.apache.hadoop.hbase.regionserver.handler.UnassignRegionHandler;
 import org.apache.hadoop.hbase.replication.ReplicationUtils;
 import org.apache.hadoop.hbase.replication.regionserver.RejectReplicationRequestStateChecker;
 import org.apache.hadoop.hbase.replication.regionserver.RejectRequestsFromClientStateChecker;
+import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
 import org.apache.hadoop.hbase.security.Superusers;
 import org.apache.hadoop.hbase.security.User;
 import org.apache.hadoop.hbase.security.access.AccessChecker;
@@ -188,6 +190,8 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProc
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ExecuteProceduresResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetLogFileSizeIfBeingWrittenRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetLogFileSizeIfBeingWrittenResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionRequest;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetOnlineRegionResponse;
 import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetRegionInfoRequest;
@@ -4019,6 +4023,26 @@ public class RSRpcServices implements HBaseRPCErrorHandler,
     throw new ServiceException("Invalid request params");
   }
 
+  @Override
+  public GetLogFileSizeIfBeingWrittenResponse getLogFileSizeIfBeingWritten(
+    RpcController controller, GetLogFileSizeIfBeingWrittenRequest request) throws ServiceException {
+    GetLogFileSizeIfBeingWrittenResponse.Builder builder =
+      GetLogFileSizeIfBeingWrittenResponse.newBuilder();
+    try {
+      WALFileLengthProvider walLengthProvider =
+        this.regionServer.getWalFactory().getWALProvider().getWALFileLengthProvider();
+      OptionalLong lengthOptional =
+        walLengthProvider.getLogFileSizeIfBeingWritten(new Path(request.getWalPath()));
+      if (lengthOptional.isPresent()) {
+        return builder.setIsBeingWritten(true).setLength(lengthOptional.getAsLong()).build();
+      } else {
+        return builder.setIsBeingWritten(false).build();
+      }
+    } catch (Exception e) {
+      throw new ServiceException(e);
+    }
+  }
+
   public RpcScheduler getRpcScheduler() {
     return rpcServer.getScheduler();
   }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java
index 4c7ce0a..5c3f8b7 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/HReplicationServer.java
@@ -56,6 +56,8 @@ import org.apache.hadoop.hbase.replication.regionserver.RecoveredReplicationSour
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceFactory;
 import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
+import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
+import org.apache.hadoop.hbase.replication.replicationserver.RemoteWALFileLengthProvider;
 import org.apache.hadoop.hbase.security.SecurityConstants;
 import org.apache.hadoop.hbase.security.Superusers;
 import org.apache.hadoop.hbase.security.User;
@@ -718,7 +720,7 @@ public class HReplicationServer extends Thread implements Server, ReplicationSou
     ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, queueId);
     // init replication source
     src.init(conf, walFs, walDir, this, queueStorage, replicationPeers.getPeer(peerId), this,
-      producer, queueId, clusterId, p -> OptionalLong.empty(), metrics);
+      producer, queueId, clusterId, createWALFileLengthProvider(producer, queueId), metrics);
     queueStorage.getWALsInQueue(producer, queueId)
       .forEach(walName -> src.enqueueLog(new Path(walDir, walName)));
     src.startup();
@@ -746,4 +748,11 @@ public class HReplicationServer extends Thread implements Server, ReplicationSou
       abort("Failed to operate on replication queue", e);
     }
   }
+
+  private WALFileLengthProvider createWALFileLengthProvider(ServerName producer, String queueId) {
+    if (new ReplicationQueueInfo(queueId).isQueueRecovered()) {
+      return p -> OptionalLong.empty();
+    }
+    return new RemoteWALFileLengthProvider(asyncClusterConnection, producer);
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java
index c60faa9..f91dd2c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/WALFileLengthProvider.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.replication.regionserver;
 
+import java.io.IOException;
 import java.util.OptionalLong;
 
 import org.apache.hadoop.fs.Path;
@@ -33,5 +34,5 @@ import org.apache.yetus.audience.InterfaceAudience;
 @FunctionalInterface
 public interface WALFileLengthProvider {
 
-  OptionalLong getLogFileSizeIfBeingWritten(Path path);
+  OptionalLong getLogFileSizeIfBeingWritten(Path path) throws IOException;
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/replicationserver/RemoteWALFileLengthProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/replicationserver/RemoteWALFileLengthProvider.java
new file mode 100644
index 0000000..07d216d
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/replicationserver/RemoteWALFileLengthProvider.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.replicationserver;
+
+import java.io.IOException;
+import java.util.OptionalLong;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.AsyncRegionServerAdmin;
+import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.GetLogFileSizeIfBeingWrittenRequest;
+
+/**
+ * Used by ReplicationServer when replication offload is enabled.
+ * The ReplicationServer needs to know the length of a WAL that is still being written by the
+ * RegionServer that holds it, so it asks that RegionServer for the length through an RPC call.
+ */
+@InterfaceAudience.Private
+public class RemoteWALFileLengthProvider implements WALFileLengthProvider {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RemoteWALFileLengthProvider.class);
+  // Connection used to obtain the admin stub of the target region server; set once.
+  private final AsyncClusterConnection conn;
+  // Region server that is asked for the WAL length.
+  private final ServerName rs;
+
+  public RemoteWALFileLengthProvider(AsyncClusterConnection conn, ServerName rs) {
+    this.conn = conn;
+    this.rs = rs;
+  }
+  // Asks the region server over RPC; an empty result means the WAL is not being written.
+  @Override
+  public OptionalLong getLogFileSizeIfBeingWritten(Path path) throws IOException {
+    AsyncRegionServerAdmin rsAdmin = conn.getRegionServerAdmin(rs);
+    GetLogFileSizeIfBeingWrittenRequest request =
+      GetLogFileSizeIfBeingWrittenRequest.newBuilder().setWalPath(path.toString()).build();
+    try {
+      AdminProtos.GetLogFileSizeIfBeingWrittenResponse response =
+        FutureUtils.get(rsAdmin.getLogFileSizeIfBeingWritten(request));
+      if (response.getIsBeingWritten()) {
+        return OptionalLong.of(response.getLength());
+      } else {
+        return OptionalLong.empty();
+      }
+    } catch (IOException e) {
+      LOG.warn("Exceptionally get the length of wal {} from RS {}", path.getName(), rs, e);
+      throw e;
+    }
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java
index 01c1d11..a9bd50e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALProvider.java
@@ -130,7 +130,18 @@ public interface WALProvider {
   void addWALActionsListener(WALActionsListener listener);
 
   default WALFileLengthProvider getWALFileLengthProvider() {
-    return path -> getWALs().stream().map(w -> w.getLogFileSizeIfBeingWritten(path))
-        .filter(o -> o.isPresent()).findAny().orElse(OptionalLong.empty());
+    return path -> getWALs().stream().map(w -> {
+      try {
+        return w.getLogFileSizeIfBeingWritten(path);
+      } catch (IOException e) {
+        // Should never get here. To support replication offload, HBASE-24737 introduced
+        // RemoteWALFileLengthProvider, a WALFileLengthProvider that is held by the
+        // ReplicationServer and gets the length of WALs from the RS through RPC, so it may throw
+        // an IOE. That is why WALFileLengthProvider.getLogFileSizeIfBeingWritten declares IOE.
+        // It is safe to ignore it here: WALProvider is only used by the RS, and getWALs returns
+        // WALs that extend WALFileLengthProvider and won't throw IOE.
+        return OptionalLong.empty();
+      }
+    }).filter(o -> o.isPresent()).findAny().orElse(OptionalLong.empty());
   }
 }
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
index 69a7a79..084b5af 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
@@ -690,6 +690,13 @@ class MockRegionServer implements AdminProtos.AdminService.BlockingInterface,
   }
 
   @Override
+  public AdminProtos.GetLogFileSizeIfBeingWrittenResponse getLogFileSizeIfBeingWritten(
+    RpcController controller, AdminProtos.GetLogFileSizeIfBeingWrittenRequest request)
+    throws ServiceException {
+    return null;
+  }
+
+  @Override
   public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots(
       RpcController controller, GetSpaceQuotaSnapshotsRequest request)
       throws ServiceException {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/replicationserver/TestRemoteWALFileLengthProvider.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/replicationserver/TestRemoteWALFileLengthProvider.java
new file mode 100644
index 0000000..a9adbec
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/replicationserver/TestRemoteWALFileLengthProvider.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.replicationserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.OptionalLong;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({ MediumTests.class})
+public class TestRemoteWALFileLengthProvider {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestRemoteWALFileLengthProvider.class);
+
+  private static final Logger LOG = LoggerFactory.getLogger(TestRemoteWALFileLengthProvider.class);
+
+  @Rule
+  public final TestName name = new TestName();
+
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
+
+  private static final byte[] CF = Bytes.toBytes("C");
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    UTIL.startMiniCluster();
+  }
+
+  @AfterClass
+  public static void teardownAfterClass() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void test() throws Exception {
+    TableName tableName = TableName.valueOf(name.getMethodName());
+    Table table = UTIL.createTable(tableName, CF);
+    UTIL.waitUntilAllRegionsAssigned(tableName);
+    assertEquals(1, UTIL.getMiniHBaseCluster().getNumLiveRegionServers());
+
+    // Find the RS which holds test table regions.
+    HRegionServer rs =
+      UTIL.getMiniHBaseCluster().getLiveRegionServerThreads().stream()
+        .map(JVMClusterUtil.RegionServerThread::getRegionServer)
+        .filter(s -> !s.getRegions(tableName).isEmpty())
+        .findFirst().get();
+    assertNotNull(rs);
+
+    // Put some data and request rolling log, make multiple wals.
+    table.put(new Put(Bytes.toBytes("r1")).addColumn(CF, CF, Bytes.toBytes("v")));
+    rs.getWalRoller().requestRollAll();
+    table.put(new Put(Bytes.toBytes("r2")).addColumn(CF, CF, Bytes.toBytes("v")));
+    UTIL.waitFor(60000, rs::walRollRequestFinished);
+
+    WALFileLengthProvider rsLengthProvider =
+      rs.getWalFactory().getWALProvider().getWALFileLengthProvider();
+    WALFileLengthProvider remoteLengthProvider =
+      new RemoteWALFileLengthProvider(UTIL.getAsyncConnection(), rs.getServerName());
+
+    // Check that RegionServer and ReplicationServer can get same result whether the wal is being
+    // written
+    boolean foundWalIsBeingWritten = false;
+    List<Path> wals = getRsWalsOnFs(rs);
+    assertTrue(wals.size() > 1);
+    for (Path wal : wals) {
+      Path path = new Path(rs.getWALRootDir(), wal);
+      OptionalLong rsWalLength = rsLengthProvider.getLogFileSizeIfBeingWritten(path);
+      OptionalLong remoteLength = remoteLengthProvider.getLogFileSizeIfBeingWritten(path);
+      assertEquals(rsWalLength.isPresent(), remoteLength.isPresent());
+      if (rsWalLength.isPresent() && remoteLength.isPresent()) {
+        foundWalIsBeingWritten = true;
+        assertEquals(rsWalLength.getAsLong(), remoteLength.getAsLong());
+      }
+    }
+    assertTrue(foundWalIsBeingWritten);
+  }
+
+  private List<Path> getRsWalsOnFs(HRegionServer rs) throws IOException {
+    FileSystem fs = rs.getFileSystem();
+    FileStatus[] fileStatuses = fs.listStatus(new Path(rs.getWALRootDir(),
+      AbstractFSWALProvider.getWALDirectoryName(rs.getServerName().toString())));
+    return Arrays.stream(fileStatuses).map(FileStatus::getPath).collect(Collectors.toList());
+  }
+}