You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2023/05/15 14:54:13 UTC

[hbase] 13/16: HBASE-27775 Use a separate WAL provider for hbase:replication table (#5157)

This is an automated email from the ASF dual-hosted git repository.

zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git

commit 8fe691c705af23ee8ae0a09e5f7c3a8559b4df98
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Sat Apr 8 10:50:42 2023 +0800

    HBASE-27775 Use a separate WAL provider for hbase:replication table (#5157)
    
    Signed-off-by: Liangjun He <he...@apache.org>
---
 .../replication/ReplicationStorageFactory.java     |   6 +
 .../MetricsRegionServerWrapperImpl.java            |  12 +-
 .../hbase/wal/LazyInitializedWALProvider.java      | 108 +++++++++++++
 .../org/apache/hadoop/hbase/wal/WALFactory.java    | 172 +++++++++++++--------
 .../replication/TestMultiSlaveReplication.java     |   6 +-
 .../apache/hadoop/hbase/wal/TestWALFactory.java    |  28 ++++
 6 files changed, 260 insertions(+), 72 deletions(-)

diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java
index 0b0eb0fc43f..ada127ee783 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java
@@ -136,4 +136,10 @@ public final class ReplicationStorageFactory {
       return ReflectionUtils.newInstance(clazz, conf, tableName);
     }
   }
+
+  public static boolean isReplicationQueueTable(Configuration conf, TableName tableName) {
+    TableName replicationQueueTableName = TableName.valueOf(conf.get(REPLICATION_QUEUE_TABLE_NAME,
+      REPLICATION_QUEUE_TABLE_NAME_DEFAULT.getNameAsString()));
+    return replicationQueueTableName.equals(tableName);
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
index dd8c9c55127..6c7fc504b5f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
@@ -982,12 +982,12 @@ class MetricsRegionServerWrapperImpl implements MetricsRegionServerWrapper {
 
         lastRan = currentTime;
 
-        final WALProvider provider = regionServer.getWalFactory().getWALProvider();
-        final WALProvider metaProvider = regionServer.getWalFactory().getMetaWALProvider();
-        numWALFiles = (provider == null ? 0 : provider.getNumLogFiles())
-          + (metaProvider == null ? 0 : metaProvider.getNumLogFiles());
-        walFileSize = (provider == null ? 0 : provider.getLogFileSize())
-          + (metaProvider == null ? 0 : metaProvider.getLogFileSize());
+        List<WALProvider> providers = regionServer.getWalFactory().getAllWALProviders();
+        for (WALProvider provider : providers) {
+          numWALFiles += provider.getNumLogFiles();
+          walFileSize += provider.getLogFileSize();
+        }
+
         // Copy over computed values so that no thread sees half computed values.
         numStores = tempNumStores;
         numStoreFiles = tempNumStoreFiles;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/LazyInitializedWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/LazyInitializedWALProvider.java
new file mode 100644
index 00000000000..2a95b182130
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/LazyInitializedWALProvider.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
+import org.apache.hadoop.hbase.wal.WALFactory.Providers;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * A lazy initialized WAL provider for holding the WALProvider for some special tables, such as
+ * hbase:meta, hbase:replication, etc.
+ */
+@InterfaceAudience.Private
+class LazyInitializedWALProvider implements Closeable {
+
+  private final WALFactory factory;
+
+  private final String providerId;
+
+  private final String providerConfigName;
+
+  private final Abortable abortable;
+
+  private final AtomicReference<WALProvider> holder = new AtomicReference<>();
+
+  LazyInitializedWALProvider(WALFactory factory, String providerId, String providerConfigName,
+    Abortable abortable) {
+    this.factory = factory;
+    this.providerId = providerId;
+    this.providerConfigName = providerConfigName;
+    this.abortable = abortable;
+  }
+
+  WALProvider getProvider() throws IOException {
+    Configuration conf = factory.getConf();
+    for (;;) {
+      WALProvider provider = this.holder.get();
+      if (provider != null) {
+        return provider;
+      }
+      Class<? extends WALProvider> clz = null;
+      if (conf.get(providerConfigName) == null) {
+        try {
+          clz = conf.getClass(WALFactory.WAL_PROVIDER, Providers.defaultProvider.clazz,
+            WALProvider.class);
+        } catch (Throwable t) {
+          // the WAL provider should be an enum. Proceed
+        }
+      }
+      if (clz == null) {
+        clz = factory.getProviderClass(providerConfigName,
+          conf.get(WALFactory.WAL_PROVIDER, WALFactory.DEFAULT_WAL_PROVIDER));
+      }
+      provider = WALFactory.createProvider(clz);
+      provider.init(factory, conf, providerId, this.abortable);
+      provider.addWALActionsListener(new MetricsWAL());
+      if (this.holder.compareAndSet(null, provider)) {
+        return provider;
+      } else {
+        // someone is ahead of us, close and try again.
+        provider.close();
+      }
+    }
+  }
+
+  /**
+   * Get the provider if it is already initialized, otherwise just return {@code null} instead of
+   * creating it.
+   */
+  WALProvider getProviderNoCreate() {
+    return holder.get();
+  }
+
+  @Override
+  public void close() throws IOException {
+    WALProvider provider = this.holder.get();
+    if (provider != null) {
+      provider.close();
+    }
+  }
+
+  void shutdown() throws IOException {
+    WALProvider provider = this.holder.get();
+    if (provider != null) {
+      provider.shutdown();
+    }
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
index bc0a9eec73a..63bef79fa45 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.wal;
 import com.google.errorprone.annotations.RestrictedApi;
 import java.io.IOException;
 import java.io.InterruptedIOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicReference;
 import org.apache.hadoop.conf.Configuration;
@@ -28,10 +29,12 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
 import org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager;
 import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
 import org.apache.hadoop.hbase.regionserver.wal.ProtobufWALStreamReader;
 import org.apache.hadoop.hbase.regionserver.wal.ProtobufWALTailingReader;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
@@ -99,15 +102,22 @@ public class WALFactory {
 
   public static final String META_WAL_PROVIDER = "hbase.wal.meta_provider";
 
+  public static final String REPLICATION_WAL_PROVIDER = "hbase.wal.replication_provider";
+
   public static final String WAL_ENABLED = "hbase.regionserver.hlog.enabled";
 
+  static final String REPLICATION_WAL_PROVIDER_ID = "rep";
+
   final String factoryId;
   final Abortable abortable;
   private final WALProvider provider;
   // The meta updates are written to a different wal. If this
   // regionserver holds meta regions, then this ref will be non-null.
   // lazily intialized; most RegionServers don't deal with META
-  private final AtomicReference<WALProvider> metaProvider = new AtomicReference<>();
+  private final LazyInitializedWALProvider metaProvider;
+  // This is to avoid hbase:replication itself repeatedly triggering unnecessary updates to the
+  // WAL file and generating a lot of useless data, see HBASE-27775 for more details.
+  private final LazyInitializedWALProvider replicationProvider;
 
   /**
    * Configuration-specified WAL Reader used when a custom reader is requested
@@ -144,13 +154,15 @@ public class WALFactory {
     factoryId = SINGLETON_ID;
     this.abortable = null;
     this.excludeDatanodeManager = new ExcludeDatanodeManager(conf);
+    this.metaProvider = null;
+    this.replicationProvider = null;
   }
 
   Providers getDefaultProvider() {
     return Providers.defaultProvider;
   }
 
-  public Class<? extends WALProvider> getProviderClass(String key, String defaultValue) {
+  Class<? extends WALProvider> getProviderClass(String key, String defaultValue) {
     try {
       Providers provider = Providers.valueOf(conf.get(key, defaultValue));
 
@@ -246,6 +258,10 @@ public class WALFactory {
     this.factoryId = factoryId;
     this.excludeDatanodeManager = new ExcludeDatanodeManager(conf);
     this.abortable = abortable;
+    this.metaProvider = new LazyInitializedWALProvider(this,
+      AbstractFSWALProvider.META_WAL_PROVIDER_ID, META_WAL_PROVIDER, this.abortable);
+    this.replicationProvider = new LazyInitializedWALProvider(this, REPLICATION_WAL_PROVIDER_ID,
+      REPLICATION_WAL_PROVIDER, this.abortable);
     // end required early initialization
     if (conf.getBoolean(WAL_ENABLED, true)) {
       WALProvider provider = createProvider(getProviderClass(WAL_PROVIDER, DEFAULT_WAL_PROVIDER));
@@ -263,19 +279,45 @@ public class WALFactory {
     }
   }
 
+  public Configuration getConf() {
+    return conf;
+  }
+
   /**
    * Shutdown all WALs and clean up any underlying storage. Use only when you will not need to
    * replay and edits that have gone to any wals from this factory.
    */
   public void close() throws IOException {
-    final WALProvider metaProvider = this.metaProvider.get();
-    if (null != metaProvider) {
-      metaProvider.close();
+    List<IOException> ioes = new ArrayList<>();
+    // these fields could be null if the WALFactory is created only for being used in the
+    // getInstance method.
+    if (metaProvider != null) {
+      try {
+        metaProvider.close();
+      } catch (IOException e) {
+        ioes.add(e);
+      }
+    }
+    if (replicationProvider != null) {
+      try {
+        replicationProvider.close();
+      } catch (IOException e) {
+        ioes.add(e);
+      }
+    }
+    if (provider != null) {
+      try {
+        provider.close();
+      } catch (IOException e) {
+        ioes.add(e);
+      }
     }
-    // close is called on a WALFactory with null provider in the case of contention handling
-    // within the getInstance method.
-    if (null != provider) {
-      provider.close();
+    if (!ioes.isEmpty()) {
+      IOException ioe = new IOException("Failed to close WALFactory");
+      for (IOException e : ioes) {
+        ioe.addSuppressed(e);
+      }
+      throw ioe;
     }
   }
 
@@ -285,18 +327,36 @@ public class WALFactory {
    * if you can as it will try to leave things as tidy as possible.
    */
   public void shutdown() throws IOException {
-    IOException exception = null;
-    final WALProvider metaProvider = this.metaProvider.get();
-    if (null != metaProvider) {
+    List<IOException> ioes = new ArrayList<>();
+    // these fields could be null if the WALFactory is created only for being used in the
+    // getInstance method.
+    if (metaProvider != null) {
       try {
         metaProvider.shutdown();
-      } catch (IOException ioe) {
-        exception = ioe;
+      } catch (IOException e) {
+        ioes.add(e);
       }
     }
-    provider.shutdown();
-    if (null != exception) {
-      throw exception;
+    if (replicationProvider != null) {
+      try {
+        replicationProvider.shutdown();
+      } catch (IOException e) {
+        ioes.add(e);
+      }
+    }
+    if (provider != null) {
+      try {
+        provider.shutdown();
+      } catch (IOException e) {
+        ioes.add(e);
+      }
+    }
+    if (!ioes.isEmpty()) {
+      IOException ioe = new IOException("Failed to shutdown WALFactory");
+      for (IOException e : ioes) {
+        ioe.addSuppressed(e);
+      }
+      throw ioe;
     }
   }
 
@@ -304,38 +364,16 @@ public class WALFactory {
     return provider.getWALs();
   }
 
-  /**
-   * Called when we lazily create a hbase:meta WAL OR from ReplicationSourceManager ahead of
-   * creating the first hbase:meta WAL so we can register a listener.
-   * @see #getMetaWALProvider()
-   */
-  public WALProvider getMetaProvider() throws IOException {
-    for (;;) {
-      WALProvider provider = this.metaProvider.get();
-      if (provider != null) {
-        return provider;
-      }
-      Class<? extends WALProvider> clz = null;
-      if (conf.get(META_WAL_PROVIDER) == null) {
-        try {
-          clz = conf.getClass(WAL_PROVIDER, Providers.defaultProvider.clazz, WALProvider.class);
-        } catch (Throwable t) {
-          // the WAL provider should be an enum. Proceed
-        }
-      }
-      if (clz == null) {
-        clz = getProviderClass(META_WAL_PROVIDER, conf.get(WAL_PROVIDER, DEFAULT_WAL_PROVIDER));
-      }
-      provider = createProvider(clz);
-      provider.init(this, conf, AbstractFSWALProvider.META_WAL_PROVIDER_ID, this.abortable);
-      provider.addWALActionsListener(new MetricsWAL());
-      if (metaProvider.compareAndSet(null, provider)) {
-        return provider;
-      } else {
-        // someone is ahead of us, close and try again.
-        provider.close();
-      }
-    }
+  @RestrictedApi(explanation = "Should only be called in tests", link = "",
+      allowedOnPath = ".*/src/test/.*")
+  WALProvider getMetaProvider() throws IOException {
+    return metaProvider.getProvider();
+  }
+
+  @RestrictedApi(explanation = "Should only be called in tests", link = "",
+      allowedOnPath = ".*/src/test/.*")
+  WALProvider getReplicationProvider() throws IOException {
+    return replicationProvider.getProvider();
   }
 
   /**
@@ -343,14 +381,14 @@ public class WALFactory {
    */
   public WAL getWAL(RegionInfo region) throws IOException {
     // Use different WAL for hbase:meta. Instantiates the meta WALProvider if not already up.
-    if (
-      region != null && region.isMetaRegion()
-        && region.getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID
-    ) {
-      return getMetaProvider().getWAL(region);
-    } else {
-      return provider.getWAL(region);
+    if (region != null && RegionReplicaUtil.isDefaultReplica(region)) {
+      if (region.isMetaRegion()) {
+        return metaProvider.getProvider().getWAL(region);
+      } else if (ReplicationStorageFactory.isReplicationQueueTable(conf, region.getTable())) {
+        return replicationProvider.getProvider().getWAL(region);
+      }
     }
+    return provider.getWAL(region);
   }
 
   public WALStreamReader createStreamReader(FileSystem fs, Path path) throws IOException {
@@ -527,16 +565,28 @@ public class WALFactory {
     return FSHLogProvider.createWriter(configuration, fs, path, false);
   }
 
-  public final WALProvider getWALProvider() {
+  public WALProvider getWALProvider() {
     return this.provider;
   }
 
   /**
-   * @return Current metaProvider... may be null if not yet initialized.
-   * @see #getMetaProvider()
+   * Returns all the wal providers, for example, the default one, the one for hbase:meta and the one
+   * for hbase:replication.
    */
-  public final WALProvider getMetaWALProvider() {
-    return this.metaProvider.get();
+  public List<WALProvider> getAllWALProviders() {
+    List<WALProvider> providers = new ArrayList<>();
+    if (provider != null) {
+      providers.add(provider);
+    }
+    WALProvider meta = metaProvider.getProviderNoCreate();
+    if (meta != null) {
+      providers.add(meta);
+    }
+    WALProvider replication = replicationProvider.getProviderNoCreate();
+    if (replication != null) {
+      providers.add(replication);
+    }
+    return providers;
   }
 
   public ExcludeDatanodeManager getExcludeDatanodeManager() {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java
index 5b62b210f4b..66386d275b2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java
@@ -51,7 +51,6 @@ import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
 import org.junit.BeforeClass;
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -108,7 +107,6 @@ public class TestMultiSlaveReplication {
     utility1.startMiniZKCluster();
     MiniZooKeeperCluster miniZK = utility1.getZkCluster();
     utility1.setZkCluster(miniZK);
-    new ZKWatcher(conf1, "cluster1", null, true);
 
     conf2 = new Configuration(conf1);
     conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
@@ -118,11 +116,9 @@ public class TestMultiSlaveReplication {
 
     utility2 = new HBaseTestingUtil(conf2);
     utility2.setZkCluster(miniZK);
-    new ZKWatcher(conf2, "cluster2", null, true);
 
     utility3 = new HBaseTestingUtil(conf3);
     utility3.setZkCluster(miniZK);
-    new ZKWatcher(conf3, "cluster3", null, true);
 
     table = TableDescriptorBuilder.newBuilder(tableName)
       .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName)
@@ -133,7 +129,7 @@ public class TestMultiSlaveReplication {
   @Test
   public void testMultiSlaveReplication() throws Exception {
     LOG.info("testCyclicReplication");
-    SingleProcessHBaseCluster master = utility1.startMiniCluster();
+    utility1.startMiniCluster();
     utility2.startMiniCluster();
     utility3.startMiniCluster();
     try (Connection conn = ConnectionFactory.createConnection(conf1);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
index 26c1152c05a..244c37bfe84 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
@@ -22,6 +22,7 @@ import static org.apache.hadoop.hbase.wal.WALFactory.WAL_PROVIDER;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
 import static org.junit.Assert.assertThrows;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -64,6 +65,7 @@ import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
 import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
 import org.apache.hadoop.hbase.testclassification.MediumTests;
 import org.apache.hadoop.hbase.testclassification.RegionServerTests;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -708,6 +710,32 @@ public class TestWALFactory {
     assertEquals(IOTestProvider.class, metaWALProvider.getClass());
   }
 
+  @Test
+  public void testCustomReplicationProvider() throws IOException {
+    final Configuration config = new Configuration();
+    config.set(WALFactory.REPLICATION_WAL_PROVIDER, IOTestProvider.class.getName());
+    final WALFactory walFactory = new WALFactory(config, this.currentServername.toString());
+    Class<? extends WALProvider> walProvider =
+      walFactory.getProviderClass(WALFactory.WAL_PROVIDER, Providers.filesystem.name());
+    assertEquals(Providers.filesystem.clazz, walProvider);
+    WALProvider replicationWALProvider = walFactory.getReplicationProvider();
+    assertEquals(IOTestProvider.class, replicationWALProvider.getClass());
+  }
+
+  /**
+   * Confirm that we will use different WALs for hbase:meta and hbase:replication
+   */
+  @Test
+  public void testDifferentWALs() throws IOException {
+    WAL normalWAL = wals.getWAL(null);
+    WAL metaWAL = wals.getWAL(RegionInfoBuilder.FIRST_META_REGIONINFO);
+    WAL replicationWAL = wals.getWAL(RegionInfoBuilder
+      .newBuilder(ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME_DEFAULT).build());
+    assertNotSame(normalWAL, metaWAL);
+    assertNotSame(normalWAL, replicationWAL);
+    assertNotSame(metaWAL, replicationWAL);
+  }
+
   @Test
   public void testReaderClosedOnBadCodec() throws IOException {
     // Create our own Configuration and WALFactory to avoid breaking other test methods