You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/03/10 01:09:01 UTC
hbase git commit: HBASE-20148 Make serial replication as a option for
a peer instead of a table
Repository: hbase
Updated Branches:
refs/heads/master 15da74cce -> dd6f4525e
HBASE-20148 Make serial replication as a option for a peer instead of a table
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/dd6f4525
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/dd6f4525
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/dd6f4525
Branch: refs/heads/master
Commit: dd6f4525e7a9eaceb6ed4f97059b2dd3c532d323
Parents: 15da74c
Author: zhangduo <zh...@apache.org>
Authored: Fri Mar 9 15:00:59 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Sat Mar 10 09:04:44 2018 +0800
----------------------------------------------------------------------
.../apache/hadoop/hbase/HTableDescriptor.java | 8 -----
.../hadoop/hbase/client/TableDescriptor.java | 20 +++++++-----
.../hbase/client/TableDescriptorBuilder.java | 9 ------
.../replication/ReplicationPeerConfigUtil.java | 5 +++
.../replication/ReplicationPeerConfig.java | 32 +++++++++++++++-----
.../ReplicationPeerConfigBuilder.java | 12 ++++++++
.../org/apache/hadoop/hbase/HConstants.java | 6 ----
.../src/main/protobuf/Replication.proto | 1 +
.../hbase/replication/ReplicationUtils.java | 3 ++
.../master/assignment/RegionStateStore.java | 14 ++++-----
.../hbase/replication/ScopeWALEntryFilter.java | 32 ++++++++++----------
.../regionserver/ReplicationSource.java | 4 +++
.../ReplicationSourceWALActionListener.java | 10 +-----
.../ReplicationSourceWALReader.java | 6 ++--
.../regionserver/SerialReplicationChecker.java | 2 +-
.../org/apache/hadoop/hbase/wal/WALKeyImpl.java | 8 -----
.../TestReplicationWALEntryFilters.java | 15 ++++++---
.../replication/TestSerialReplication.java | 9 +++---
18 files changed, 104 insertions(+), 92 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/dd6f4525/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
index 3652d10..db8870d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/HTableDescriptor.java
@@ -537,14 +537,6 @@ public class HTableDescriptor implements TableDescriptor, Comparable<HTableDescr
}
/**
- * Return true if there are at least one cf whose replication scope is serial.
- */
- @Override
- public boolean hasSerialReplicationScope() {
- return delegatee.hasSerialReplicationScope();
- }
-
- /**
* Returns the configured replicas per region
*/
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/dd6f4525/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java
index 3505175..1ec61a2 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptor.java
@@ -24,7 +24,7 @@ import java.util.Comparator;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
-
+import java.util.stream.Stream;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Bytes;
@@ -232,11 +232,6 @@ public interface TableDescriptor {
boolean hasRegionMemStoreReplication();
/**
- * @return true if there are at least one cf whose replication scope is serial.
- */
- boolean hasSerialReplicationScope();
-
- /**
* Check if the compaction enable flag of the table is true. If flag is false
* then no minor/major compactions will be done in real.
*
@@ -275,6 +270,16 @@ public interface TableDescriptor {
boolean isReadOnly();
/**
+ * Check if any of the table's cfs' replication scope are set to
+ * {@link HConstants#REPLICATION_SCOPE_GLOBAL}.
+ * @return {@code true} if we have, otherwise {@code false}.
+ */
+ default boolean hasGlobalReplicationScope() {
+ return Stream.of(getColumnFamilies())
+ .anyMatch(cf -> cf.getScope() == HConstants.REPLICATION_SCOPE_GLOBAL);
+ }
+
+ /**
* Check if the table's cfs' replication scope matched with the replication state
* @param enabled replication state
* @return true if matched, otherwise false
@@ -284,8 +289,7 @@ public interface TableDescriptor {
boolean hasDisabled = false;
for (ColumnFamilyDescriptor cf : getColumnFamilies()) {
- if (cf.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL &&
- cf.getScope() != HConstants.REPLICATION_SCOPE_SERIAL) {
+ if (cf.getScope() != HConstants.REPLICATION_SCOPE_GLOBAL) {
hasDisabled = true;
} else {
hasEnabled = true;
http://git-wip-us.apache.org/repos/asf/hbase/blob/dd6f4525/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java
index 0855f87..c1db64b 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/TableDescriptorBuilder.java
@@ -1054,15 +1054,6 @@ public class TableDescriptorBuilder {
}
/**
- * Return true if there are at least one cf whose replication scope is serial.
- */
- @Override
- public boolean hasSerialReplicationScope() {
- return families.values().stream()
- .anyMatch(column -> column.getScope() == HConstants.REPLICATION_SCOPE_SERIAL);
- }
-
- /**
* Returns the configured replicas per region
*/
@Override
http://git-wip-us.apache.org/repos/asf/hbase/blob/dd6f4525/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java
index a234a9b..b1c1713 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationPeerConfigUtil.java
@@ -303,6 +303,10 @@ public final class ReplicationPeerConfigUtil {
builder.setReplicateAllUserTables(peer.getReplicateAll());
}
+ if (peer.hasSerial()) {
+ builder.setSerial(peer.getSerial());
+ }
+
Map<TableName, List<String>> excludeTableCFsMap = convert2Map(peer.getExcludeTableCfsList()
.toArray(new ReplicationProtos.TableCF[peer.getExcludeTableCfsCount()]));
if (excludeTableCFsMap != null) {
@@ -357,6 +361,7 @@ public final class ReplicationPeerConfigUtil {
builder.setBandwidth(peerConfig.getBandwidth());
builder.setReplicateAll(peerConfig.replicateAllUserTables());
+ builder.setSerial(peerConfig.isSerial());
ReplicationProtos.TableCF[] excludeTableCFs = convert(peerConfig.getExcludeTableCFsMap());
if (excludeTableCFs != null) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/dd6f4525/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
index bf8d030..e0d9a4c 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfig.java
@@ -46,6 +46,7 @@ public class ReplicationPeerConfig {
private Map<TableName, ? extends Collection<String>> excludeTableCFsMap = null;
private Set<String> excludeNamespaces = null;
private long bandwidth = 0;
+ private final boolean serial;
private ReplicationPeerConfig(ReplicationPeerConfigBuilderImpl builder) {
this.clusterKey = builder.clusterKey;
@@ -64,6 +65,7 @@ public class ReplicationPeerConfig {
builder.excludeNamespaces != null ? Collections.unmodifiableSet(builder.excludeNamespaces)
: null;
this.bandwidth = builder.bandwidth;
+ this.serial = builder.serial;
}
private Map<TableName, List<String>>
@@ -82,6 +84,7 @@ public class ReplicationPeerConfig {
public ReplicationPeerConfig() {
this.peerData = new TreeMap<>(Bytes.BYTES_COMPARATOR);
this.configuration = new HashMap<>(0);
+ this.serial = false;
}
/**
@@ -214,16 +217,20 @@ public class ReplicationPeerConfig {
return new ReplicationPeerConfigBuilderImpl();
}
+ public boolean isSerial() {
+ return serial;
+ }
+
public static ReplicationPeerConfigBuilder newBuilder(ReplicationPeerConfig peerConfig) {
ReplicationPeerConfigBuilderImpl builder = new ReplicationPeerConfigBuilderImpl();
builder.setClusterKey(peerConfig.getClusterKey())
- .setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl())
- .putAllPeerData(peerConfig.getPeerData()).putAllConfiguration(peerConfig.getConfiguration())
- .setTableCFsMap(peerConfig.getTableCFsMap()).setNamespaces(peerConfig.getNamespaces())
- .setReplicateAllUserTables(peerConfig.replicateAllUserTables())
- .setExcludeTableCFsMap(peerConfig.getExcludeTableCFsMap())
- .setExcludeNamespaces(peerConfig.getExcludeNamespaces())
- .setBandwidth(peerConfig.getBandwidth());
+ .setReplicationEndpointImpl(peerConfig.getReplicationEndpointImpl())
+ .putAllPeerData(peerConfig.getPeerData()).putAllConfiguration(peerConfig.getConfiguration())
+ .setTableCFsMap(peerConfig.getTableCFsMap()).setNamespaces(peerConfig.getNamespaces())
+ .setReplicateAllUserTables(peerConfig.replicateAllUserTables())
+ .setExcludeTableCFsMap(peerConfig.getExcludeTableCFsMap())
+ .setExcludeNamespaces(peerConfig.getExcludeNamespaces())
+ .setBandwidth(peerConfig.getBandwidth()).setSerial(peerConfig.isSerial());
return builder;
}
@@ -250,6 +257,8 @@ public class ReplicationPeerConfig {
private long bandwidth = 0;
+ private boolean serial = false;
+
@Override
public ReplicationPeerConfigBuilder setClusterKey(String clusterKey) {
this.clusterKey = clusterKey;
@@ -313,6 +322,12 @@ public class ReplicationPeerConfig {
}
@Override
+ public ReplicationPeerConfigBuilder setSerial(boolean serial) {
+ this.serial = serial;
+ return this;
+ }
+
+ @Override
public ReplicationPeerConfig build() {
// It would be nice to validate the configuration, but we have to work with "old" data
// from ZK which makes it much more difficult.
@@ -340,7 +355,8 @@ public class ReplicationPeerConfig {
builder.append("tableCFs=").append(tableCFsMap.toString()).append(",");
}
}
- builder.append("bandwidth=").append(bandwidth);
+ builder.append("bandwidth=").append(bandwidth).append(",");
+ builder.append("serial=").append(serial);
return builder.toString();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/dd6f4525/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigBuilder.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigBuilder.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigBuilder.java
index 0b2f2e2..4c531c5 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigBuilder.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigBuilder.java
@@ -138,6 +138,18 @@ public interface ReplicationPeerConfigBuilder {
ReplicationPeerConfigBuilder setExcludeNamespaces(Set<String> namespaces);
/**
+ * <p>
+ * Sets whether we should preserve order when replicating, i.e, serial replication.
+ * </p>
+ * <p>
+ * Default {@code false}.
+ * </p>
+ * @param serial {@code true} means preserve order, otherwise {@code false}.
+ * @return {@code this}
+ */
+ ReplicationPeerConfigBuilder setSerial(boolean serial);
+
+ /**
* Builds the configuration object from the current state of {@code this}.
* @return A {@link ReplicationPeerConfig} instance.
*/
http://git-wip-us.apache.org/repos/asf/hbase/blob/dd6f4525/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index 3dd0ac8..0039a56 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -647,12 +647,6 @@ public final class HConstants {
public static final int REPLICATION_SCOPE_GLOBAL = 1;
/**
- * Scope tag for serially scoped data
- * This data will be replicated to all peers by the order of sequence id.
- */
- public static final int REPLICATION_SCOPE_SERIAL = 2;
-
- /**
* Default cluster ID, cannot be used to identify a cluster so a key with
* this value means it wasn't meant for replication.
*/
http://git-wip-us.apache.org/repos/asf/hbase/blob/dd6f4525/hbase-protocol-shaded/src/main/protobuf/Replication.proto
----------------------------------------------------------------------
diff --git a/hbase-protocol-shaded/src/main/protobuf/Replication.proto b/hbase-protocol-shaded/src/main/protobuf/Replication.proto
index 9f7b4c2..557b87c 100644
--- a/hbase-protocol-shaded/src/main/protobuf/Replication.proto
+++ b/hbase-protocol-shaded/src/main/protobuf/Replication.proto
@@ -48,6 +48,7 @@ message ReplicationPeer {
optional bool replicate_all = 8;
repeated TableCF exclude_table_cfs = 9;
repeated bytes exclude_namespaces = 10;
+ optional bool serial = 11;
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/dd6f4525/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
index 11507aa..2a6870a 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
@@ -115,6 +115,9 @@ public final class ReplicationUtils {
if (rpc1.replicateAllUserTables() != rpc2.replicateAllUserTables()) {
return false;
}
+ if (rpc1.isSerial() != rpc2.isSerial()) {
+ return false;
+ }
if (rpc1.replicateAllUserTables()) {
return isNamespacesEqual(rpc1.getExcludeNamespaces(), rpc2.getExcludeNamespaces()) &&
isTableCFsEqual(rpc1.getExcludeTableCFsMap(), rpc2.getExcludeTableCFsMap());
http://git-wip-us.apache.org/repos/asf/hbase/blob/dd6f4525/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java
index 1ffc31f..c8017202 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java
@@ -165,7 +165,7 @@ public class RegionStateStore {
MetaTableAccessor.addLocation(put, regionLocation, openSeqNum, replicaId);
// only update replication barrier for default replica
if (regionInfo.getReplicaId() == RegionInfo.DEFAULT_REPLICA_ID &&
- hasSerialReplicationScope(regionInfo.getTable())) {
+ hasGlobalReplicationScope(regionInfo.getTable())) {
MetaTableAccessor.addReplicationBarrier(put, openSeqNum);
}
info.append(", openSeqNum=").append(openSeqNum);
@@ -224,7 +224,7 @@ public class RegionStateStore {
ServerName serverName) throws IOException {
TableDescriptor htd = getTableDescriptor(parent.getTable());
long parentOpenSeqNum = HConstants.NO_SEQNUM;
- if (htd.hasSerialReplicationScope()) {
+ if (htd.hasGlobalReplicationScope()) {
parentOpenSeqNum = getOpenSeqNumForParentRegion(parent);
}
MetaTableAccessor.splitRegion(master.getConnection(), parent, parentOpenSeqNum, hriA, hriB,
@@ -239,7 +239,7 @@ public class RegionStateStore {
TableDescriptor htd = getTableDescriptor(child.getTable());
long regionAOpenSeqNum = -1L;
long regionBOpenSeqNum = -1L;
- if (htd.hasSerialReplicationScope()) {
+ if (htd.hasGlobalReplicationScope()) {
regionAOpenSeqNum = getOpenSeqNumForParentRegion(hriA);
regionBOpenSeqNum = getOpenSeqNumForParentRegion(hriB);
}
@@ -261,12 +261,12 @@ public class RegionStateStore {
// ==========================================================================
// Table Descriptors helpers
// ==========================================================================
- private boolean hasSerialReplicationScope(TableName tableName) throws IOException {
- return hasSerialReplicationScope(getTableDescriptor(tableName));
+ private boolean hasGlobalReplicationScope(TableName tableName) throws IOException {
+ return hasGlobalReplicationScope(getTableDescriptor(tableName));
}
- private boolean hasSerialReplicationScope(TableDescriptor htd) {
- return htd != null ? htd.hasSerialReplicationScope() : false;
+ private boolean hasGlobalReplicationScope(TableDescriptor htd) {
+ return htd != null ? htd.hasGlobalReplicationScope() : false;
}
private int getRegionReplication(TableDescriptor htd) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/dd6f4525/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
index 6a2fbcf..f8722eb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ScopeWALEntryFilter.java
@@ -37,31 +37,31 @@ public class ScopeWALEntryFilter implements WALEntryFilter, WALCellFilter {
@Override
public Entry filter(Entry entry) {
- NavigableMap<byte[], Integer> scopes = entry.getKey().getReplicationScopes();
- if (scopes == null || scopes.isEmpty()) {
- return null;
- }
+ // Do not filter out an entire entry by replication scopes. As now we support serial
+ // replication, the sequence id of a marker is also needed by upper layer. We will filter out
+ // all the cells in the filterCell method below if the replication scopes is null or empty.
return entry;
}
+ private boolean hasGlobalScope(NavigableMap<byte[], Integer> scopes, byte[] family) {
+ Integer scope = scopes.get(family);
+ return scope != null && scope.intValue() == HConstants.REPLICATION_SCOPE_GLOBAL;
+ }
@Override
public Cell filterCell(Entry entry, Cell cell) {
- final NavigableMap<byte[], Integer> scopes = entry.getKey().getReplicationScopes();
- // The scope will be null or empty if
- // there's nothing to replicate in that WALEdit
- byte[] fam = CellUtil.cloneFamily(cell);
+ NavigableMap<byte[], Integer> scopes = entry.getKey().getReplicationScopes();
+ if (scopes == null || scopes.isEmpty()) {
+ return null;
+ }
+ byte[] family = CellUtil.cloneFamily(cell);
if (CellUtil.matchingColumn(cell, WALEdit.METAFAMILY, WALEdit.BULK_LOAD)) {
- cell = bulkLoadFilter.filterCell(cell, new Predicate<byte[]>() {
+ return bulkLoadFilter.filterCell(cell, new Predicate<byte[]>() {
@Override
- public boolean apply(byte[] fam) {
- return !scopes.containsKey(fam) || scopes.get(fam) == HConstants.REPLICATION_SCOPE_LOCAL;
+ public boolean apply(byte[] family) {
+ return !hasGlobalScope(scopes, family);
}
});
- } else {
- if (!scopes.containsKey(fam) || scopes.get(fam) == HConstants.REPLICATION_SCOPE_LOCAL) {
- return null;
- }
}
- return cell;
+ return hasGlobalScope(scopes, family) ? cell : null;
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/dd6f4525/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 86e7f98..2f9cd56 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -382,6 +382,10 @@ public class ReplicationSource implements ReplicationSourceInterface {
return replicationPeer.isPeerEnabled();
}
+ public boolean isSerial() {
+ return replicationPeer.getPeerConfig().isSerial();
+ }
+
private void initialize() {
int sleepMultiplier = 1;
while (this.isSourceActive()) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/dd6f4525/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java
index 95fc6a0..27b25c4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java
@@ -72,18 +72,10 @@ class ReplicationSourceWALActionListener implements WALActionsListener {
if (ReplicationUtils.isReplicationForBulkLoadDataEnabled(conf)) {
return;
}
- WALKeyImpl keyImpl = (WALKeyImpl) logKey;
- // For serial replication we need to count all the sequence ids even for markers, so here we
- // always need to retain the replication scopes to let the replication wal reader to know that
- // we need serial replication. The ScopeWALEntryFilter will help filtering out the cell for
- // WALEdit.METAFAMILY.
- if (keyImpl.hasSerialReplicationScope()) {
- return;
- }
// For replay, or if all the cells are markers, do not need to store replication scope.
if (logEdit.isReplay() ||
logEdit.getCells().stream().allMatch(c -> CellUtil.matchingFamily(c, WALEdit.METAFAMILY))) {
- keyImpl.clearReplicationScope();
+ ((WALKeyImpl) logKey).clearReplicationScope();
}
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/dd6f4525/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
index ad3baaf..da92a09 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALReader.java
@@ -186,9 +186,9 @@ public class ReplicationSourceWALReader extends Thread {
new WALEntryBatch(replicationBatchCountCapacity, entryStream.getCurrentPath());
do {
Entry entry = entryStream.peek();
- boolean hasSerialReplicationScope = entry.getKey().hasSerialReplicationScope();
+ boolean isSerial = source.isSerial();
boolean doFiltering = true;
- if (hasSerialReplicationScope) {
+ if (isSerial) {
if (firstCellInEntryBeforeFiltering == null) {
assert !entry.getEdit().isEmpty() : "should not write empty edits";
// Used to locate the region record in meta table. In WAL we only have the table name and
@@ -208,7 +208,7 @@ public class ReplicationSourceWALReader extends Thread {
entry = filterEntry(entry);
}
if (entry != null) {
- if (hasSerialReplicationScope) {
+ if (isSerial) {
if (!serialReplicationChecker.canPush(entry, firstCellInEntryBeforeFiltering)) {
if (batch.getLastWalPosition() > positionBefore) {
// we have something that can push, break
http://git-wip-us.apache.org/repos/asf/hbase/blob/dd6f4525/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationChecker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationChecker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationChecker.java
index 9276359..b775d25 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationChecker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/SerialReplicationChecker.java
@@ -266,7 +266,7 @@ class SerialReplicationChecker {
throws IOException, InterruptedException {
byte[] row = CellUtil.cloneRow(firstCellInEdit);
while (!canPush(entry, row)) {
- LOG.debug("Can not push{}, wait", entry);
+ LOG.debug("Can not push {}, wait", entry);
Thread.sleep(waitTimeMs);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/dd6f4525/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java
index ac23d1d..8828239 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKeyImpl.java
@@ -419,14 +419,6 @@ public class WALKeyImpl implements WALKey {
setReplicationScope(null);
}
- public boolean hasSerialReplicationScope() {
- if (replicationScope == null || replicationScope.isEmpty()) {
- return false;
- }
- return replicationScope.values().stream()
- .anyMatch(scope -> scope.intValue() == HConstants.REPLICATION_SCOPE_SERIAL);
- }
-
/**
* Marks that the cluster with the given clusterId has consumed the change
*/
http://git-wip-us.apache.org/repos/asf/hbase/blob/dd6f4525/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
index 67a2551..f2c5e50 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationWALEntryFilters.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.replication;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -32,9 +33,9 @@ import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparatorImpl;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -48,7 +49,7 @@ import org.junit.experimental.categories.Category;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
-@Category({ReplicationTests.class, SmallTests.class})
+@Category({ ReplicationTests.class, SmallTests.class })
public class TestReplicationWALEntryFilters {
@ClassRule
@@ -65,7 +66,8 @@ public class TestReplicationWALEntryFilters {
SystemTableWALEntryFilter filter = new SystemTableWALEntryFilter();
// meta
- WALKeyImpl key1 = new WALKeyImpl(HRegionInfo.FIRST_META_REGIONINFO.getEncodedNameAsBytes(),
+ WALKeyImpl key1 =
+ new WALKeyImpl(RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes(),
TableName.META_TABLE_NAME, System.currentTimeMillis());
Entry metaEntry = new Entry(key1, null);
@@ -96,12 +98,15 @@ public class TestReplicationWALEntryFilters {
Entry userEntryEmpty = createEntry(null);
// no scopes
- assertEquals(null, filter.filter(userEntry));
+ // now we will not filter out entries without a replication scope since serial replication still
+ // need the sequence id, but the cells will all be filtered out.
+ assertTrue(filter.filter(userEntry).getEdit().isEmpty());
// empty scopes
+ // ditto
TreeMap<byte[], Integer> scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
userEntry = createEntry(scopes, a, b);
- assertEquals(null, filter.filter(userEntry));
+ assertTrue(filter.filter(userEntry).getEdit().isEmpty());
// different scope
scopes = new TreeMap<>(Bytes.BYTES_COMPARATOR);
http://git-wip-us.apache.org/repos/asf/hbase/blob/dd6f4525/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java
index bf6c0c8..f8efcf0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestSerialReplication.java
@@ -156,7 +156,8 @@ public class TestSerialReplication {
// add in disable state, so later when enabling it all sources will start push together.
UTIL.getAdmin().addReplicationPeer(PEER_ID,
ReplicationPeerConfig.newBuilder().setClusterKey("127.0.0.1:2181:/hbase")
- .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName()).build(),
+ .setReplicationEndpointImpl(LocalReplicationEndpoint.class.getName()).setSerial(true)
+ .build(),
false);
}
@@ -234,7 +235,7 @@ public class TestSerialReplication {
TableName tableName = TableName.valueOf(name.getMethodName());
UTIL.getAdmin().createTable(
TableDescriptorBuilder.newBuilder(tableName).addColumnFamily(ColumnFamilyDescriptorBuilder
- .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_SERIAL).build()).build());
+ .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build());
UTIL.waitTableAvailable(tableName);
try (Table table = UTIL.getConnection().getTable(tableName)) {
for (int i = 0; i < 100; i++) {
@@ -273,7 +274,7 @@ public class TestSerialReplication {
TableName tableName = TableName.valueOf(name.getMethodName());
UTIL.getAdmin().createTable(
TableDescriptorBuilder.newBuilder(tableName).addColumnFamily(ColumnFamilyDescriptorBuilder
- .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_SERIAL).build()).build());
+ .newBuilder(CF).setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build()).build());
UTIL.waitTableAvailable(tableName);
try (Table table = UTIL.getConnection().getTable(tableName)) {
for (int i = 0; i < 100; i++) {
@@ -330,7 +331,7 @@ public class TestSerialReplication {
UTIL.getAdmin().createTable(
TableDescriptorBuilder.newBuilder(tableName)
.addColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(CF)
- .setScope(HConstants.REPLICATION_SCOPE_SERIAL).build())
+ .setScope(HConstants.REPLICATION_SCOPE_GLOBAL).build())
.build(),
new byte[][] { splitKey });
UTIL.waitTableAvailable(tableName);