You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/02/05 04:14:50 UTC
[16/28] hbase git commit: HBASE-19926 Use a separated class to
implement the WALActionListener for Replication
HBASE-19926 Use a separated class to implement the WALActionListener for Replication
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/14420e1b
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/14420e1b
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/14420e1b
Branch: refs/heads/HBASE-19064
Commit: 14420e1b415cd468f652bf0137bda575e0a5980a
Parents: 397d347
Author: zhangduo <zh...@apache.org>
Authored: Sun Feb 4 10:42:33 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Sun Feb 4 20:36:43 2018 +0800
----------------------------------------------------------------------
.../replication/regionserver/Replication.java | 22 +----
.../regionserver/ReplicationSourceManager.java | 45 ---------
.../ReplicationSourceWALActionListener.java | 98 ++++++++++++++++++++
.../TestReplicationSourceManager.java | 27 +-----
4 files changed, 105 insertions(+), 87 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/14420e1b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index aaf3beb..7803ac4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -34,7 +34,6 @@ import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.regionserver.ReplicationSinkService;
import org.apache.hadoop.hbase.regionserver.ReplicationSourceService;
-import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
@@ -42,8 +41,6 @@ import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.wal.WALEdit;
-import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.yetus.audience.InterfaceAudience;
@@ -127,23 +124,8 @@ public class Replication implements ReplicationSourceService, ReplicationSinkSer
replicationTracker, conf, this.server, fs, logDir, oldLogDir, clusterId,
walProvider != null ? walProvider.getWALFileLengthProvider() : p -> OptionalLong.empty());
if (walProvider != null) {
- walProvider.addWALActionsListener(new WALActionsListener() {
-
- @Override
- public void preLogRoll(Path oldPath, Path newPath) throws IOException {
- replicationManager.preLogRoll(newPath);
- }
-
- @Override
- public void postLogRoll(Path oldPath, Path newPath) throws IOException {
- replicationManager.postLogRoll(newPath);
- }
-
- @Override
- public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) throws IOException {
- replicationManager.scopeWALEdits(logKey, logEdit);
- }
- });
+ walProvider
+ .addWALActionsListener(new ReplicationSourceWALActionListener(conf, replicationManager));
}
this.statsThreadPeriod =
this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);
http://git-wip-us.apache.org/repos/asf/hbase/blob/14420e1b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 6e87563..85b2e85 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -43,8 +43,6 @@ import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.Server;
@@ -60,13 +58,9 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
-import org.apache.hadoop.hbase.replication.ReplicationUtils;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
-import org.apache.hadoop.hbase.wal.WALEdit;
-import org.apache.hadoop.hbase.wal.WALKey;
-import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -74,8 +68,6 @@ import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
-
/**
* This class is responsible to manage all the replication sources. There are two classes of
* sources:
@@ -609,43 +601,6 @@ public class ReplicationSourceManager implements ReplicationListener {
}
}
- void scopeWALEdits(WALKey logKey, WALEdit logEdit) throws IOException {
- scopeWALEdits(logKey, logEdit, this.conf);
- }
-
- /**
- * Utility method used to set the correct scopes on each log key. Doesn't set a scope on keys from
- * compaction WAL edits and if the scope is local.
- * @param logKey Key that may get scoped according to its edits
- * @param logEdit Edits used to lookup the scopes
- * @throws IOException If failed to parse the WALEdit
- */
- @VisibleForTesting
- static void scopeWALEdits(WALKey logKey, WALEdit logEdit, Configuration conf) throws IOException {
- boolean replicationForBulkLoadEnabled =
- ReplicationUtils.isReplicationForBulkLoadDataEnabled(conf);
- boolean foundOtherEdits = false;
- for (Cell cell : logEdit.getCells()) {
- if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
- foundOtherEdits = true;
- break;
- }
- }
-
- if (!foundOtherEdits && logEdit.getCells().size() > 0) {
- WALProtos.RegionEventDescriptor maybeEvent =
- WALEdit.getRegionEventDescriptor(logEdit.getCells().get(0));
- if (maybeEvent != null &&
- (maybeEvent.getEventType() == WALProtos.RegionEventDescriptor.EventType.REGION_CLOSE)) {
- // In serially replication, we use scopes when reading close marker.
- foundOtherEdits = true;
- }
- }
- if ((!replicationForBulkLoadEnabled && !foundOtherEdits) || logEdit.isReplay()) {
- ((WALKeyImpl) logKey).serializeReplicationScope(false);
- }
- }
-
@Override
public void regionServerRemoved(String regionserver) {
transferQueues(ServerName.valueOf(regionserver));
http://git-wip-us.apache.org/repos/asf/hbase/blob/14420e1b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java
new file mode 100644
index 0000000..eb12614
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceWALActionListener.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.replication.ReplicationUtils;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
+
+/**
+ * Used to receive new wals.
+ */
+@InterfaceAudience.Private
+class ReplicationSourceWALActionListener implements WALActionsListener {
+
+ private final Configuration conf;
+
+ private final ReplicationSourceManager manager;
+
+ public ReplicationSourceWALActionListener(Configuration conf, ReplicationSourceManager manager) {
+ this.conf = conf;
+ this.manager = manager;
+ }
+
+ @Override
+ public void preLogRoll(Path oldPath, Path newPath) throws IOException {
+ manager.preLogRoll(newPath);
+ }
+
+ @Override
+ public void postLogRoll(Path oldPath, Path newPath) throws IOException {
+ manager.postLogRoll(newPath);
+ }
+
+ @Override
+ public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) throws IOException {
+ scopeWALEdits(logKey, logEdit, conf);
+ }
+
+ /**
+ * Utility method used to set the correct scopes on each log key. Doesn't set a scope on keys from
+ * compaction WAL edits and if the scope is local.
+ * @param logKey Key that may get scoped according to its edits
+ * @param logEdit Edits used to lookup the scopes
+ * @throws IOException If failed to parse the WALEdit
+ */
+ @VisibleForTesting
+ static void scopeWALEdits(WALKey logKey, WALEdit logEdit, Configuration conf) throws IOException {
+ boolean replicationForBulkLoadEnabled =
+ ReplicationUtils.isReplicationForBulkLoadDataEnabled(conf);
+ boolean foundOtherEdits = false;
+ for (Cell cell : logEdit.getCells()) {
+ if (!CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) {
+ foundOtherEdits = true;
+ break;
+ }
+ }
+
+ if (!foundOtherEdits && logEdit.getCells().size() > 0) {
+ WALProtos.RegionEventDescriptor maybeEvent =
+ WALEdit.getRegionEventDescriptor(logEdit.getCells().get(0));
+ if (maybeEvent != null &&
+ (maybeEvent.getEventType() == WALProtos.RegionEventDescriptor.EventType.REGION_CLOSE)) {
+ // In serially replication, we use scopes when reading close marker.
+ foundOtherEdits = true;
+ }
+ }
+ if ((!replicationForBulkLoadEnabled && !foundOtherEdits) || logEdit.isReplay()) {
+ ((WALKeyImpl) logKey).serializeReplicationScope(false);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/14420e1b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index a8afe2d..a53cba3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -63,7 +63,6 @@ import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionInfoBuilder;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
-import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
@@ -82,7 +81,6 @@ import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALFactory;
-import org.apache.hadoop.hbase.wal.WALKey;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
@@ -276,23 +274,8 @@ public abstract class TestReplicationSourceManager {
WALFactory wals =
new WALFactory(utility.getConfiguration(), URLEncoder.encode("regionserver:60020", "UTF8"));
ReplicationSourceManager replicationManager = replication.getReplicationManager();
- wals.getWALProvider().addWALActionsListener(new WALActionsListener() {
-
- @Override
- public void preLogRoll(Path oldPath, Path newPath) throws IOException {
- replicationManager.preLogRoll(newPath);
- }
-
- @Override
- public void postLogRoll(Path oldPath, Path newPath) throws IOException {
- replicationManager.postLogRoll(newPath);
- }
-
- @Override
- public void visitLogEntryBeforeWrite(WALKey logKey, WALEdit logEdit) throws IOException {
- replicationManager.scopeWALEdits(logKey, logEdit);
- }
- });
+ wals.getWALProvider()
+ .addWALActionsListener(new ReplicationSourceWALActionListener(conf, replicationManager));
final WAL wal = wals.getWAL(hri);
manager.init();
HTableDescriptor htd = new HTableDescriptor(TableName.valueOf("tableame"));
@@ -450,7 +433,7 @@ public abstract class TestReplicationSourceManager {
RegionInfo hri = RegionInfoBuilder.newBuilder(tableName).setStartKey(HConstants.EMPTY_START_ROW)
.setEndKey(HConstants.EMPTY_END_ROW).build();
WALEdit edit = WALEdit.createCompaction(hri, compactionDescriptor);
- ReplicationSourceManager.scopeWALEdits(new WALKeyImpl(), edit, conf);
+ ReplicationSourceWALActionListener.scopeWALEdits(new WALKeyImpl(), edit, conf);
}
@Test
@@ -462,7 +445,7 @@ public abstract class TestReplicationSourceManager {
WALKeyImpl logKey = new WALKeyImpl(scope);
// 3. Get the scopes for the key
- ReplicationSourceManager.scopeWALEdits(logKey, logEdit, conf);
+ ReplicationSourceWALActionListener.scopeWALEdits(logKey, logEdit, conf);
// 4. Assert that no bulk load entry scopes are added if bulk load hfile replication is disabled
assertNull("No bulk load entries scope should be added if bulk load replication is disabled.",
@@ -481,7 +464,7 @@ public abstract class TestReplicationSourceManager {
bulkLoadConf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
// 4. Get the scopes for the key
- ReplicationSourceManager.scopeWALEdits(logKey, logEdit, bulkLoadConf);
+ ReplicationSourceWALActionListener.scopeWALEdits(logKey, logEdit, bulkLoadConf);
NavigableMap<byte[], Integer> scopes = logKey.getReplicationScopes();
// Assert family with replication scope global is present in the key scopes