You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2023/05/15 14:54:13 UTC
[hbase] 13/16: HBASE-27775 Use a separate WAL provider for hbase:replication table (#5157)
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 8fe691c705af23ee8ae0a09e5f7c3a8559b4df98
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Sat Apr 8 10:50:42 2023 +0800
HBASE-27775 Use a separate WAL provider for hbase:replication table (#5157)
Signed-off-by: Liangjun He <he...@apache.org>
---
.../replication/ReplicationStorageFactory.java | 6 +
.../MetricsRegionServerWrapperImpl.java | 12 +-
.../hbase/wal/LazyInitializedWALProvider.java | 108 +++++++++++++
.../org/apache/hadoop/hbase/wal/WALFactory.java | 172 +++++++++++++--------
.../replication/TestMultiSlaveReplication.java | 6 +-
.../apache/hadoop/hbase/wal/TestWALFactory.java | 28 ++++
6 files changed, 260 insertions(+), 72 deletions(-)
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java
index 0b0eb0fc43f..ada127ee783 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java
@@ -136,4 +136,10 @@ public final class ReplicationStorageFactory {
return ReflectionUtils.newInstance(clazz, conf, tableName);
}
}
+
+ /**
+ * Returns whether {@code tableName} is the replication queue table, i.e. the table named by
+ * {@code REPLICATION_QUEUE_TABLE_NAME} in {@code conf}, or by
+ * {@code REPLICATION_QUEUE_TABLE_NAME_DEFAULT} when that key is not set.
+ * @param conf      configuration to read the replication queue table name from
+ * @param tableName table to test; {@code null} yields {@code false}
+ * @return {@code true} if {@code tableName} is the replication queue table
+ */
+ public static boolean isReplicationQueueTable(Configuration conf, TableName tableName) {
+ TableName replicationQueueTableName = TableName.valueOf(conf.get(REPLICATION_QUEUE_TABLE_NAME,
+ REPLICATION_QUEUE_TABLE_NAME_DEFAULT.getNameAsString()));
+ return replicationQueueTableName.equals(tableName);
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
index dd8c9c55127..6c7fc504b5f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MetricsRegionServerWrapperImpl.java
@@ -982,12 +982,12 @@ class MetricsRegionServerWrapperImpl implements MetricsRegionServerWrapper {
lastRan = currentTime;
- final WALProvider provider = regionServer.getWalFactory().getWALProvider();
- final WALProvider metaProvider = regionServer.getWalFactory().getMetaWALProvider();
- numWALFiles = (provider == null ? 0 : provider.getNumLogFiles())
- + (metaProvider == null ? 0 : metaProvider.getNumLogFiles());
- walFileSize = (provider == null ? 0 : provider.getLogFileSize())
- + (metaProvider == null ? 0 : metaProvider.getLogFileSize());
+ // Sum the WAL file count and size over every WAL provider created so far. Accumulate into
+ // locals and assign the fields once: the fields must be replaced on each run, not added to,
+ // otherwise the reported values would keep growing every time this code runs.
+ long totalWALFiles = 0;
+ long totalWALFileSize = 0;
+ for (WALProvider provider : regionServer.getWalFactory().getAllWALProviders()) {
+ totalWALFiles += provider.getNumLogFiles();
+ totalWALFileSize += provider.getLogFileSize();
+ }
+ numWALFiles = totalWALFiles;
+ walFileSize = totalWALFileSize;
+
// Copy over computed values so that no thread sees half computed values.
numStores = tempNumStores;
numStoreFiles = tempNumStoreFiles;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/LazyInitializedWALProvider.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/LazyInitializedWALProvider.java
new file mode 100644
index 00000000000..2a95b182130
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/LazyInitializedWALProvider.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
+import org.apache.hadoop.hbase.wal.WALFactory.Providers;
+import org.apache.yetus.audience.InterfaceAudience;
+
+/**
+ * Lazily creates and holds the {@link WALProvider} for a special table, such as hbase:meta or
+ * hbase:replication. Nothing is created until the first call to {@link #getProvider()}.
+ */
+@InterfaceAudience.Private
+class LazyInitializedWALProvider implements Closeable {
+
+ private final WALFactory factory;
+
+ private final String providerId;
+
+ private final String providerConfigName;
+
+ private final Abortable abortable;
+
+ // null until a provider has been created and published by getProvider()
+ private final AtomicReference<WALProvider> holder = new AtomicReference<>();
+
+ /**
+ * @param factory            the owning factory; its configuration is used to pick the provider
+ *                           class and it is passed to {@link WALProvider#init}
+ * @param providerId         id passed to {@link WALProvider#init} when the provider is created
+ * @param providerConfigName configuration key that may name the provider implementation to use
+ * @param abortable          passed to {@link WALProvider#init} when the provider is created
+ */
+ LazyInitializedWALProvider(WALFactory factory, String providerId, String providerConfigName,
+ Abortable abortable) {
+ this.factory = factory;
+ this.providerId = providerId;
+ this.providerConfigName = providerConfigName;
+ this.abortable = abortable;
+ }
+
+ /**
+ * Returns the provider, creating and initializing it on the first call.
+ * <p>
+ * The implementation class is chosen as follows: if {@code providerConfigName} is not set, the
+ * class configured under {@link WALFactory#WAL_PROVIDER} is tried first. Otherwise, or if that
+ * class lookup throws, the class is resolved through {@link WALFactory#getProviderClass} for
+ * {@code providerConfigName}, with the {@link WALFactory#WAL_PROVIDER} value (or
+ * {@link WALFactory#DEFAULT_WAL_PROVIDER}) as the fallback. A {@link MetricsWAL} listener is
+ * registered on the new provider.
+ * <p>
+ * No lock is taken: concurrent callers may each build a provider, but only the one that wins the
+ * compare-and-set on {@code holder} is published; a loser closes its own provider and loops to
+ * return the published one.
+ * @return the provider for this special table
+ * @throws IOException if creating, initializing or (after losing the race) closing a provider
+ *                     fails
+ */
+ WALProvider getProvider() throws IOException {
+ Configuration conf = factory.getConf();
+ for (;;) {
+ WALProvider provider = this.holder.get();
+ if (provider != null) {
+ return provider;
+ }
+ Class<? extends WALProvider> clz = null;
+ if (conf.get(providerConfigName) == null) {
+ try {
+ clz = conf.getClass(WALFactory.WAL_PROVIDER, Providers.defaultProvider.clazz,
+ WALProvider.class);
+ } catch (Throwable t) {
+ // WAL_PROVIDER presumably holds an enum name (see WALFactory.Providers) rather than a class
+ // name; fall through and resolve it via getProviderClass below.
+ }
+ }
+ if (clz == null) {
+ clz = factory.getProviderClass(providerConfigName,
+ conf.get(WALFactory.WAL_PROVIDER, WALFactory.DEFAULT_WAL_PROVIDER));
+ }
+ provider = WALFactory.createProvider(clz);
+ // NOTE(review): if init throws, the provider created above is not closed.
+ provider.init(factory, conf, providerId, this.abortable);
+ provider.addWALActionsListener(new MetricsWAL());
+ if (this.holder.compareAndSet(null, provider)) {
+ return provider;
+ } else {
+ // someone is ahead of us, close and try again.
+ provider.close();
+ }
+ }
+ }
+
+ /**
+ * Returns the provider if it has already been created, otherwise {@code null}. Unlike
+ * {@link #getProvider()}, this never creates the provider.
+ */
+ WALProvider getProviderNoCreate() {
+ return holder.get();
+ }
+
+ /**
+ * Closes the provider if it has been created; does nothing otherwise. The reference is not
+ * cleared, so a later {@link #getProvider()} call returns the closed provider.
+ * @throws IOException if closing the provider fails
+ */
+ @Override
+ public void close() throws IOException {
+ WALProvider provider = this.holder.get();
+ if (provider != null) {
+ provider.close();
+ }
+ }
+
+ /**
+ * Shuts down the provider if it has been created; does nothing otherwise.
+ * @throws IOException if shutting down the provider fails
+ */
+ void shutdown() throws IOException {
+ WALProvider provider = this.holder.get();
+ if (provider != null) {
+ provider.shutdown();
+ }
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
index bc0a9eec73a..63bef79fa45 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALFactory.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.wal;
import com.google.errorprone.annotations.RestrictedApi;
import java.io.IOException;
import java.io.InterruptedIOException;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
@@ -28,10 +29,12 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.io.asyncfs.monitor.ExcludeDatanodeManager;
import org.apache.hadoop.hbase.regionserver.wal.MetricsWAL;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufWALStreamReader;
import org.apache.hadoop.hbase.regionserver.wal.ProtobufWALTailingReader;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.util.CancelableProgressable;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.LeaseNotRecoveredException;
@@ -99,15 +102,22 @@ public class WALFactory {
public static final String META_WAL_PROVIDER = "hbase.wal.meta_provider";
+ /**
+ * Configuration key for the WAL provider used by the replication queue table (hbase:replication
+ * by default). When unset, the provider configured under {@link #WAL_PROVIDER} (or its default)
+ * is used.
+ */
+ public static final String REPLICATION_WAL_PROVIDER = "hbase.wal.replication_provider";
+
public static final String WAL_ENABLED = "hbase.regionserver.hlog.enabled";
+ /** Provider id passed to {@link WALProvider#init} for the replication queue table's provider. */
+ static final String REPLICATION_WAL_PROVIDER_ID = "rep";
+
final String factoryId;
final Abortable abortable;
private final WALProvider provider;
- // The meta updates are written to a different wal. If this
- // regionserver holds meta regions, then this ref will be non-null.
- // lazily intialized; most RegionServers don't deal with META
- private final AtomicReference<WALProvider> metaProvider = new AtomicReference<>();
+ // The meta updates are written to a different wal. Its provider is lazily initialized, as most
+ // RegionServers don't deal with META; this field itself is only null for the factory created
+ // for the getInstance method.
+ private final LazyInitializedWALProvider metaProvider;
+ // A separate WAL for the replication queue table, so that its own updates do not keep triggering
+ // unnecessary updates to the WAL files and generating a lot of useless data; see HBASE-27775 for
+ // more details.
+ private final LazyInitializedWALProvider replicationProvider;
/**
* Configuration-specified WAL Reader used when a custom reader is requested
@@ -144,13 +154,15 @@ public class WALFactory {
factoryId = SINGLETON_ID;
this.abortable = null;
this.excludeDatanodeManager = new ExcludeDatanodeManager(conf);
+ // This factory only serves the getInstance method, so no special-table providers are created.
+ this.metaProvider = null;
+ this.replicationProvider = null;
}
Providers getDefaultProvider() {
return Providers.defaultProvider;
}
- public Class<? extends WALProvider> getProviderClass(String key, String defaultValue) {
+ Class<? extends WALProvider> getProviderClass(String key, String defaultValue) {
try {
Providers provider = Providers.valueOf(conf.get(key, defaultValue));
@@ -246,6 +258,10 @@ public class WALFactory {
this.factoryId = factoryId;
this.excludeDatanodeManager = new ExcludeDatanodeManager(conf);
this.abortable = abortable;
+ // The hbase:meta and replication queue table providers are only created on first use.
+ this.metaProvider = new LazyInitializedWALProvider(this,
+ AbstractFSWALProvider.META_WAL_PROVIDER_ID, META_WAL_PROVIDER, this.abortable);
+ this.replicationProvider = new LazyInitializedWALProvider(this, REPLICATION_WAL_PROVIDER_ID,
+ REPLICATION_WAL_PROVIDER, this.abortable);
// end required early initialization
if (conf.getBoolean(WAL_ENABLED, true)) {
WALProvider provider = createProvider(getProviderClass(WAL_PROVIDER, DEFAULT_WAL_PROVIDER));
@@ -263,19 +279,45 @@ public class WALFactory {
}
}
+ /**
+ * Returns this factory's configuration.
+ */
+ public Configuration getConf() {
+ return conf;
+ }
+
/**
* Shutdown all WALs and clean up any underlying storage. Use only when you will not need to
- * replay and edits that have gone to any wals from this factory.
+ * replay any edits that have gone to any wals from this factory.
+ * @throws IOException if closing any provider fails; every provider is still attempted and each
+ *                     failure is attached to the thrown exception as a suppressed exception
*/
public void close() throws IOException {
- final WALProvider metaProvider = this.metaProvider.get();
- if (null != metaProvider) {
- metaProvider.close();
+ // Try every provider even if an earlier one fails, then report all failures together.
+ List<IOException> ioes = new ArrayList<>();
+ // these fields could be null if the WALFactory is created only for being used in the
+ // getInstance method.
+ if (metaProvider != null) {
+ try {
+ metaProvider.close();
+ } catch (IOException e) {
+ ioes.add(e);
+ }
+ }
+ if (replicationProvider != null) {
+ try {
+ replicationProvider.close();
+ } catch (IOException e) {
+ ioes.add(e);
+ }
+ }
+ if (provider != null) {
+ try {
+ provider.close();
+ } catch (IOException e) {
+ ioes.add(e);
+ }
}
- // close is called on a WALFactory with null provider in the case of contention handling
- // within the getInstance method.
- if (null != provider) {
- provider.close();
+ if (!ioes.isEmpty()) {
+ IOException ioe = new IOException("Failed to close WALFactory");
+ for (IOException e : ioes) {
+ ioe.addSuppressed(e);
+ }
+ throw ioe;
}
}
@@ -285,18 +327,36 @@ public class WALFactory {
* if you can as it will try to leave things as tidy as possible.
+ * @throws IOException if shutting down any provider fails; every provider is still attempted and
+ *                     each failure is attached to the thrown exception as a suppressed exception
*/
public void shutdown() throws IOException {
- IOException exception = null;
- final WALProvider metaProvider = this.metaProvider.get();
- if (null != metaProvider) {
+ List<IOException> ioes = new ArrayList<>();
+ // these fields could be null if the WALFactory is created only for being used in the
+ // getInstance method; the default provider may be null in that case as well.
+ if (metaProvider != null) {
try {
metaProvider.shutdown();
- } catch (IOException ioe) {
- exception = ioe;
+ } catch (IOException e) {
+ ioes.add(e);
}
}
- provider.shutdown();
- if (null != exception) {
- throw exception;
+ if (replicationProvider != null) {
+ try {
+ replicationProvider.shutdown();
+ } catch (IOException e) {
+ ioes.add(e);
+ }
+ }
+ if (provider != null) {
+ try {
+ provider.shutdown();
+ } catch (IOException e) {
+ ioes.add(e);
+ }
+ }
+ if (!ioes.isEmpty()) {
+ IOException ioe = new IOException("Failed to shutdown WALFactory");
+ for (IOException e : ioes) {
+ ioe.addSuppressed(e);
+ }
+ throw ioe;
}
}
@@ -304,38 +364,16 @@ public class WALFactory {
return provider.getWALs();
}
- /**
- * Called when we lazily create a hbase:meta WAL OR from ReplicationSourceManager ahead of
- * creating the first hbase:meta WAL so we can register a listener.
- * @see #getMetaWALProvider()
- */
- public WALProvider getMetaProvider() throws IOException {
- for (;;) {
- WALProvider provider = this.metaProvider.get();
- if (provider != null) {
- return provider;
- }
- Class<? extends WALProvider> clz = null;
- if (conf.get(META_WAL_PROVIDER) == null) {
- try {
- clz = conf.getClass(WAL_PROVIDER, Providers.defaultProvider.clazz, WALProvider.class);
- } catch (Throwable t) {
- // the WAL provider should be an enum. Proceed
- }
- }
- if (clz == null) {
- clz = getProviderClass(META_WAL_PROVIDER, conf.get(WAL_PROVIDER, DEFAULT_WAL_PROVIDER));
- }
- provider = createProvider(clz);
- provider.init(this, conf, AbstractFSWALProvider.META_WAL_PROVIDER_ID, this.abortable);
- provider.addWALActionsListener(new MetricsWAL());
- if (metaProvider.compareAndSet(null, provider)) {
- return provider;
- } else {
- // someone is ahead of us, close and try again.
- provider.close();
- }
- }
+ /**
+ * Returns the WAL provider used for hbase:meta, creating it if it does not exist yet. Test only.
+ */
+ @RestrictedApi(explanation = "Should only be called in tests", link = "",
+ allowedOnPath = ".*/src/test/.*")
+ WALProvider getMetaProvider() throws IOException {
+ return metaProvider.getProvider();
+ }
+
+ /**
+ * Returns the WAL provider used for the replication queue table, creating it if it does not
+ * exist yet. Test only.
+ */
+ @RestrictedApi(explanation = "Should only be called in tests", link = "",
+ allowedOnPath = ".*/src/test/.*")
+ WALProvider getReplicationProvider() throws IOException {
+ return replicationProvider.getProvider();
}
/**
@@ -343,14 +381,14 @@ public class WALFactory {
*/
public WAL getWAL(RegionInfo region) throws IOException {
- // Use different WAL for hbase:meta. Instantiates the meta WALProvider if not already up.
- if (
- region != null && region.isMetaRegion()
- && region.getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID
- ) {
- return getMetaProvider().getWAL(region);
- } else {
- return provider.getWAL(region);
+ // Use separate WALs for hbase:meta and for the replication queue table (default replicas only);
+ // the corresponding WALProvider is instantiated if not already up. Everything else, including a
+ // null region, goes to the default provider.
+ if (region != null && RegionReplicaUtil.isDefaultReplica(region)) {
+ if (region.isMetaRegion()) {
+ return metaProvider.getProvider().getWAL(region);
+ } else if (ReplicationStorageFactory.isReplicationQueueTable(conf, region.getTable())) {
+ return replicationProvider.getProvider().getWAL(region);
+ }
}
+ return provider.getWAL(region);
}
public WALStreamReader createStreamReader(FileSystem fs, Path path) throws IOException {
@@ -527,16 +565,28 @@ public class WALFactory {
return FSHLogProvider.createWriter(configuration, fs, path, false);
}
- public final WALProvider getWALProvider() {
+ public WALProvider getWALProvider() {
return this.provider;
}
/**
- * @return Current metaProvider... may be null if not yet initialized.
- * @see #getMetaProvider()
+ * Returns all the WAL providers that have been created so far: the default one, plus the ones for
+ * hbase:meta and for the replication queue table (hbase:replication by default) if they have
+ * already been initialized. This method never creates a provider.
+ * @return a new list holding the existing providers
*/
- public final WALProvider getMetaWALProvider() {
- return this.metaProvider.get();
+ public List<WALProvider> getAllWALProviders() {
+ List<WALProvider> providers = new ArrayList<>(3);
+ if (provider != null) {
+ providers.add(provider);
+ }
+ // metaProvider and replicationProvider are null when this factory was created only for use in
+ // the getInstance method, so guard against that here as close() and shutdown() already do.
+ if (metaProvider != null) {
+ WALProvider meta = metaProvider.getProviderNoCreate();
+ if (meta != null) {
+ providers.add(meta);
+ }
+ }
+ if (replicationProvider != null) {
+ WALProvider replication = replicationProvider.getProviderNoCreate();
+ if (replication != null) {
+ providers.add(replication);
+ }
+ }
+ return providers;
}
public ExcludeDatanodeManager getExcludeDatanodeManager() {
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java
index 5b62b210f4b..66386d275b2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMultiSlaveReplication.java
@@ -51,7 +51,6 @@ import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
@@ -108,7 +107,6 @@ public class TestMultiSlaveReplication {
utility1.startMiniZKCluster();
MiniZooKeeperCluster miniZK = utility1.getZkCluster();
utility1.setZkCluster(miniZK);
- new ZKWatcher(conf1, "cluster1", null, true);
conf2 = new Configuration(conf1);
conf2.set(HConstants.ZOOKEEPER_ZNODE_PARENT, "/2");
@@ -118,11 +116,9 @@ public class TestMultiSlaveReplication {
utility2 = new HBaseTestingUtil(conf2);
utility2.setZkCluster(miniZK);
- new ZKWatcher(conf2, "cluster2", null, true);
utility3 = new HBaseTestingUtil(conf3);
utility3.setZkCluster(miniZK);
- new ZKWatcher(conf3, "cluster3", null, true);
table = TableDescriptorBuilder.newBuilder(tableName)
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(famName)
@@ -133,7 +129,7 @@ public class TestMultiSlaveReplication {
@Test
public void testMultiSlaveReplication() throws Exception {
- LOG.info("testCyclicReplication");
- SingleProcessHBaseCluster master = utility1.startMiniCluster();
+ LOG.info("testMultiSlaveReplication");
+ utility1.startMiniCluster();
utility2.startMiniCluster();
utility3.startMiniCluster();
try (Connection conn = ConnectionFactory.createConnection(conf1);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
index 26c1152c05a..244c37bfe84 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALFactory.java
@@ -22,6 +22,7 @@ import static org.apache.hadoop.hbase.wal.WALFactory.WAL_PROVIDER;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
import static org.junit.Assert.assertThrows;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -64,6 +65,7 @@ import org.apache.hadoop.hbase.regionserver.wal.CompressionContext;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
import org.apache.hadoop.hbase.regionserver.wal.WALCoprocessorHost;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -708,6 +710,32 @@ public class TestWALFactory {
assertEquals(IOTestProvider.class, metaWALProvider.getClass());
}
+ /**
+ * Confirm that {@link WALFactory#REPLICATION_WAL_PROVIDER} selects the provider used for the
+ * replication queue table without affecting the default provider.
+ */
+ @Test
+ public void testCustomReplicationProvider() throws IOException {
+ final Configuration config = new Configuration();
+ config.set(WALFactory.REPLICATION_WAL_PROVIDER, IOTestProvider.class.getName());
+ final WALFactory walFactory = new WALFactory(config, this.currentServername.toString());
+ try {
+ Class<? extends WALProvider> walProvider =
+ walFactory.getProviderClass(WALFactory.WAL_PROVIDER, Providers.filesystem.name());
+ assertEquals(Providers.filesystem.clazz, walProvider);
+ WALProvider replicationWALProvider = walFactory.getReplicationProvider();
+ assertEquals(IOTestProvider.class, replicationWALProvider.getClass());
+ } finally {
+ // release the default and replication providers created by this test's own factory
+ walFactory.close();
+ }
+ }
+
+ /**
+ * Confirm that we will use different WALs for hbase:meta and hbase:replication
+ */
+ @Test
+ public void testDifferentWALs() throws IOException {
+ WAL normalWAL = wals.getWAL(null);
+ WAL metaWAL = wals.getWAL(RegionInfoBuilder.FIRST_META_REGIONINFO);
+ WAL replicationWAL = wals.getWAL(RegionInfoBuilder
+ .newBuilder(ReplicationStorageFactory.REPLICATION_QUEUE_TABLE_NAME_DEFAULT).build());
+ assertNotSame(normalWAL, metaWAL);
+ assertNotSame(normalWAL, replicationWAL);
+ assertNotSame(metaWAL, replicationWAL);
+ }
+
@Test
public void testReaderClosedOnBadCodec() throws IOException {
// Create our own Configuration and WALFactory to avoid breaking other test methods