You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zg...@apache.org on 2020/09/24 09:25:57 UTC
[hbase] branch master updated: HBASE-25086 Refactor Replication:
move the default ReplicationSinkService implementation out (#2444)
This is an automated email from the ASF dual-hosted git repository.
zghao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/master by this push:
new 8828643 HBASE-25086 Refactor Replication: move the default ReplicationSinkService implementation out (#2444)
8828643 is described below
commit 8828643bb229469e2480741793b156e8dba4122b
Author: Guanghao Zhang <zg...@apache.org>
AuthorDate: Thu Sep 24 17:25:34 2020 +0800
HBASE-25086 Refactor Replication: move the default ReplicationSinkService implementation out (#2444)
Signed-off-by: meiyi <my...@gmail.com>
---
.../java/org/apache/hadoop/hbase/HConstants.java | 6 +-
.../hadoop/hbase/regionserver/HRegionServer.java | 43 +++++---
.../replication/ReplicationSinkServiceImpl.java | 115 +++++++++++++++++++++
.../replication/regionserver/Replication.java | 90 +++-------------
.../replication/regionserver/ReplicationLoad.java | 23 ++---
5 files changed, 173 insertions(+), 104 deletions(-)
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index f6f00c5..5b4b6fb 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -981,10 +981,12 @@ public final class HConstants {
*/
public static final String
REPLICATION_SOURCE_SERVICE_CLASSNAME = "hbase.replication.source.service";
- public static final String
- REPLICATION_SINK_SERVICE_CLASSNAME = "hbase.replication.sink.service";
public static final String REPLICATION_SERVICE_CLASSNAME_DEFAULT =
"org.apache.hadoop.hbase.replication.regionserver.Replication";
+ public static final String
+ REPLICATION_SINK_SERVICE_CLASSNAME = "hbase.replication.sink.service";
+ public static final String REPLICATION_SINK_SERVICE_CLASSNAME_DEFAULT =
+ "org.apache.hadoop.hbase.replication.ReplicationSinkServiceImpl";
public static final String REPLICATION_BULKLOAD_ENABLE_KEY = "hbase.replication.bulkload.enabled";
public static final boolean REPLICATION_BULKLOAD_ENABLE_DEFAULT = false;
/** Replication cluster id of source cluster which uniquely identifies itself with peer cluster */
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
index f14da2f..cd90fb8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java
@@ -301,6 +301,7 @@ public class HRegionServer extends Thread implements
// Replication services. If no replication, this handler will be null.
private ReplicationSourceService replicationSourceHandler;
private ReplicationSinkService replicationSinkHandler;
+ private boolean sameReplicationSourceAndSink;
// Compactions
public CompactSplit compactSplitThread;
@@ -1390,20 +1391,32 @@ public class HRegionServer extends Thread implements
serverLoad.addUserLoads(createUserLoad(entry.getKey(), entry.getValue()));
}
}
- // for the replicationLoad purpose. Only need to get from one executorService
- // either source or sink will get the same info
- ReplicationSourceService rsources = getReplicationSourceService();
- if (rsources != null) {
+ if (sameReplicationSourceAndSink && replicationSourceHandler != null) {
// always refresh first to get the latest value
- ReplicationLoad rLoad = rsources.refreshAndGetReplicationLoad();
+ ReplicationLoad rLoad = replicationSourceHandler.refreshAndGetReplicationLoad();
if (rLoad != null) {
serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
- for (ClusterStatusProtos.ReplicationLoadSource rLS :
- rLoad.getReplicationLoadSourceEntries()) {
+ for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad
+ .getReplicationLoadSourceEntries()) {
serverLoad.addReplLoadSource(rLS);
}
-
+ }
+ } else {
+ if (replicationSourceHandler != null) {
+ ReplicationLoad rLoad = replicationSourceHandler.refreshAndGetReplicationLoad();
+ if (rLoad != null) {
+ for (ClusterStatusProtos.ReplicationLoadSource rLS : rLoad
+ .getReplicationLoadSourceEntries()) {
+ serverLoad.addReplLoadSource(rLS);
+ }
+ }
+ }
+ if (replicationSinkHandler != null) {
+ ReplicationLoad rLoad = replicationSinkHandler.refreshAndGetReplicationLoad();
+ if (rLoad != null) {
+ serverLoad.setReplLoadSink(rLoad.getReplicationLoadSink());
+ }
}
}
@@ -1921,8 +1934,7 @@ public class HRegionServer extends Thread implements
* Start up replication source and sink handlers.
*/
private void startReplicationService() throws IOException {
- if (this.replicationSourceHandler == this.replicationSinkHandler &&
- this.replicationSourceHandler != null) {
+ if (sameReplicationSourceAndSink && this.replicationSourceHandler != null) {
this.replicationSourceHandler.startReplicationService();
} else {
if (this.replicationSourceHandler != null) {
@@ -2628,9 +2640,10 @@ public class HRegionServer extends Thread implements
if (this.compactSplitThread != null) {
this.compactSplitThread.join();
}
- if (this.executorService != null) this.executorService.shutdown();
- if (this.replicationSourceHandler != null &&
- this.replicationSourceHandler == this.replicationSinkHandler) {
+ if (this.executorService != null) {
+ this.executorService.shutdown();
+ }
+ if (sameReplicationSourceAndSink && this.replicationSourceHandler != null) {
this.replicationSourceHandler.stopReplicationService();
} else {
if (this.replicationSourceHandler != null) {
@@ -3070,7 +3083,7 @@ public class HRegionServer extends Thread implements
// read in the name of the sink replication class from the config file.
String sinkClassname = conf.get(HConstants.REPLICATION_SINK_SERVICE_CLASSNAME,
- HConstants.REPLICATION_SERVICE_CLASSNAME_DEFAULT);
+ HConstants.REPLICATION_SINK_SERVICE_CLASSNAME_DEFAULT);
// If both the sink and the source class names are the same, then instantiate
// only one object.
@@ -3078,11 +3091,13 @@ public class HRegionServer extends Thread implements
server.replicationSourceHandler = newReplicationInstance(sourceClassname,
ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walProvider);
server.replicationSinkHandler = (ReplicationSinkService) server.replicationSourceHandler;
+ server.sameReplicationSourceAndSink = true;
} else {
server.replicationSourceHandler = newReplicationInstance(sourceClassname,
ReplicationSourceService.class, conf, server, walFs, walDir, oldWALDir, walProvider);
server.replicationSinkHandler = newReplicationInstance(sinkClassname,
ReplicationSinkService.class, conf, server, walFs, walDir, oldWALDir, walProvider);
+ server.sameReplicationSourceAndSink = false;
}
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSinkServiceImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSinkServiceImpl.java
new file mode 100644
index 0000000..9b0e3f7
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSinkServiceImpl.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.ScheduledChore;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationLoad;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSink;
+import org.apache.hadoop.hbase.wal.WALProvider;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
+
+/** Sink-only {@link ReplicationSinkService}; the {@link ReplicationSink} is created on start. */
+@InterfaceAudience.Private
+public class ReplicationSinkServiceImpl implements ReplicationSinkService {
+  private static final Logger LOG = LoggerFactory.getLogger(ReplicationSinkServiceImpl.class);
+  private Configuration conf;
+  private Server server;
+  private ReplicationSink replicationSink;
+  // ReplicationLoad to access replication metrics
+  private ReplicationLoad replicationLoad;
+  private int statsPeriod;
+
+  /** Hands the entries and their cells to the sink; the service must have been started. */
+  @Override
+  public void replicateLogEntries(List<AdminProtos.WALEntry> entries, CellScanner cells,
+      String replicationClusterId, String sourceBaseNamespaceDirPath,
+      String sourceHFileArchiveDirPath) throws IOException {
+    this.replicationSink.replicateEntries(entries, cells, replicationClusterId,
+      sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath);
+  }
+
+  /** Captures server and configuration; fs, logdir, oldLogDir and walProvider are not used. */
+  @Override
+  public void initialize(Server server, FileSystem fs, Path logdir, Path oldLogDir,
+      WALProvider walProvider) throws IOException {
+    this.server = server;
+    this.conf = server.getConfiguration();
+    this.statsPeriod = this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);
+    this.replicationLoad = new ReplicationLoad();
+  }
+
+  /** Creates the sink, then schedules the statistics chore on the server's chore service. */
+  @Override
+  public void startReplicationService() throws IOException {
+    this.replicationSink = new ReplicationSink(this.conf);
+    this.server.getChoreService().scheduleChore(
+      new ReplicationStatisticsChore("ReplicationSinkStatistics", server, statsPeriod));
+  }
+
+  /** Stops the sink's services if the sink was ever created. */
+  @Override
+  public void stopReplicationService() {
+    if (this.replicationSink != null) {
+      this.replicationSink.stopReplicationSinkServices();
+    }
+  }
+
+  /** Rebuilds and returns the sink load, or null when not initialized or not yet started. */
+  @Override
+  public ReplicationLoad refreshAndGetReplicationLoad() {
+    // The sink exists only after start; return null rather than hit an NPE before then.
+    if (replicationLoad == null || replicationSink == null) {
+      return null;
+    }
+    replicationLoad.buildReplicationLoad(Collections.emptyList(), replicationSink.getSinkMetrics());
+    return replicationLoad;
+  }
+
+  /** Chore that logs the sink's statistics string whenever it is non-empty. */
+  private final class ReplicationStatisticsChore extends ScheduledChore {
+    ReplicationStatisticsChore(String name, Stoppable stopper, int period) {
+      super(name, stopper, period);
+    }
+
+    @Override
+    protected void chore() {
+      printStats(replicationSink.getStats());
+    }
+
+    private void printStats(String stats) {
+      if (!stats.isEmpty()) {
+        LOG.info(stats);
+      }
+    }
+  }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index 195877b..33975ed 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -22,18 +22,15 @@ import java.util.ArrayList;
import java.util.List;
import java.util.OptionalLong;
import java.util.UUID;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CompatibilitySingletonFactory;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ScheduledChore;
import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
@@ -51,15 +48,11 @@ import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
-
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
-
/**
* Gateway to Replication. Used by {@link org.apache.hadoop.hbase.regionserver.HRegionServer}.
*/
@InterfaceAudience.Private
-public class Replication implements ReplicationSourceService, ReplicationSinkService {
+public class Replication implements ReplicationSourceService {
private static final Logger LOG =
LoggerFactory.getLogger(Replication.class);
private boolean isReplicationForBulkLoadDataEnabled;
@@ -68,13 +61,10 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
private ReplicationPeers replicationPeers;
private ReplicationTracker replicationTracker;
private Configuration conf;
- private ReplicationSink replicationSink;
private SyncReplicationPeerInfoProvider syncReplicationPeerInfoProvider;
// Hosting server
private Server server;
- /** Statistics thread schedule pool */
- private ScheduledExecutorService scheduleThreadPool;
- private int statsThreadPeriod;
+ private int statsPeriod;
// ReplicationLoad to access replication metrics
private ReplicationLoad replicationLoad;
private MetricsReplicationGlobalSourceSource globalMetricsSource;
@@ -94,11 +84,6 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
this.conf = this.server.getConfiguration();
this.isReplicationForBulkLoadDataEnabled =
ReplicationUtils.isReplicationForBulkLoadDataEnabled(this.conf);
- this.scheduleThreadPool = Executors.newScheduledThreadPool(1,
- new ThreadFactoryBuilder()
- .setNameFormat(server.getServerName().toShortString() + "Replication Statistics #%d")
- .setDaemon(true)
- .build());
if (this.isReplicationForBulkLoadDataEnabled) {
if (conf.get(HConstants.REPLICATION_CLUSTER_ID) == null
|| conf.get(HConstants.REPLICATION_CLUSTER_ID).isEmpty()) {
@@ -154,9 +139,8 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
p.getSyncReplicationState(), p.getNewSyncReplicationState(), 0));
}
}
- this.statsThreadPeriod =
+ this.statsPeriod =
this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);
- LOG.debug("Replication stats-in-log period={} seconds", this.statsThreadPeriod);
this.replicationLoad = new ReplicationLoad();
this.peerProcedureHandler =
@@ -173,39 +157,7 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
*/
@Override
public void stopReplicationService() {
- join();
- }
-
- /**
- * Join with the replication threads
- */
- public void join() {
this.replicationManager.join();
- if (this.replicationSink != null) {
- this.replicationSink.stopReplicationSinkServices();
- }
- scheduleThreadPool.shutdown();
- }
-
- /**
- * Carry on the list of log entries down to the sink
- * @param entries list of entries to replicate
- * @param cells The data -- the cells -- that <code>entries</code> describes (the entries do not
- * contain the Cells we are replicating; they are passed here on the side in this
- * CellScanner).
- * @param replicationClusterId Id which will uniquely identify source cluster FS client
- * configurations in the replication configuration directory
- * @param sourceBaseNamespaceDirPath Path that point to the source cluster base namespace
- * directory required for replicating hfiles
- * @param sourceHFileArchiveDirPath Path that point to the source cluster hfile archive directory
- * @throws IOException
- */
- @Override
- public void replicateLogEntries(List<WALEntry> entries, CellScanner cells,
- String replicationClusterId, String sourceBaseNamespaceDirPath,
- String sourceHFileArchiveDirPath) throws IOException {
- this.replicationSink.replicateEntries(entries, cells, replicationClusterId,
- sourceBaseNamespaceDirPath, sourceHFileArchiveDirPath);
}
/**
@@ -216,10 +168,8 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
@Override
public void startReplicationService() throws IOException {
this.replicationManager.init();
- this.replicationSink = new ReplicationSink(this.conf);
- this.scheduleThreadPool.scheduleAtFixedRate(
- new ReplicationStatisticsTask(this.replicationSink, this.replicationManager),
- statsThreadPeriod, statsThreadPeriod, TimeUnit.SECONDS);
+ this.server.getChoreService().scheduleChore(
+ new ReplicationStatisticsChore("ReplicationSourceStatistics", server, statsPeriod));
LOG.info("{} started", this.server.toString());
}
@@ -244,21 +194,15 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
/**
* Statistics task. Periodically prints the cache statistics to the log.
*/
- private final static class ReplicationStatisticsTask implements Runnable {
-
- private final ReplicationSink replicationSink;
- private final ReplicationSourceManager replicationManager;
+ private final class ReplicationStatisticsChore extends ScheduledChore {
- public ReplicationStatisticsTask(ReplicationSink replicationSink,
- ReplicationSourceManager replicationManager) {
- this.replicationManager = replicationManager;
- this.replicationSink = replicationSink;
+ ReplicationStatisticsChore(String name, Stoppable stopper, int period) {
+ super(name, stopper, period);
}
@Override
- public void run() {
- printStats(this.replicationManager.getStats());
- printStats(this.replicationSink.getStats());
+ protected void chore() {
+ printStats(replicationManager.getStats());
}
private void printStats(String stats) {
@@ -274,17 +218,11 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
return null;
}
// always build for latest data
- buildReplicationLoad();
- return this.replicationLoad;
- }
-
- private void buildReplicationLoad() {
List<ReplicationSourceInterface> allSources = new ArrayList<>();
allSources.addAll(this.replicationManager.getSources());
allSources.addAll(this.replicationManager.getOldSources());
- // get sink
- MetricsSink sinkMetrics = this.replicationSink.getSinkMetrics();
- this.replicationLoad.buildReplicationLoad(allSources, sinkMetrics);
+ this.replicationLoad.buildReplicationLoad(allSources, null);
+ return this.replicationLoad;
}
@Override
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java
index e011e0a..6fb21dc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationLoad.java
@@ -36,7 +36,6 @@ public class ReplicationLoad {
// Empty load instance.
public static final ReplicationLoad EMPTY_REPLICATIONLOAD = new ReplicationLoad();
- private MetricsSink sinkMetrics;
private List<ClusterStatusProtos.ReplicationLoadSource> replicationLoadSourceEntries;
private ClusterStatusProtos.ReplicationLoadSink replicationLoadSink;
@@ -49,21 +48,22 @@ public class ReplicationLoad {
/**
* buildReplicationLoad
* @param sources List of ReplicationSource instances for which metrics should be reported
- * @param skMetrics
+ * @param sinkMetrics metrics of the replication sink
*/
public void buildReplicationLoad(final List<ReplicationSourceInterface> sources,
- final MetricsSink skMetrics) {
- this.sinkMetrics = skMetrics;
+ final MetricsSink sinkMetrics) {
- // build the SinkLoad
- ClusterStatusProtos.ReplicationLoadSink.Builder rLoadSinkBuild =
+ if (sinkMetrics != null) {
+ // build the SinkLoad
+ ClusterStatusProtos.ReplicationLoadSink.Builder rLoadSinkBuild =
ClusterStatusProtos.ReplicationLoadSink.newBuilder();
- rLoadSinkBuild.setAgeOfLastAppliedOp(sinkMetrics.getAgeOfLastAppliedOp());
- rLoadSinkBuild.setTimeStampsOfLastAppliedOp(sinkMetrics.getTimestampOfLastAppliedOp());
- rLoadSinkBuild.setTimestampStarted(sinkMetrics.getStartTimestamp());
- rLoadSinkBuild.setTotalOpsProcessed(sinkMetrics.getAppliedOps());
- this.replicationLoadSink = rLoadSinkBuild.build();
+ rLoadSinkBuild.setAgeOfLastAppliedOp(sinkMetrics.getAgeOfLastAppliedOp());
+ rLoadSinkBuild.setTimeStampsOfLastAppliedOp(sinkMetrics.getTimestampOfLastAppliedOp());
+ rLoadSinkBuild.setTimestampStarted(sinkMetrics.getStartTimestamp());
+ rLoadSinkBuild.setTotalOpsProcessed(sinkMetrics.getAppliedOps());
+ this.replicationLoadSink = rLoadSinkBuild.build();
+ }
this.replicationLoadSourceEntries = new ArrayList<>();
for (ReplicationSourceInterface source : sources) {
@@ -157,5 +157,4 @@ public class ReplicationLoad {
public String toString() {
return this.sourceToString() + System.getProperty("line.separator") + this.sinkToString();
}
-
}