Posted to commits@hbase.apache.org by zg...@apache.org on 2020/07/08 06:34:10 UTC
[hbase] 01/01: HBASE-24682 Refactor ReplicationSource#addHFileRefs method: move it to ReplicationSourceManager (#2020)
This is an automated email from the ASF dual-hosted git repository.
zghao pushed a commit to branch HBASE-24666
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit c2949719c5960ca564f9d01cf415e6e0d1e637f1
Author: Guanghao Zhang <zg...@apache.org>
AuthorDate: Wed Jul 8 14:29:08 2020 +0800
HBASE-24682 Refactor ReplicationSource#addHFileRefs method: move it to ReplicationSourceManager (#2020)
Signed-off-by: Wellington Chevreuil <wc...@apache.org>
---
.../regionserver/ReplicationSource.java | 38 +----------------
.../regionserver/ReplicationSourceInterface.java | 14 -------
.../regionserver/ReplicationSourceManager.java | 48 +++++++++++++++++++++-
.../hbase/replication/ReplicationSourceDummy.java | 9 +---
4 files changed, 49 insertions(+), 60 deletions(-)
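In essence, the patch moves the per-peer hfile-ref filtering out of each ReplicationSource and into a single private ReplicationSourceManager#addHFileRefs keyed by peer id. The branching in that method (full hunk below) condenses to the following predicate; shouldReplicate is a hypothetical helper written only to summarize the rules, it is not part of the patch:

    // Hypothetical summary of the filtering in the patch below, not actual HBase code.
    // Precedence: explicit table-CF map first, then namespace set, else replicate all.
    private boolean shouldReplicate(ReplicationPeer peer, TableName table, byte[] family) {
      Map<TableName, List<String>> tableCFs = peer.getTableCFs();
      if (tableCFs != null) { // peer scoped to specific tables/column families
        List<String> cfs = tableCFs.get(table);
        return tableCFs.containsKey(table)
          && (cfs == null || cfs.contains(Bytes.toString(family)));
      }
      Set<String> namespaces = peer.getNamespaces();
      if (namespaces != null) { // peer scoped to whole namespaces
        return namespaces.contains(table.getNamespaceAsString());
      }
      return true; // no table CFs and no namespaces configured: replicate everything
    }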
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index c4936e6..025d899 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -28,7 +28,6 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
@@ -36,6 +35,7 @@ import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
+
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -45,21 +45,17 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableDescriptors;
-import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost;
import org.apache.hadoop.hbase.replication.ChainWALEntryFilter;
import org.apache.hadoop.hbase.replication.ClusterMarkingEntryFilter;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
-import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.WAL.Entry;
@@ -224,38 +220,6 @@ public class ReplicationSource implements ReplicationSourceInterface {
}
}
- @Override
- public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
- throws ReplicationException {
- String peerId = replicationPeer.getId();
- Set<String> namespaces = replicationPeer.getNamespaces();
- Map<TableName, List<String>> tableCFMap = replicationPeer.getTableCFs();
- if (tableCFMap != null) { // All peers with TableCFs
- List<String> tableCfs = tableCFMap.get(tableName);
- if (tableCFMap.containsKey(tableName)
- && (tableCfs == null || tableCfs.contains(Bytes.toString(family)))) {
- this.queueStorage.addHFileRefs(peerId, pairs);
- metrics.incrSizeOfHFileRefsQueue(pairs.size());
- } else {
- LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}",
- tableName, Bytes.toString(family), peerId);
- }
- } else if (namespaces != null) { // Only for set NAMESPACES peers
- if (namespaces.contains(tableName.getNamespaceAsString())) {
- this.queueStorage.addHFileRefs(peerId, pairs);
- metrics.incrSizeOfHFileRefsQueue(pairs.size());
- } else {
- LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}",
- tableName, Bytes.toString(family), peerId);
- }
- } else {
- // user has explicitly not defined any table cfs for replication, means replicate all the
- // data
- this.queueStorage.addHFileRefs(peerId, pairs);
- metrics.incrSizeOfHFileRefsQueue(pairs.size());
- }
- }
-
private ReplicationEndpoint createReplicationEndpoint()
throws InstantiationException, IllegalAccessException, ClassNotFoundException, IOException {
RegionServerCoprocessorHost rsServerHost = null;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
index 0bd90cf..33a413f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
@@ -29,12 +29,9 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
-import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
-import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
@@ -63,17 +60,6 @@ public interface ReplicationSourceInterface {
void enqueueLog(Path log);
/**
- * Add hfile names to the queue to be replicated.
- * @param tableName Name of the table these files belongs to
- * @param family Name of the family these files belong to
- * @param pairs list of pairs of { HFile location in staging dir, HFile path in region dir which
- * will be added in the queue for replication}
- * @throws ReplicationException If failed to add hfile references
- */
- void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
- throws ReplicationException;
-
- /**
* Start the replication
*/
void startup();
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 1a012bd..a559b3d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -61,6 +61,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.replication.SyncReplicationState;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
@@ -170,6 +171,8 @@ public class ReplicationSourceManager implements ReplicationListener {
// replication peer.
private final int maxRetriesMultiplier;
+ private final Map<String, MetricsSource> sourceMetrics = new HashMap<>();
+
/**
* Creates a replication manager and sets the watch on all the other registered region servers
* @param queueStorage the interface for manipulating replication queues
@@ -348,6 +351,7 @@ public class ReplicationSourceManager implements ReplicationListener {
ReplicationSourceInterface src = ReplicationSourceFactory.create(conf, queueId);
MetricsSource metrics = new MetricsSource(queueId);
+ sourceMetrics.put(queueId, metrics);
// init replication source
src.init(conf, fs, this, queueStorage, replicationPeer, server, queueId, clusterId,
walFileLengthProvider, metrics);
@@ -1120,7 +1124,49 @@ public class ReplicationSourceManager implements ReplicationListener {
public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> pairs)
throws IOException {
for (ReplicationSourceInterface source : this.sources.values()) {
- throwIOExceptionWhenFail(() -> source.addHFileRefs(tableName, family, pairs));
+ throwIOExceptionWhenFail(() -> addHFileRefs(source.getPeerId(), tableName, family, pairs));
+ }
+ }
+
+ /**
+ * Add hfile names to the queue to be replicated.
+ * @param peerId the replication peer id
+ * @param tableName Name of the table these files belong to
+ * @param family Name of the family these files belong to
+ * @param pairs list of pairs of { HFile location in staging dir, HFile path in region dir }
+ * which will be added to the queue for replication
+ * @throws ReplicationException if the hfile references cannot be added
+ */
+ private void addHFileRefs(String peerId, TableName tableName, byte[] family,
+ List<Pair<Path, Path>> pairs) throws ReplicationException {
+ // Only normal replication sources are updated here; for them, peerId equals queueId.
+ MetricsSource metrics = sourceMetrics.get(peerId);
+ ReplicationPeer replicationPeer = replicationPeers.getPeer(peerId);
+ Set<String> namespaces = replicationPeer.getNamespaces();
+ Map<TableName, List<String>> tableCFMap = replicationPeer.getTableCFs();
+ if (tableCFMap != null) { // All peers with TableCFs
+ List<String> tableCfs = tableCFMap.get(tableName);
+ if (tableCFMap.containsKey(tableName)
+ && (tableCfs == null || tableCfs.contains(Bytes.toString(family)))) {
+ this.queueStorage.addHFileRefs(peerId, pairs);
+ metrics.incrSizeOfHFileRefsQueue(pairs.size());
+ } else {
+ LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}",
+ tableName, Bytes.toString(family), peerId);
+ }
+ } else if (namespaces != null) { // Only for peers with NAMESPACES scope set
+ if (namespaces.contains(tableName.getNamespaceAsString())) {
+ this.queueStorage.addHFileRefs(peerId, pairs);
+ metrics.incrSizeOfHFileRefsQueue(pairs.size());
+ } else {
+ LOG.debug("HFiles will not be replicated belonging to the table {} family {} to peer id {}",
+ tableName, Bytes.toString(family), peerId);
+ }
+ } else {
+ // The user has explicitly not defined any table CFs for replication, which means
+ // replicate all the data
+ this.queueStorage.addHFileRefs(peerId, pairs);
+ metrics.incrSizeOfHFileRefsQueue(pairs.size());
}
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index a361c44..781a1da 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -21,17 +21,16 @@ import java.io.IOException;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
import org.apache.hadoop.hbase.replication.regionserver.WALFileLengthProvider;
-import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL.Entry;
/**
@@ -114,12 +113,6 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
}
@Override
- public void addHFileRefs(TableName tableName, byte[] family, List<Pair<Path, Path>> files)
- throws ReplicationException {
- return;
- }
-
- @Override
public boolean isPeerEnabled() {
return true;
}
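After this refactor, callers queue hfile references once through the manager, which fans out to every live source itself. A minimal usage sketch, assuming you already hold a ReplicationSourceManager reference (the method name, table, family, and paths here are placeholders, not from the patch):

    // Hypothetical caller: queue one bulk-loaded hfile (staging path -> region path)
    // for replication via the manager's public entry point shown above.
    void queueBulkLoadedFile(ReplicationSourceManager manager, Path staging, Path region)
        throws IOException {
      List<Pair<Path, Path>> refs = Collections.singletonList(new Pair<>(staging, region));
      manager.addHFileRefs(TableName.valueOf("t1"), Bytes.toBytes("cf"), refs);
    }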