You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2021/11/09 14:48:07 UTC
[hbase] 01/02: HBASE-26407 Introduce a region replication sink for
sinking WAL edits to secondary replicas directly (#3807)
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch HBASE-26233
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit d54022929b0a6127caf8abafa3c60b4b2fd1c0f8
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Tue Nov 2 08:42:29 2021 +0800
HBASE-26407 Introduce a region replication sink for sinking WAL edits to secondary replicas directly (#3807)
Signed-off-by: Xiaolin Ha <ha...@apache.org>
---
.../hbase/client/AsyncClusterConnection.java | 13 +-
.../hbase/client/AsyncClusterConnectionImpl.java | 17 +-
.../AsyncRegionReplicaReplayRetryingCaller.java | 147 ------
.../AsyncRegionReplicationRetryingCaller.java | 103 +++++
.../org/apache/hadoop/hbase/ipc/RpcServer.java | 11 +
.../hadoop/hbase/master/janitor/MetaFixer.java | 9 -
.../master/procedure/CreateTableProcedure.java | 11 -
.../master/procedure/ModifyTableProcedure.java | 9 -
.../apache/hadoop/hbase/regionserver/HRegion.java | 79 +++-
.../apache/hadoop/hbase/regionserver/HStore.java | 3 +-
.../MultiVersionConcurrencyControl.java | 24 +-
.../hbase/regionserver/RegionReplicationSink.java | 228 +++++++++
.../regionserver/handler/AssignRegionHandler.java | 12 +-
.../handler/UnassignRegionHandler.java | 12 +-
.../hbase/regionserver/wal/AbstractFSWAL.java | 3 +-
.../hadoop/hbase/regionserver/wal/WALUtil.java | 44 +-
.../regionserver/CatalogReplicationSource.java | 47 --
.../regionserver/CatalogReplicationSourcePeer.java | 50 --
.../RegionReplicaReplicationEndpoint.java | 407 ----------------
.../regionserver/ReplicationSourceFactory.java | 1 -
.../regionserver/ReplicationSourceManager.java | 87 ----
.../hadoop/hbase/util/ServerRegionReplicaUtil.java | 32 --
.../org/apache/hadoop/hbase/TestIOFencing.java | 4 +-
.../hbase/client/DummyAsyncClusterConnection.java | 13 +-
.../hadoop/hbase/regionserver/TestHRegion.java | 2 +-
.../regionserver/TestRegionReplicaFailover.java | 4 +-
....java => TestMetaRegionReplicaReplication.java} | 184 ++------
.../regionserver/TestRegionReplicaReplication.java | 273 +++++++++++
.../TestRegionReplicaReplicationEndpoint.java | 515 ---------------------
...stRegionReplicaReplicationEndpointNoMaster.java | 281 -----------
30 files changed, 790 insertions(+), 1835 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
index 9502424..11c4f4f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
@@ -58,13 +58,6 @@ public interface AsyncClusterConnection extends AsyncConnection {
CompletableFuture<FlushRegionResponse> flush(byte[] regionName, boolean writeFlushWALMarker);
/**
- * Replicate wal edits for replica regions. The return value is the edits we skipped, as the
- * original return value is useless.
- */
- CompletableFuture<Long> replay(TableName tableName, byte[] encodedRegionName, byte[] row,
- List<Entry> entries, int replicaId, int numRetries, long operationTimeoutNs);
-
- /**
* Return all the replicas for a region. Used for region replica replication.
*/
CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
@@ -110,4 +103,10 @@ public interface AsyncClusterConnection extends AsyncConnection {
* Get the bootstrap node list of another region server.
*/
CompletableFuture<List<ServerName>> getAllBootstrapNodes(ServerName regionServer);
+
+ /**
+ * Replicate wal edits to a secondary replica.
+ */
+ CompletableFuture<Void> replicate(RegionInfo replica, List<Entry> entries, int numRetries,
+ long rpcTimeoutNs, long operationTimeoutNs);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
index 825fbb4..789d616 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
@@ -85,14 +85,6 @@ class AsyncClusterConnectionImpl extends AsyncConnectionImpl implements AsyncClu
}
@Override
- public CompletableFuture<Long> replay(TableName tableName, byte[] encodedRegionName, byte[] row,
- List<Entry> entries, int replicaId, int retries, long operationTimeoutNs) {
- return new AsyncRegionReplicaReplayRetryingCaller(RETRY_TIMER, this,
- ConnectionUtils.retries2Attempts(retries), operationTimeoutNs, tableName, encodedRegionName,
- row, entries, replicaId).call();
- }
-
- @Override
public CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
boolean reload) {
return getLocator().getRegionLocations(tableName, row, RegionLocateType.CURRENT, reload, -1L);
@@ -176,4 +168,13 @@ class AsyncClusterConnectionImpl extends AsyncConnectionImpl implements AsyncClu
});
return future;
}
+
+ @Override
+ public CompletableFuture<Void> replicate(RegionInfo replica,
+ List<Entry> entries, int retries, long rpcTimeoutNs,
+ long operationTimeoutNs) {
+ return new AsyncRegionReplicationRetryingCaller(RETRY_TIMER, this,
+ ConnectionUtils.retries2Attempts(retries), rpcTimeoutNs, operationTimeoutNs, replica, entries)
+ .call();
+ }
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicaReplayRetryingCaller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicaReplayRetryingCaller.java
deleted file mode 100644
index 0146c8b..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicaReplayRetryingCaller.java
+++ /dev/null
@@ -1,147 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
-
-import java.io.IOException;
-import java.util.List;
-import org.apache.hadoop.hbase.CellScanner;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
-
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
-
-/**
- * For replaying edits for region replica.
- * <p/>
- * The mainly difference here is that, every time after locating, we will check whether the region
- * name is equal, if not, we will give up, as this usually means the region has been split or
- * merged, and the new region(s) should already have all the data of the parent region(s).
- * <p/>
- * Notice that, the return value is the edits we skipped, as the original response message is not
- * used at upper layer.
- */
-@InterfaceAudience.Private
-public class AsyncRegionReplicaReplayRetryingCaller extends AsyncRpcRetryingCaller<Long> {
-
- private static final Logger LOG =
- LoggerFactory.getLogger(AsyncRegionReplicaReplayRetryingCaller.class);
-
- private final TableName tableName;
-
- private final byte[] encodedRegionName;
-
- private final byte[] row;
-
- private final Entry[] entries;
-
- private final int replicaId;
-
- public AsyncRegionReplicaReplayRetryingCaller(HashedWheelTimer retryTimer,
- AsyncClusterConnectionImpl conn, int maxAttempts, long operationTimeoutNs,
- TableName tableName, byte[] encodedRegionName, byte[] row, List<Entry> entries,
- int replicaId) {
- super(retryTimer, conn, ConnectionUtils.getPriority(tableName), conn.connConf.getPauseNs(),
- conn.connConf.getPauseForCQTBENs(), maxAttempts, operationTimeoutNs,
- conn.connConf.getWriteRpcTimeoutNs(), conn.connConf.getStartLogErrorsCnt());
- this.tableName = tableName;
- this.encodedRegionName = encodedRegionName;
- this.row = row;
- this.entries = entries.toArray(new Entry[0]);
- this.replicaId = replicaId;
- }
-
- private void call(HRegionLocation loc) {
- if (!Bytes.equals(encodedRegionName, loc.getRegion().getEncodedNameAsBytes())) {
- if (LOG.isTraceEnabled()) {
- LOG.trace(
- "Skipping {} entries in table {} because located region {} is different than" +
- " the original region {} from WALEdit",
- entries.length, tableName, loc.getRegion().getEncodedName(),
- Bytes.toStringBinary(encodedRegionName));
- for (Entry entry : entries) {
- LOG.trace("Skipping : " + entry);
- }
- }
- future.complete(Long.valueOf(entries.length));
- return;
- }
-
- AdminService.Interface stub;
- try {
- stub = conn.getAdminStub(loc.getServerName());
- } catch (IOException e) {
- onError(e,
- () -> "Get async admin stub to " + loc.getServerName() + " for '" +
- Bytes.toStringBinary(row) + "' in " + loc.getRegion().getEncodedName() + " of " +
- tableName + " failed",
- err -> conn.getLocator().updateCachedLocationOnError(loc, err));
- return;
- }
- Pair<ReplicateWALEntryRequest, CellScanner> p = ReplicationProtobufUtil
- .buildReplicateWALEntryRequest(entries, encodedRegionName, null, null, null);
- resetCallTimeout();
- controller.setCellScanner(p.getSecond());
- stub.replay(controller, p.getFirst(), r -> {
- if (controller.failed()) {
- onError(controller.getFailed(),
- () -> "Call to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row) + "' in " +
- loc.getRegion().getEncodedName() + " of " + tableName + " failed",
- err -> conn.getLocator().updateCachedLocationOnError(loc, err));
- } else {
- future.complete(0L);
- }
- });
-
- }
-
- @Override
- protected void doCall() {
- long locateTimeoutNs;
- if (operationTimeoutNs > 0) {
- locateTimeoutNs = remainingTimeNs();
- if (locateTimeoutNs <= 0) {
- completeExceptionally();
- return;
- }
- } else {
- locateTimeoutNs = -1L;
- }
- addListener(conn.getLocator().getRegionLocation(tableName, row, replicaId,
- RegionLocateType.CURRENT, locateTimeoutNs), (loc, error) -> {
- if (error != null) {
- onError(error,
- () -> "Locate '" + Bytes.toStringBinary(row) + "' in " + tableName + " failed", err -> {
- });
- return;
- }
- call(loc);
- });
- }
-}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicationRetryingCaller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicationRetryingCaller.java
new file mode 100644
index 0000000..a0ce418
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicationRetryingCaller.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.protobuf.ReplicationProtobufUtil;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
+
+/**
+ * For replicating edits to secondary replicas.
+ */
+@InterfaceAudience.Private
+public class AsyncRegionReplicationRetryingCaller extends AsyncRpcRetryingCaller<Void> {
+
+ private final RegionInfo replica;
+
+ private final Entry[] entries;
+
+ public AsyncRegionReplicationRetryingCaller(HashedWheelTimer retryTimer,
+ AsyncClusterConnectionImpl conn, int maxAttempts, long rpcTimeoutNs, long operationTimeoutNs,
+ RegionInfo replica, List<Entry> entries) {
+ super(retryTimer, conn, ConnectionUtils.getPriority(replica.getTable()),
+ conn.connConf.getPauseNs(), conn.connConf.getPauseForCQTBENs(), maxAttempts,
+ operationTimeoutNs, rpcTimeoutNs, conn.connConf.getStartLogErrorsCnt());
+ this.replica = replica;
+ this.entries = entries.toArray(new Entry[0]);
+ }
+
+ private void call(HRegionLocation loc) {
+ AdminService.Interface stub;
+ try {
+ stub = conn.getAdminStub(loc.getServerName());
+ } catch (IOException e) {
+ onError(e,
+ () -> "Get async admin stub to " + loc.getServerName() + " for " + replica + " failed",
+ err -> conn.getLocator().updateCachedLocationOnError(loc, err));
+ return;
+ }
+ Pair<ReplicateWALEntryRequest, CellScanner> pair = ReplicationProtobufUtil
+ .buildReplicateWALEntryRequest(entries, replica.getEncodedNameAsBytes(), null, null, null);
+ resetCallTimeout();
+ controller.setCellScanner(pair.getSecond());
+ stub.replay(controller, pair.getFirst(), r -> {
+ if (controller.failed()) {
+ onError(controller.getFailed(),
+ () -> "Call to " + loc.getServerName() + " for " + replica + " failed",
+ err -> conn.getLocator().updateCachedLocationOnError(loc, err));
+ } else {
+ future.complete(null);
+ }
+ });
+ }
+
+ @Override
+ protected void doCall() {
+ long locateTimeoutNs;
+ if (operationTimeoutNs > 0) {
+ locateTimeoutNs = remainingTimeNs();
+ if (locateTimeoutNs <= 0) {
+ completeExceptionally();
+ return;
+ }
+ } else {
+ locateTimeoutNs = -1L;
+ }
+ addListener(conn.getLocator().getRegionLocation(replica.getTable(), replica.getStartKey(),
+ replica.getReplicaId(), RegionLocateType.CURRENT, locateTimeoutNs), (loc, error) -> {
+ if (error != null) {
+ onError(error, () -> "Locate " + replica + " failed", err -> {
+ });
+ return;
+ }
+ call(loc);
+ });
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index b41619a..9a7ba92 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -679,6 +679,17 @@ public abstract class RpcServer implements RpcServerInterface,
return Optional.ofNullable(CurCall.get());
}
+ /**
+ * Just return the current rpc call if it is a {@link ServerCall} and also has {@link CellScanner}
+ * attached.
+ * <p/>
+ * Mainly used for reference counting as {@link CellScanner} may reference non heap memory.
+ */
+ public static Optional<ServerCall<?>> getCurrentServerCallWithCellScanner() {
+ return getCurrentCall().filter(c -> c instanceof ServerCall)
+ .filter(c -> c.getCellScanner() != null).map(c -> (ServerCall<?>) c);
+ }
+
public static boolean isInRpcCallContext() {
return CurCall.get() != null;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/janitor/MetaFixer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/janitor/MetaFixer.java
index 4a5aa0a..64ee49e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/janitor/MetaFixer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/janitor/MetaFixer.java
@@ -29,7 +29,6 @@ import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.stream.Collectors;
-import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.TableName;
@@ -40,10 +39,8 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.exceptions.MergeRegionException;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.assignment.TransitRegionStateProcedure;
-import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -197,15 +194,9 @@ public class MetaFixer {
MetaTableAccessor.addRegionsToMeta(masterServices.getConnection(), newRegions,
td.getRegionReplication());
- // Setup replication for region replicas if needed
- if (td.getRegionReplication() > 1) {
- ServerRegionReplicaUtil.setupRegionReplicaReplication(masterServices);
- }
return Either.<List<RegionInfo>, IOException> ofLeft(newRegions);
} catch (IOException e) {
return Either.<List<RegionInfo>, IOException> ofRight(e);
- } catch (ReplicationException e) {
- return Either.<List<RegionInfo>, IOException> ofRight(new HBaseIOException(e));
}
})
.collect(Collectors.toList());
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java
index 2313e70..723f851 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/CreateTableProcedure.java
@@ -25,7 +25,6 @@ import java.util.function.Supplier;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.DoNotRetryIOException;
-import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.TableExistsException;
@@ -37,12 +36,10 @@ import org.apache.hadoop.hbase.client.TableState;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.MasterFileSystem;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
-import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.ModifyRegionUtils;
-import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -378,14 +375,6 @@ public class CreateTableProcedure
// Add regions to META
addRegionsToMeta(env, tableDescriptor, newRegions);
- // Setup replication for region replicas if needed
- if (tableDescriptor.getRegionReplication() > 1) {
- try {
- ServerRegionReplicaUtil.setupRegionReplicaReplication(env.getMasterServices());
- } catch (ReplicationException e) {
- throw new HBaseIOException(e);
- }
- }
return newRegions;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
index 247dd9c..aedb42c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/ModifyTableProcedure.java
@@ -38,10 +38,8 @@ import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
import org.apache.hadoop.hbase.master.zksyncer.MetaLocationSyncer;
import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer;
-import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.rsgroup.RSGroupInfo;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -414,13 +412,6 @@ public class ModifyTableProcedure
.collect(Collectors.toList());
addChildProcedure(env.getAssignmentManager().createAssignProcedures(newReplicas));
}
- if (oldReplicaCount <= 1) {
- try {
- ServerRegionReplicaUtil.setupRegionReplicaReplication(env.getMasterServices());
- } catch (ReplicationException e) {
- throw new HBaseIOException(e);
- }
- }
}
private void closeExcessReplicasIfNeeded(MasterProcedureEnv env) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 8afadc7..c9d456b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.regionserver;
import static org.apache.hadoop.hbase.HConstants.REPLICATION_SCOPE_LOCAL;
import static org.apache.hadoop.hbase.regionserver.HStoreFile.MAJOR_COMPACTION_KEY;
import static org.apache.hadoop.hbase.util.ConcurrentMapUtils.computeIfAbsent;
+
import edu.umd.cs.findbugs.annotations.Nullable;
import io.opentelemetry.api.trace.Span;
import java.io.EOFException;
@@ -135,6 +136,7 @@ import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.ipc.RpcCall;
import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.ipc.ServerCall;
import org.apache.hadoop.hbase.mob.MobFileCache;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
@@ -177,6 +179,7 @@ import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.collect.Iterables;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
@@ -191,6 +194,7 @@ import org.apache.hbase.thirdparty.com.google.protobuf.Service;
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
+
import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CoprocessorServiceCall;
@@ -708,6 +712,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
private final StoreHotnessProtector storeHotnessProtector;
+ private Optional<RegionReplicationSink> regionReplicationSink = Optional.empty();
+
/**
* HRegion constructor. This constructor should only be used for testing and
* extensions. Instances of HRegion should be instantiated with the
@@ -1080,11 +1086,28 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
status.setStatus("Running coprocessor post-open hooks");
coprocessorHost.postOpen();
}
-
+ initializeRegionReplicationSink(reporter, status);
status.markComplete("Region opened successfully");
return nextSeqId;
}
+ private void initializeRegionReplicationSink(CancelableProgressable reporter,
+ MonitoredTask status) {
+ RegionServerServices rss = getRegionServerServices();
+ TableDescriptor td = getTableDescriptor();
+ int regionReplication = td.getRegionReplication();
+ RegionInfo regionInfo = getRegionInfo();
+ if (regionReplication <= 1 || !RegionReplicaUtil.isDefaultReplica(regionInfo) ||
+ !ServerRegionReplicaUtil.isRegionReplicaReplicationEnabled(conf, regionInfo.getTable()) ||
+ rss == null) {
+ regionReplicationSink = Optional.empty();
+ return;
+ }
+ status.setStatus("Initializaing region replication sink");
+ regionReplicationSink = Optional.of(new RegionReplicationSink(conf, regionInfo,
+ regionReplication, td.hasRegionMemStoreReplication(), rss.getAsyncClusterConnection()));
+ }
+
/**
* Open all Stores.
* @param reporter
@@ -1207,7 +1230,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
RegionEventDescriptor.EventType.REGION_OPEN, getRegionInfo(), openSeqId,
getRegionServerServices().getServerName(), storeFiles);
WALUtil.writeRegionEventMarker(wal, getReplicationScope(), getRegionInfo(), regionOpenDesc,
- mvcc);
+ mvcc, regionReplicationSink.orElse(null));
}
private void writeRegionCloseMarker(WAL wal) throws IOException {
@@ -1216,7 +1239,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
RegionEventDescriptor.EventType.REGION_CLOSE, getRegionInfo(), mvcc.getReadPoint(),
getRegionServerServices().getServerName(), storeFiles);
WALUtil.writeRegionEventMarker(wal, getReplicationScope(), getRegionInfo(), regionEventDesc,
- mvcc);
+ mvcc, null);
// Store SeqId in WAL FileSystem when a region closes
// checking region folder exists is due to many tests which delete the table folder while a
@@ -1861,7 +1884,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
RegionReplicaUtil.isDefaultReplica(getRegionInfo())) {
writeRegionCloseMarker(wal);
}
-
+ if (regionReplicationSink.isPresent()) {
+ // stop replicating to secondary replicas
+ RegionReplicationSink sink = regionReplicationSink.get();
+ sink.stop();
+ try {
+ regionReplicationSink.get().waitUntilStopped();
+ } catch (InterruptedException e) {
+ throw throwOnInterrupt(e);
+ }
+ }
this.closed.set(true);
if (!canFlush) {
decrMemStoreSize(this.memStoreSizing.getMemStoreSize());
@@ -2822,7 +2854,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
getRegionInfo(), flushOpSeqId, committedFiles);
// No sync. Sync is below where no updates lock and we do FlushAction.COMMIT_FLUSH
WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, false,
- mvcc);
+ mvcc, null);
}
// Prepare flush (take a snapshot)
@@ -2883,8 +2915,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
try {
FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
getRegionInfo(), flushOpSeqId, committedFiles);
- WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, false,
- mvcc);
+ WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, false, mvcc,
+ null);
} catch (Throwable t) {
LOG.warn("Received unexpected exception trying to write ABORT_FLUSH marker to WAL: {} in "
+ " region {}", StringUtils.stringifyException(t), this);
@@ -2928,8 +2960,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.CANNOT_FLUSH,
getRegionInfo(), -1, new TreeMap<>(Bytes.BYTES_COMPARATOR));
try {
- WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, true,
- mvcc);
+ WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, true, mvcc,
+ null);
return true;
} catch (IOException e) {
LOG.warn(getRegionInfo().getEncodedName() + " : "
@@ -3008,8 +3040,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
// write flush marker to WAL. If fail, we should throw DroppedSnapshotException
FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.COMMIT_FLUSH,
getRegionInfo(), flushOpSeqId, committedFiles);
- WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, true,
- mvcc);
+ WALUtil.writeFlushMarker(wal, this.getReplicationScope(), getRegionInfo(), desc, true, mvcc,
+ regionReplicationSink.orElse(null));
}
} catch (Throwable t) {
// An exception here means that the snapshot was not persisted.
@@ -3022,7 +3054,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
try {
FlushDescriptor desc = ProtobufUtil.toFlushDescriptor(FlushAction.ABORT_FLUSH,
getRegionInfo(), flushOpSeqId, committedFiles);
- WALUtil.writeFlushMarker(wal, this.replicationScope, getRegionInfo(), desc, false, mvcc);
+ WALUtil.writeFlushMarker(wal, this.replicationScope, getRegionInfo(), desc, false, mvcc,
+ null);
} catch (Throwable ex) {
LOG.warn(getRegionInfo().getEncodedName() + " : "
+ "failed writing ABORT_FLUSH marker to WAL", ex);
@@ -7067,10 +7100,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
try {
WALProtos.BulkLoadDescriptor loadDescriptor =
ProtobufUtil.toBulkLoadDescriptor(this.getRegionInfo().getTable(),
- UnsafeByteOperations.unsafeWrap(this.getRegionInfo().getEncodedNameAsBytes()),
- storeFiles, storeFilesSizes, seqId, clusterIds, replicate);
+ UnsafeByteOperations.unsafeWrap(this.getRegionInfo().getEncodedNameAsBytes()),
+ storeFiles, storeFilesSizes, seqId, clusterIds, replicate);
WALUtil.writeBulkLoadMarkerAndSync(this.wal, this.getReplicationScope(), getRegionInfo(),
- loadDescriptor, mvcc);
+ loadDescriptor, mvcc, regionReplicationSink.orElse(null));
} catch (IOException ioe) {
if (this.rsServices != null) {
// Have to abort region server because some hfiles has been loaded but we can't write
@@ -7757,21 +7790,25 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
if (this.coprocessorHost != null && !walEdit.isMetaEdit()) {
this.coprocessorHost.preWALAppend(walKey, walEdit);
}
- WriteEntry writeEntry = null;
+ ServerCall<?> rpcCall = RpcServer.getCurrentServerCallWithCellScanner().orElse(null);
try {
long txid = this.wal.appendData(this.getRegionInfo(), walKey, walEdit);
+ WriteEntry writeEntry = walKey.getWriteEntry();
+ regionReplicationSink.ifPresent(sink -> writeEntry.attachCompletionAction(() -> {
+ sink.add(walKey, walEdit, rpcCall);
+ }));
// Call sync on our edit.
if (txid != 0) {
sync(txid, durability);
}
- writeEntry = walKey.getWriteEntry();
+ return writeEntry;
} catch (IOException ioe) {
- if (walKey != null && walKey.getWriteEntry() != null) {
+ if (walKey.getWriteEntry() != null) {
mvcc.complete(walKey.getWriteEntry());
}
throw ioe;
}
- return writeEntry;
+
}
public static final long FIXED_OVERHEAD = ClassSize.estimateBase(HRegion.class, false);
@@ -8426,6 +8463,10 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi
}
}
+ public Optional<RegionReplicationSink> getRegionReplicationSink() {
+ return regionReplicationSink;
+ }
+
public void addReadRequestsCount(long readRequestsCount) {
this.readRequestsCount.add(readRequestsCount);
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
index 47cb795..dc46f6b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HStore.java
@@ -1574,7 +1574,8 @@ public class HStore implements Store, HeapSize, StoreConfigInformation,
// Does this method belong in Region altogether given it is making so many references up there?
// Could be Region#writeCompactionMarker(compactionDescriptor);
WALUtil.writeCompactionMarker(this.region.getWAL(), this.region.getReplicationScope(),
- this.region.getRegionInfo(), compactionDescriptor, this.region.getMVCC());
+ this.region.getRegionInfo(), compactionDescriptor, this.region.getMVCC(),
+ region.getRegionReplicationSink().orElse(null));
}
void replaceStoreFiles(Collection<HStoreFile> compactedFiles, Collection<HStoreFile> result)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java
index d821eec..91ae03d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/MultiVersionConcurrencyControl.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hbase.regionserver;
import java.util.LinkedList;
+import java.util.Optional;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.ClassSize;
@@ -202,6 +203,7 @@ public class MultiVersionConcurrencyControl {
if (queueFirst.isCompleted()) {
nextReadValue = queueFirst.getWriteNumber();
writeQueue.removeFirst();
+ queueFirst.runCompletionAction();
} else {
break;
}
@@ -271,22 +273,36 @@ public class MultiVersionConcurrencyControl {
* Every created WriteEntry must be completed by calling mvcc#complete or #completeAndWait.
*/
@InterfaceAudience.Private
- public static class WriteEntry {
+ public static final class WriteEntry {
private final long writeNumber;
private boolean completed = false;
+ /**
+ * Will be called after completion, i.e., when being removed from the
+ * {@link MultiVersionConcurrencyControl#writeQueue}.
+ */
+ private Optional<Runnable> completionAction = Optional.empty();
- WriteEntry(long writeNumber) {
+ private WriteEntry(long writeNumber) {
this.writeNumber = writeNumber;
}
- void markCompleted() {
+ private void markCompleted() {
this.completed = true;
}
- boolean isCompleted() {
+ private boolean isCompleted() {
return this.completed;
}
+ public void attachCompletionAction(Runnable action) {
+ assert !completionAction.isPresent();
+ completionAction = Optional.of(action);
+ }
+
+ private void runCompletionAction() {
+ completionAction.ifPresent(Runnable::run);
+ }
+
public long getWriteNumber() {
return this.writeNumber;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionReplicationSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionReplicationSink.java
new file mode 100644
index 0000000..6911289
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionReplicationSink.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RegionReplicaUtil;
+import org.apache.hadoop.hbase.ipc.ServerCall;
+import org.apache.hadoop.hbase.util.FutureUtils;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALEdit;
+import org.apache.hadoop.hbase.wal.WALKeyImpl;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+
+/**
+ * The class for replicating WAL edits to secondary replicas, one instance per region.
+ */
+@InterfaceAudience.Private
+public class RegionReplicationSink {
+
+ private static final Logger LOG = LoggerFactory.getLogger(RegionReplicationSink.class);
+
+ public static final String MAX_PENDING_SIZE = "hbase.region.read-replica.sink.max-pending-size";
+
+ public static final long MAX_PENDING_SIZE_DEFAULT = 10L * 1024 * 1024;
+
+ public static final String RETRIES_NUMBER = "hbase.region.read-replica.sink.retries.number";
+
+ public static final int RETRIES_NUMBER_DEFAULT = 3;
+
+ public static final String RPC_TIMEOUT_MS = "hbase.region.read-replica.sink.rpc.timeout.ms";
+
+ public static final long RPC_TIMEOUT_MS_DEFAULT = 200;
+
+ public static final String OPERATION_TIMEOUT_MS =
+ "hbase.region.read-replica.sink.operation.timeout.ms";
+
+ public static final long OPERATION_TIMEOUT_MS_DEFAULT = 1000;
+
+ private static final class SinkEntry {
+
+ final WALKeyImpl key;
+
+ final WALEdit edit;
+
+ final ServerCall<?> rpcCall;
+
+ SinkEntry(WALKeyImpl key, WALEdit edit, ServerCall<?> rpcCall) {
+ this.key = key;
+ this.edit = edit;
+ this.rpcCall = rpcCall;
+ if (rpcCall != null) {
+ // increase the reference count to avoid the rpc framework freeing the memory before we
+ // actually send the entries out.
+ rpcCall.retainByWAL();
+ }
+ }
+
+ /**
+ * Should be called regardless of the result of the replicating operation. Unless you still
+ * want to reuse this entry, you must call this method to release the possible off-heap
+ * memory.
+ */
+ void replicated() {
+ if (rpcCall != null) {
+ rpcCall.releaseByWAL();
+ }
+ }
+ }
+
+ private final RegionInfo primary;
+
+ private final int regionReplication;
+
+ private final boolean hasRegionMemStoreReplication;
+
+ private final Queue<SinkEntry> entries = new ArrayDeque<>();
+
+ private final AsyncClusterConnection conn;
+
+ private final int retries;
+
+ private final long rpcTimeoutNs;
+
+ private final long operationTimeoutNs;
+
+ private CompletableFuture<Void> future;
+
+ private boolean stopping;
+
+ private boolean stopped;
+
+ RegionReplicationSink(Configuration conf, RegionInfo primary, int regionReplication,
+ boolean hasRegionMemStoreReplication, AsyncClusterConnection conn) {
+ Preconditions.checkArgument(RegionReplicaUtil.isDefaultReplica(primary), "%s is not primary",
+ primary);
+ Preconditions.checkArgument(regionReplication > 1,
+ "region replication should be greater than 1 but got %s", regionReplication);
+ this.primary = primary;
+ this.regionReplication = regionReplication;
+ this.hasRegionMemStoreReplication = hasRegionMemStoreReplication;
+ this.conn = conn;
+ this.retries = conf.getInt(RETRIES_NUMBER, RETRIES_NUMBER_DEFAULT);
+ this.rpcTimeoutNs =
+ TimeUnit.MILLISECONDS.toNanos(conf.getLong(RPC_TIMEOUT_MS, RPC_TIMEOUT_MS_DEFAULT));
+ this.operationTimeoutNs = TimeUnit.MILLISECONDS
+ .toNanos(conf.getLong(OPERATION_TIMEOUT_MS, OPERATION_TIMEOUT_MS_DEFAULT));
+ }
+
+ private void send() {
+ List<SinkEntry> toSend = new ArrayList<>();
+ for (SinkEntry entry;;) {
+ entry = entries.poll();
+ if (entry == null) {
+ break;
+ }
+ toSend.add(entry);
+ }
+ List<WAL.Entry> walEntries =
+ toSend.stream().map(e -> new WAL.Entry(e.key, e.edit)).collect(Collectors.toList());
+ List<CompletableFuture<Void>> futures = new ArrayList<>();
+ for (int replicaId = 1; replicaId < regionReplication; replicaId++) {
+ RegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(primary, replicaId);
+ futures.add(conn.replicate(replica, walEntries, retries, rpcTimeoutNs, operationTimeoutNs));
+ }
+ future = CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0]));
+ FutureUtils.addListener(future, (r, e) -> {
+ if (e != null) {
+ // TODO: drop pending edits and issue a flush
+ LOG.warn("Failed to replicate to secondary replicas for {}", primary, e);
+ }
+ toSend.forEach(SinkEntry::replicated);
+ synchronized (entries) {
+ future = null;
+ if (stopping) {
+ stopped = true;
+ entries.notifyAll();
+ return;
+ }
+ if (!entries.isEmpty()) {
+ send();
+ }
+ }
+ });
+ }
+
+ /**
+ * Add this edit to replication queue.
+ * <p/>
+ * The {@code rpcCall} is for retaining the cells if the edit is built within an rpc call
+ * and the rpc call has a cell scanner, whose data may be stored off heap.
+ */
+ public void add(WALKeyImpl key, WALEdit edit, ServerCall<?> rpcCall) {
+ if (!hasRegionMemStoreReplication && !edit.isMetaEdit()) {
+ // if region memstore replication is not enabled, only replicate meta edits
+ return;
+ }
+ synchronized (entries) {
+ if (stopping) {
+ return;
+ }
+ // TODO: limit the total cached entries here, and we should have a global limitation, not for
+ // only this region.
+ entries.add(new SinkEntry(key, edit, rpcCall));
+ if (future == null) {
+ send();
+ }
+ }
+ }
+
+ /**
+ * Stop the replication sink.
+ * <p/>
+ * Usually this should only be called when you want to close a region.
+ */
+ void stop() {
+ synchronized (entries) {
+ stopping = true;
+ if (future == null) {
+ stopped = true;
+ entries.notifyAll();
+ }
+ }
+ }
+
+ /**
+ * Make sure that we have finished all the replicating requests.
+ * <p/>
+ * After returning, we can make sure there will be no new replicating requests to secondary
+ * replicas.
+ * <p/>
+ * This is used to keep the replicating order the same with the WAL edit order when writing.
+ */
+ void waitUntilStopped() throws InterruptedException {
+ synchronized (entries) {
+ while (!stopped) {
+ entries.wait();
+ }
+ }
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/AssignRegionHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/AssignRegionHandler.java
index 5d9819c..101c9c3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/AssignRegionHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/AssignRegionHandler.java
@@ -22,9 +22,7 @@ import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.EventType;
@@ -34,10 +32,10 @@ import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionServerServices.PostOpenDeployContext;
import org.apache.hadoop.hbase.regionserver.RegionServerServices.RegionStateTransitionContext;
import org.apache.hadoop.hbase.util.RetryCounter;
-import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
/**
@@ -134,14 +132,6 @@ public class AssignRegionHandler extends EventHandler {
// pass null for the last parameter, which used to be a CancelableProgressable, as now the
// opening can not be interrupted by a close request any more.
Configuration conf = rs.getConfiguration();
- TableName tn = htd.getTableName();
- if (ServerRegionReplicaUtil.isMetaRegionReplicaReplicationEnabled(conf, tn)) {
- if (RegionReplicaUtil.isDefaultReplica(this.regionInfo.getReplicaId())) {
- // Add the hbase:meta replication source on replica zero/default.
- rs.getReplicationSourceService().getReplicationManager().
- addCatalogReplicationSource(this.regionInfo);
- }
- }
region = HRegion.openHRegion(regionInfo, htd, rs.getWAL(regionInfo), conf, rs, null);
} catch (IOException e) {
cleanUpAndReportFailure(e);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/UnassignRegionHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/UnassignRegionHandler.java
index 0d02f30..2ac55ec 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/UnassignRegionHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/UnassignRegionHandler.java
@@ -22,7 +22,6 @@ import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.EventType;
import org.apache.hadoop.hbase.regionserver.HRegion;
@@ -31,10 +30,10 @@ import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionServerServices.RegionStateTransitionContext;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.RetryCounter;
-import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.hadoop.hbase.shaded.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
/**
@@ -121,15 +120,6 @@ public class UnassignRegionHandler extends EventHandler {
}
rs.removeRegion(region, destination);
- if (ServerRegionReplicaUtil.isMetaRegionReplicaReplicationEnabled(rs.getConfiguration(),
- region.getTableDescriptor().getTableName())) {
- if (RegionReplicaUtil.isDefaultReplica(region.getRegionInfo().getReplicaId())) {
- // If hbase:meta read replicas enabled, remove replication source for hbase:meta Regions.
- // See assign region handler where we add the replication source on open.
- rs.getReplicationSourceService().getReplicationManager().
- removeCatalogReplicationSource(region.getRegionInfo());
- }
- }
if (!rs.reportRegionStateTransition(
new RegionStateTransitionContext(TransitionCode.CLOSED, HConstants.NO_SEQNUM, closeProcId,
-1, region.getRegionInfo()))) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
index 002d9b7..f0e4a1f 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/AbstractFSWAL.java
@@ -1164,8 +1164,7 @@ public abstract class AbstractFSWAL<W extends WriterBase> implements WAL {
txidHolder.setValue(ringBuffer.next());
});
long txid = txidHolder.longValue();
- ServerCall<?> rpcCall = RpcServer.getCurrentCall().filter(c -> c instanceof ServerCall)
- .filter(c -> c.getCellScanner() != null).map(c -> (ServerCall) c).orElse(null);
+ ServerCall<?> rpcCall = RpcServer.getCurrentServerCallWithCellScanner().orElse(null);
try {
FSWALEntry entry = new FSWALEntry(txid, key, edits, hri, inMemstore, rpcCall);
entry.stampRegionSequenceId(we);
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
index 23db3dc..15be95e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALUtil.java
@@ -28,7 +28,10 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl.WriteEntry;
+import org.apache.hadoop.hbase.regionserver.RegionReplicationSink;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.wal.WAL;
@@ -71,9 +74,9 @@ public class WALUtil {
*/
public static WALKeyImpl writeCompactionMarker(WAL wal,
NavigableMap<byte[], Integer> replicationScope, RegionInfo hri, final CompactionDescriptor c,
- MultiVersionConcurrencyControl mvcc) throws IOException {
+ MultiVersionConcurrencyControl mvcc, RegionReplicationSink sink) throws IOException {
WALKeyImpl walKey =
- writeMarker(wal, replicationScope, hri, WALEdit.createCompaction(hri, c), mvcc, null);
+ writeMarker(wal, replicationScope, hri, WALEdit.createCompaction(hri, c), mvcc, null, sink);
if (LOG.isTraceEnabled()) {
LOG.trace("Appended compaction marker " + TextFormat.shortDebugString(c));
}
@@ -86,10 +89,10 @@ public class WALUtil {
* This write is for internal use only. Not for external client consumption.
*/
public static WALKeyImpl writeFlushMarker(WAL wal, NavigableMap<byte[], Integer> replicationScope,
- RegionInfo hri, final FlushDescriptor f, boolean sync, MultiVersionConcurrencyControl mvcc)
- throws IOException {
+ RegionInfo hri, final FlushDescriptor f, boolean sync, MultiVersionConcurrencyControl mvcc,
+ RegionReplicationSink sink) throws IOException {
WALKeyImpl walKey = doFullMarkerAppendTransaction(wal, replicationScope, hri,
- WALEdit.createFlushWALEdit(hri, f), mvcc, null, sync);
+ WALEdit.createFlushWALEdit(hri, f), mvcc, null, sync, sink);
if (LOG.isTraceEnabled()) {
LOG.trace("Appended flush marker " + TextFormat.shortDebugString(f));
}
@@ -102,9 +105,9 @@ public class WALUtil {
*/
public static WALKeyImpl writeRegionEventMarker(WAL wal,
NavigableMap<byte[], Integer> replicationScope, RegionInfo hri, RegionEventDescriptor r,
- MultiVersionConcurrencyControl mvcc) throws IOException {
- WALKeyImpl walKey =
- writeMarker(wal, replicationScope, hri, WALEdit.createRegionEventWALEdit(hri, r), mvcc, null);
+ MultiVersionConcurrencyControl mvcc, RegionReplicationSink sink) throws IOException {
+ WALKeyImpl walKey = writeMarker(wal, replicationScope, hri,
+ WALEdit.createRegionEventWALEdit(hri, r), mvcc, null, sink);
if (LOG.isTraceEnabled()) {
LOG.trace("Appended region event marker " + TextFormat.shortDebugString(r));
}
@@ -122,11 +125,11 @@ public class WALUtil {
* @throws IOException We will throw an IOException if we can not append to the HLog.
*/
public static WALKeyImpl writeBulkLoadMarkerAndSync(final WAL wal,
- final NavigableMap<byte[], Integer> replicationScope, final RegionInfo hri,
- final WALProtos.BulkLoadDescriptor desc, final MultiVersionConcurrencyControl mvcc)
- throws IOException {
+ final NavigableMap<byte[], Integer> replicationScope, final RegionInfo hri,
+ final WALProtos.BulkLoadDescriptor desc, final MultiVersionConcurrencyControl mvcc,
+ final RegionReplicationSink sink) throws IOException {
WALKeyImpl walKey = writeMarker(wal, replicationScope, hri,
- WALEdit.createBulkLoadEvent(hri, desc), mvcc, null);
+ WALEdit.createBulkLoadEvent(hri, desc), mvcc, null, sink);
if (LOG.isTraceEnabled()) {
LOG.trace("Appended Bulk Load marker " + TextFormat.shortDebugString(desc));
}
@@ -135,11 +138,11 @@ public class WALUtil {
private static WALKeyImpl writeMarker(final WAL wal,
final NavigableMap<byte[], Integer> replicationScope, final RegionInfo hri, final WALEdit edit,
- final MultiVersionConcurrencyControl mvcc, final Map<String, byte[]> extendedAttributes)
- throws IOException {
+ final MultiVersionConcurrencyControl mvcc, final Map<String, byte[]> extendedAttributes,
+ final RegionReplicationSink sink) throws IOException {
// If sync == true in below, then timeout is not used; safe to pass UNSPECIFIED_TIMEOUT
return doFullMarkerAppendTransaction(wal, replicationScope, hri, edit, mvcc, extendedAttributes,
- true);
+ true, sink);
}
/**
@@ -152,19 +155,24 @@ public class WALUtil {
*/
private static WALKeyImpl doFullMarkerAppendTransaction(final WAL wal,
final NavigableMap<byte[], Integer> replicationScope, final RegionInfo hri, final WALEdit edit,
- final MultiVersionConcurrencyControl mvcc,
- final Map<String, byte[]> extendedAttributes, final boolean sync) throws IOException {
+ final MultiVersionConcurrencyControl mvcc, final Map<String, byte[]> extendedAttributes,
+ final boolean sync, final RegionReplicationSink sink) throws IOException {
// TODO: Pass in current time to use?
WALKeyImpl walKey = new WALKeyImpl(hri.getEncodedNameAsBytes(), hri.getTable(),
EnvironmentEdgeManager.currentTime(), mvcc, replicationScope, extendedAttributes);
long trx = MultiVersionConcurrencyControl.NONE;
try {
trx = wal.appendMarker(hri, walKey, edit);
+ WriteEntry writeEntry = walKey.getWriteEntry();
+ if (sink != null) {
+ writeEntry.attachCompletionAction(() -> sink.add(walKey, edit,
+ RpcServer.getCurrentServerCallWithCellScanner().orElse(null)));
+ }
if (sync) {
wal.sync(trx);
}
// Call complete only here because these are markers only. They are not for clients to read.
- mvcc.complete(walKey.getWriteEntry());
+ mvcc.complete(writeEntry);
} catch (IOException ioe) {
if (walKey.getWriteEntry() != null) {
mvcc.complete(walKey.getWriteEntry());
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/CatalogReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/CatalogReplicationSource.java
deleted file mode 100644
index 8cb7860..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/CatalogReplicationSource.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication.regionserver;
-
-import java.util.Collections;
-import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
-import org.apache.yetus.audience.InterfaceAudience;
-
-/**
- * ReplicationSource that reads catalog WAL files -- e.g. hbase:meta WAL files -- and lets through
- * all WALEdits from these WALs. This ReplicationSource is NOT created via
- * {@link ReplicationSourceFactory}.
- */
-@InterfaceAudience.Private
-class CatalogReplicationSource extends ReplicationSource {
- CatalogReplicationSource() {
- // Filters in hbase:meta WAL files and allows all edits, including 'meta' edits (these are
- // filtered out in the 'super' class default implementation).
- super(p -> AbstractFSWALProvider.isMetaFile(p), Collections.emptyList());
- }
-
- @Override
- public void logPositionAndCleanOldLogs(WALEntryBatch entryBatch) {
- // Noop. This CatalogReplicationSource implementation does not persist state to backing storage
- // nor does it keep its WALs in a general map up in ReplicationSourceManager --
- // CatalogReplicationSource is used by the Catalog Read Replica feature which resets everytime
- // the WAL source process crashes. Skip calling through to the default implementation.
- // See "4.1 Skip maintaining zookeeper replication queue (offsets/WALs)" in the
- // design doc attached to HBASE-18070 'Enable memstore replication for meta replica for detail'
- // for background on why no need to keep WAL state.
- }
-}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/CatalogReplicationSourcePeer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/CatalogReplicationSourcePeer.java
deleted file mode 100644
index 3bcd414..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/CatalogReplicationSourcePeer.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication.regionserver;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
-import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
-import org.apache.hadoop.hbase.replication.SyncReplicationState;
-import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
-import org.apache.yetus.audience.InterfaceAudience;
-
-/**
- * The 'peer' used internally by Catalog Region Replicas Replication Source.
- * The Replication system has 'peer' baked into its core so though we do not need 'peering', we
- * need a 'peer' and its configuration else the replication system breaks at a few locales.
- * Set "hbase.region.replica.catalog.replication" if you want to change the configured endpoint.
- */
-@InterfaceAudience.Private
-class CatalogReplicationSourcePeer extends ReplicationPeerImpl {
- /**
- * @param clusterKey Usually the UUID from zk passed in by caller as a String.
- */
- CatalogReplicationSourcePeer(Configuration configuration, String clusterKey) {
- super(configuration, ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER + "_catalog",
- ReplicationPeerConfig.newBuilder().
- setClusterKey(clusterKey).
- setReplicationEndpointImpl(
- configuration.get("hbase.region.replica.catalog.replication",
- RegionReplicaReplicationEndpoint.class.getName())).
- setBandwidth(0). // '0' means no bandwidth.
- setSerial(false).
- build(),
- true, SyncReplicationState.NONE, SyncReplicationState.NONE);
- }
-}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
deleted file mode 100644
index 17e7a53..0000000
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
+++ /dev/null
@@ -1,407 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.replication.regionserver;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.TreeMap;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseIOException;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.RegionLocations;
-import org.apache.hadoop.hbase.TableDescriptors;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.AsyncClusterConnection;
-import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.client.RegionReplicaUtil;
-import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
-import org.apache.hadoop.hbase.replication.WALEntryFilter;
-import org.apache.hadoop.hbase.util.AtomicUtils;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FutureUtils;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.util.RetryCounter;
-import org.apache.hadoop.hbase.util.RetryCounterFactory;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
-import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
-import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
-import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
-
-/**
- * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} endpoint which receives the WAL
- * edits from the WAL, and sends the edits to replicas of regions.
- */
-@InterfaceAudience.Private
-public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
-
- private static final Logger LOG = LoggerFactory.getLogger(RegionReplicaReplicationEndpoint.class);
-
- // Can be configured differently than hbase.client.retries.number
- private static String CLIENT_RETRIES_NUMBER =
- "hbase.region.replica.replication.client.retries.number";
-
- private Configuration conf;
- private AsyncClusterConnection connection;
- private TableDescriptors tableDescriptors;
-
- private int numRetries;
-
- private long operationTimeoutNs;
-
- private LoadingCache<TableName, Optional<TableDescriptor>> tableDescriptorCache;
-
- private Cache<TableName, TableName> disabledTableCache;
-
- private final RetryCounterFactory retryCounterFactory =
- new RetryCounterFactory(Integer.MAX_VALUE, 1000, 60000);
-
- @Override
- public void init(Context context) throws IOException {
- super.init(context);
- this.conf = context.getConfiguration();
- this.tableDescriptors = context.getTableDescriptors();
- int memstoreReplicationEnabledCacheExpiryMs = conf
- .getInt("hbase.region.replica.replication.cache.memstoreReplicationEnabled.expiryMs", 5000);
- // A cache for the table "memstore replication enabled" flag.
- // It has a default expiry of 5 sec. This means that if the table is altered
- // with a different flag value, we might miss to replicate for that amount of
- // time. But this cache avoid the slow lookup and parsing of the TableDescriptor.
- tableDescriptorCache = CacheBuilder.newBuilder()
- .expireAfterWrite(memstoreReplicationEnabledCacheExpiryMs, TimeUnit.MILLISECONDS)
- .initialCapacity(10).maximumSize(1000)
- .build(new CacheLoader<TableName, Optional<TableDescriptor>>() {
-
- @Override
- public Optional<TableDescriptor> load(TableName tableName) throws Exception {
- // check if the table requires memstore replication
- // some unit-test drop the table, so we should do a bypass check and always replicate.
- return Optional.ofNullable(tableDescriptors.get(tableName));
- }
- });
- int nonExistentTableCacheExpiryMs =
- conf.getInt("hbase.region.replica.replication.cache.disabledAndDroppedTables.expiryMs", 5000);
- // A cache for non existing tables that have a default expiry of 5 sec. This means that if the
- // table is created again with the same name, we might miss to replicate for that amount of
- // time. But this cache prevents overloading meta requests for every edit from a deleted file.
- disabledTableCache = CacheBuilder.newBuilder()
- .expireAfterWrite(nonExistentTableCacheExpiryMs, TimeUnit.MILLISECONDS).initialCapacity(10)
- .maximumSize(1000).build();
- // HRS multiplies client retries by 10 globally for meta operations, but we do not want this.
- // We are resetting it here because we want default number of retries (35) rather than 10 times
- // that which makes very long retries for disabled tables etc.
- int defaultNumRetries = conf.getInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER,
- HConstants.DEFAULT_HBASE_CLIENT_RETRIES_NUMBER);
- if (defaultNumRetries > 10) {
- int mult = conf.getInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER,
- HConstants.DEFAULT_HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER);
- defaultNumRetries = defaultNumRetries / mult; // reset if HRS has multiplied this already
- }
- this.numRetries = conf.getInt(CLIENT_RETRIES_NUMBER, defaultNumRetries);
- // use the regular RPC timeout for replica replication RPC's
- this.operationTimeoutNs =
- TimeUnit.MILLISECONDS.toNanos(conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
- HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
- this.connection = context.getServer().getAsyncClusterConnection();
- }
-
- /**
- * returns true if the specified entry must be replicated. We should always replicate meta
- * operations (e.g. flush) and use the user HTD flag to decide whether or not replicate the
- * memstore.
- */
- private boolean requiresReplication(Optional<TableDescriptor> tableDesc,
- Entry entry) {
- // empty edit does not need to be replicated
- if (entry.getEdit().isEmpty() || !tableDesc.isPresent()) {
- return false;
- }
- // meta edits (e.g. flush) must be always replicated
- return entry.getEdit().isMetaEdit() || tableDesc.get().hasRegionMemStoreReplication();
- }
-
- private void getRegionLocations(CompletableFuture<RegionLocations> future,
- TableDescriptor tableDesc, byte[] encodedRegionName, byte[] row, boolean reload) {
- FutureUtils.addListener(connection.getRegionLocations(tableDesc.getTableName(), row, reload),
- (locs, e) -> {
- if (e != null) {
- future.completeExceptionally(e);
- return;
- }
- // if we are not loading from cache, just return
- if (reload) {
- future.complete(locs);
- return;
- }
- // check if the number of region replicas is correct, and also the primary region name
- // matches.
- if (locs.size() == tableDesc.getRegionReplication() &&
- locs.getDefaultRegionLocation() != null &&
- Bytes.equals(locs.getDefaultRegionLocation().getRegion().getEncodedNameAsBytes(),
- encodedRegionName)) {
- future.complete(locs);
- } else {
- // reload again as the information in cache maybe stale
- getRegionLocations(future, tableDesc, encodedRegionName, row, true);
- }
- });
- }
-
- private void replicate(CompletableFuture<Long> future, RegionLocations locs,
- TableDescriptor tableDesc, byte[] encodedRegionName, byte[] row, List<Entry> entries) {
- if (locs.size() == 1) {
- LOG.info("Only one location for {}.{}, refresh the location cache only for meta now",
- tableDesc.getTableName(), Bytes.toString(encodedRegionName));
-
- // This could happen to meta table. In case of meta table comes with no replica and
- // later it is changed to multiple replicas. The cached location for meta may only has
- // the primary region. In this case, it needs to clean up and refresh the cached meta
- // locations.
- if (tableDesc.isMetaTable()) {
- connection.getRegionLocator(tableDesc.getTableName()).clearRegionLocationCache();
- }
- future.complete(Long.valueOf(entries.size()));
- return;
- }
- RegionInfo defaultReplica = locs.getDefaultRegionLocation().getRegion();
- if (!Bytes.equals(defaultReplica.getEncodedNameAsBytes(), encodedRegionName)) {
- // the region name is not equal, this usually means the region has been split or merged, so
- // give up replicating as the new region(s) should already have all the data of the parent
- // region(s).
- if (LOG.isTraceEnabled()) {
- LOG.trace(
- "Skipping {} entries in table {} because located region {} is different than" +
- " the original region {} from WALEdit",
- tableDesc.getTableName(), defaultReplica.getEncodedName(),
- Bytes.toStringBinary(encodedRegionName));
- }
- future.complete(Long.valueOf(entries.size()));
- return;
- }
- AtomicReference<Throwable> error = new AtomicReference<>();
- AtomicInteger remainingTasks = new AtomicInteger(locs.size() - 1);
- AtomicLong skippedEdits = new AtomicLong(0);
-
- for (int i = 1, n = locs.size(); i < n; i++) {
- // Do not use the elements other than the default replica as they may be null. We will fail
- // earlier if the location for default replica is null.
- final RegionInfo replica = RegionReplicaUtil.getRegionInfoForReplica(defaultReplica, i);
- FutureUtils
- .addListener(connection.replay(tableDesc.getTableName(), replica.getEncodedNameAsBytes(),
- row, entries, replica.getReplicaId(), numRetries, operationTimeoutNs), (r, e) -> {
- if (e != null) {
- LOG.warn("Failed to replicate to {}", replica, e);
- error.compareAndSet(null, e);
- } else {
- AtomicUtils.updateMax(skippedEdits, r.longValue());
- }
- if (remainingTasks.decrementAndGet() == 0) {
- if (error.get() != null) {
- future.completeExceptionally(error.get());
- } else {
- future.complete(skippedEdits.get());
- }
- }
- });
- }
- }
-
- private void logSkipped(TableName tableName, List<Entry> entries, String reason) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Skipping {} entries because table {} is {}", entries.size(), tableName, reason);
- for (Entry entry : entries) {
- LOG.trace("Skipping : {}", entry);
- }
- }
- }
-
- private CompletableFuture<Long> replicate(TableDescriptor tableDesc, byte[] encodedRegionName,
- List<Entry> entries) {
- if (disabledTableCache.getIfPresent(tableDesc.getTableName()) != null) {
- logSkipped(tableDesc.getTableName(), entries, "cached as a disabled table");
- return CompletableFuture.completedFuture(Long.valueOf(entries.size()));
- }
- byte[] row = CellUtil.cloneRow(entries.get(0).getEdit().getCells().get(0));
- CompletableFuture<RegionLocations> locateFuture = new CompletableFuture<>();
- getRegionLocations(locateFuture, tableDesc, encodedRegionName, row, false);
- CompletableFuture<Long> future = new CompletableFuture<>();
- FutureUtils.addListener(locateFuture, (locs, error) -> {
- if (error != null) {
- future.completeExceptionally(error);
- } else if (locs.getDefaultRegionLocation() == null) {
- future.completeExceptionally(
- new HBaseIOException("No location found for default replica of table=" +
- tableDesc.getTableName() + " row='" + Bytes.toStringBinary(row) + "'"));
- } else {
- replicate(future, locs, tableDesc, encodedRegionName, row, entries);
- }
- });
- return future;
- }
-
- @Override
- public boolean replicate(ReplicateContext replicateContext) {
- Map<byte[], Pair<TableDescriptor, List<Entry>>> encodedRegionName2Entries =
- new TreeMap<>(Bytes.BYTES_COMPARATOR);
- long skippedEdits = 0;
- RetryCounter retryCounter = retryCounterFactory.create();
- outer: while (isRunning()) {
- encodedRegionName2Entries.clear();
- skippedEdits = 0;
- for (Entry entry : replicateContext.getEntries()) {
- Optional<TableDescriptor> tableDesc;
- try {
- tableDesc = tableDescriptorCache.get(entry.getKey().getTableName());
- } catch (ExecutionException e) {
- LOG.warn("Failed to load table descriptor for {}, attempts={}",
- entry.getKey().getTableName(), retryCounter.getAttemptTimes(), e.getCause());
- if (!retryCounter.shouldRetry()) {
- return false;
- }
- try {
- retryCounter.sleepUntilNextRetry();
- } catch (InterruptedException e1) {
- // restore the interrupted state
- Thread.currentThread().interrupt();
- return false;
- }
- continue outer;
- }
- if (!requiresReplication(tableDesc, entry)) {
- skippedEdits++;
- continue;
- }
- byte[] encodedRegionName = entry.getKey().getEncodedRegionName();
- encodedRegionName2Entries
- .computeIfAbsent(encodedRegionName, k -> Pair.newPair(tableDesc.get(), new ArrayList<>()))
- .getSecond().add(entry);
- }
- break;
- }
- // send the request to regions
- retryCounter = retryCounterFactory.create();
- while (isRunning()) {
- List<Pair<CompletableFuture<Long>, byte[]>> futureAndEncodedRegionNameList =
- new ArrayList<Pair<CompletableFuture<Long>, byte[]>>();
- for (Map.Entry<byte[], Pair<TableDescriptor, List<Entry>>> entry : encodedRegionName2Entries
- .entrySet()) {
- CompletableFuture<Long> future =
- replicate(entry.getValue().getFirst(), entry.getKey(), entry.getValue().getSecond());
- futureAndEncodedRegionNameList.add(Pair.newPair(future, entry.getKey()));
- }
- for (Pair<CompletableFuture<Long>, byte[]> pair : futureAndEncodedRegionNameList) {
- byte[] encodedRegionName = pair.getSecond();
- try {
- skippedEdits += pair.getFirst().get().longValue();
- encodedRegionName2Entries.remove(encodedRegionName);
- } catch (InterruptedException e) {
- // restore the interrupted state
- Thread.currentThread().interrupt();
- return false;
- } catch (ExecutionException e) {
- Pair<TableDescriptor, List<Entry>> tableAndEntries =
- encodedRegionName2Entries.get(encodedRegionName);
- TableName tableName = tableAndEntries.getFirst().getTableName();
- List<Entry> entries = tableAndEntries.getSecond();
- Throwable cause = e.getCause();
- // The table can be disabled or dropped at this time. For disabled tables, we have no
- // cheap mechanism to detect this case because meta does not contain this information.
- // ClusterConnection.isTableDisabled() is a zk call which we cannot do for every replay
- // RPC. So instead we start the replay RPC with retries and check whether the table is
- // dropped or disabled which might cause SocketTimeoutException, or
- // RetriesExhaustedException or similar if we get IOE.
- if (cause instanceof TableNotFoundException) {
- // add to cache that the table does not exist
- tableDescriptorCache.put(tableName, Optional.empty());
- logSkipped(tableName, entries, "dropped");
- skippedEdits += entries.size();
- encodedRegionName2Entries.remove(encodedRegionName);
- continue;
- }
- boolean disabled = false;
- try {
- disabled = connection.getAdmin().isTableDisabled(tableName).get();
- } catch (InterruptedException e1) {
- // restore the interrupted state
- Thread.currentThread().interrupt();
- return false;
- } catch (ExecutionException e1) {
- LOG.warn("Failed to test whether {} is disabled, assume it is not disabled", tableName,
- e1.getCause());
- }
- if (disabled) {
- disabledTableCache.put(tableName, tableName);
- logSkipped(tableName, entries, "disabled");
- skippedEdits += entries.size();
- encodedRegionName2Entries.remove(encodedRegionName);
- continue;
- }
- LOG.warn("Failed to replicate {} entries for region {} of table {}", entries.size(),
- Bytes.toStringBinary(encodedRegionName), tableName);
- }
- }
- // we have done
- if (encodedRegionName2Entries.isEmpty()) {
- ctx.getMetrics().incrLogEditsFiltered(skippedEdits);
- return true;
- } else {
- LOG.warn("Failed to replicate all entries, retry={}", retryCounter.getAttemptTimes());
- if (!retryCounter.shouldRetry()) {
- return false;
- }
- try {
- retryCounter.sleepUntilNextRetry();
- } catch (InterruptedException e) {
- // restore the interrupted state
- Thread.currentThread().interrupt();
- return false;
- }
- }
- }
-
- return false;
- }
-
- @Override
- public boolean canReplicateToSameCluster() {
- return true;
- }
-
- @Override
- protected WALEntryFilter getScopeWALEntryFilter() {
- // we do not care about scope. We replicate everything.
- return null;
- }
-}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
index 8863f14..b055902 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceFactory.java
@@ -27,7 +27,6 @@ import org.slf4j.LoggerFactory;
/**
* Constructs a {@link ReplicationSourceInterface}
* Note, not used to create specialized ReplicationSources
- * @see CatalogReplicationSource
*/
@InterfaceAudience.Private
public final class ReplicationSourceFactory {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 73efcfe..9f8d8dc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -39,7 +39,6 @@ import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -49,8 +48,6 @@ import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
-import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
@@ -64,9 +61,7 @@ import org.apache.hadoop.hbase.replication.SyncReplicationState;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.AbstractFSWALProvider;
import org.apache.hadoop.hbase.wal.SyncReplicationWALProvider;
-import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
-import org.apache.hadoop.hbase.wal.WALProvider;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
@@ -183,16 +178,6 @@ public class ReplicationSourceManager {
private final MetricsReplicationGlobalSourceSource globalMetrics;
/**
- * A special ReplicationSource for hbase:meta Region Read Replicas.
- * Usually this reference remains empty. If an hbase:meta Region is opened on this server, we
- * will create an instance of a hbase:meta CatalogReplicationSource and it will live the life of
- * the Server thereafter; i.e. we will not shut it down even if the hbase:meta moves away from
- * this server (in case it later gets moved back). We synchronize on this instance testing for
- * presence and if absent, while creating so only created and started once.
- */
- AtomicReference<ReplicationSourceInterface> catalogReplicationSource = new AtomicReference<>();
-
- /**
* Creates a replication manager and sets the watch on all the other registered region servers
* @param queueStorage the interface for manipulating replication queues
* @param conf the configuration to use
@@ -1066,78 +1051,6 @@ public class ReplicationSourceManager {
return this.globalMetrics;
}
- /**
- * Add an hbase:meta Catalog replication source. Called on open of an hbase:meta Region.
- * Create it once only. If exists already, use the existing one.
- * @see #removeCatalogReplicationSource(RegionInfo)
- * @see #addSource(String) This is specialization on the addSource method.
- */
- public ReplicationSourceInterface addCatalogReplicationSource(RegionInfo regionInfo)
- throws IOException {
- // Poor-man's putIfAbsent
- synchronized (this.catalogReplicationSource) {
- ReplicationSourceInterface rs = this.catalogReplicationSource.get();
- return rs != null ? rs :
- this.catalogReplicationSource.getAndSet(createCatalogReplicationSource(regionInfo));
- }
- }
-
- /**
- * Remove the hbase:meta Catalog replication source.
- * Called when we close hbase:meta.
- * @see #addCatalogReplicationSource(RegionInfo regionInfo)
- */
- public void removeCatalogReplicationSource(RegionInfo regionInfo) {
- // Nothing to do. Leave any CatalogReplicationSource in place in case an hbase:meta Region
- // comes back to this server.
- }
-
- /**
- * Create, initialize, and start the Catalog ReplicationSource.
- * Presumes called one-time only (caller must ensure one-time only call).
- * This ReplicationSource is NOT created via {@link ReplicationSourceFactory}.
- * @see #addSource(String) This is a specialization of the addSource call.
- * @see #catalogReplicationSource for a note on this ReplicationSource's lifecycle (and more on
- * why the special handling).
- */
- private ReplicationSourceInterface createCatalogReplicationSource(RegionInfo regionInfo)
- throws IOException {
- // Instantiate meta walProvider. Instantiated here or over in the #warmupRegion call made by the
- // Master on a 'move' operation. Need to do extra work if we did NOT instantiate the provider.
- WALProvider walProvider = this.walFactory.getMetaWALProvider();
- boolean instantiate = walProvider == null;
- if (instantiate) {
- walProvider = this.walFactory.getMetaProvider();
- }
- // Here we do a specialization on what {@link ReplicationSourceFactory} does. There is no need
- // for persisting offset into WALs up in zookeeper (via ReplicationQueueInfo) as the catalog
- // read replicas feature that makes use of the source does a reset on a crash of the WAL
- // source process. See "4.1 Skip maintaining zookeeper replication queue (offsets/WALs)" in the
- // design doc attached to HBASE-18070 'Enable memstore replication for meta replica' for detail.
- CatalogReplicationSourcePeer peer = new CatalogReplicationSourcePeer(this.conf,
- this.clusterId.toString());
- final ReplicationSourceInterface crs = new CatalogReplicationSource();
- crs.init(conf, fs, this, new NoopReplicationQueueStorage(), peer, server, peer.getId(),
- clusterId, walProvider.getWALFileLengthProvider(), new MetricsSource(peer.getId()));
- // Add listener on the provider so we can pick up the WAL to replicate on roll.
- WALActionsListener listener = new WALActionsListener() {
- @Override public void postLogRoll(Path oldPath, Path newPath) throws IOException {
- crs.enqueueLog(newPath);
- }
- };
- walProvider.addWALActionsListener(listener);
- if (!instantiate) {
- // If we did not instantiate provider, need to add our listener on already-created WAL
- // instance too (listeners are passed by provider to WAL instance on creation but if provider
- // created already, our listener add above is missed). And add the current WAL file to the
- // Replication Source so it can start replicating it.
- WAL wal = walProvider.getWAL(regionInfo);
- wal.registerWALActionsListener(listener);
- crs.enqueueLog(((AbstractFSWAL)wal).getCurrentFileName());
- }
- return crs.startup();
- }
-
ReplicationQueueStorage getQueueStorage() {
return queueStorage;
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
index 5583a47..6a46bd4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/ServerRegionReplicaUtil.java
@@ -22,23 +22,15 @@ import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionReplicaUtil;
import org.apache.hadoop.hbase.io.HFileLink;
import org.apache.hadoop.hbase.io.Reference;
-import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.StoreFileInfo;
-import org.apache.hadoop.hbase.replication.ReplicationException;
-import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
-import org.apache.hadoop.hbase.replication.regionserver.RegionReplicaReplicationEndpoint;
-import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
/**
* Similar to {@link RegionReplicaUtil} but for the server side
@@ -46,8 +38,6 @@ import org.slf4j.LoggerFactory;
@InterfaceAudience.Private
public class ServerRegionReplicaUtil extends RegionReplicaUtil {
- private static final Logger LOG = LoggerFactory.getLogger(ServerRegionReplicaUtil.class);
-
/**
* Whether asynchronous WAL replication to the secondary region replicas is enabled or not.
* If this is enabled, a replication peer named "region_replica_replication" will be created
@@ -59,7 +49,6 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil {
public static final String REGION_REPLICA_REPLICATION_CONF_KEY
= "hbase.region.replica.replication.enabled";
private static final boolean DEFAULT_REGION_REPLICA_REPLICATION = false;
- public static final String REGION_REPLICA_REPLICATION_PEER = "region_replica_replication";
/**
* Same as for {@link #REGION_REPLICA_REPLICATION_CONF_KEY} but for catalog replication.
@@ -162,27 +151,6 @@ public class ServerRegionReplicaUtil extends RegionReplicaUtil {
}
/**
- * Create replication peer for replicating user-space Region Read Replicas.
- * This methods should only be called at master side.
- */
- public static void setupRegionReplicaReplication(MasterServices services)
- throws IOException, ReplicationException {
- if (!isRegionReplicaReplicationEnabled(services.getConfiguration())) {
- return;
- }
- if (services.getReplicationPeerManager().getPeerConfig(REGION_REPLICA_REPLICATION_PEER)
- .isPresent()) {
- return;
- }
- LOG.info("Region replica replication peer id=" + REGION_REPLICA_REPLICATION_PEER +
- " not exist. Creating...");
- ReplicationPeerConfig peerConfig = ReplicationPeerConfig.newBuilder()
- .setClusterKey(ZKConfig.getZooKeeperClusterKey(services.getConfiguration()))
- .setReplicationEndpointImpl(RegionReplicaReplicationEndpoint.class.getName()).build();
- services.addReplicationPeer(REGION_REPLICA_REPLICATION_PEER, peerConfig, true);
- }
-
- /**
* @return True if Region Read Replica is enabled for <code>tn</code> (whether hbase:meta or
* user-space tables).
*/
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
index 3c2bc3f..fe16c31 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/TestIOFencing.java
@@ -296,8 +296,8 @@ public class TestIOFencing {
FAMILY, Lists.newArrayList(new Path("/a")), Lists.newArrayList(new Path("/b")),
new Path("store_dir"));
WALUtil.writeCompactionMarker(compactingRegion.getWAL(),
- ((HRegion)compactingRegion).getReplicationScope(),
- oldHri, compactionDescriptor, compactingRegion.getMVCC());
+ ((HRegion) compactingRegion).getReplicationScope(), oldHri, compactionDescriptor,
+ compactingRegion.getMVCC(), null);
// Wait till flush has happened, otherwise there won't be multiple store files
long startWaitTime = EnvironmentEdgeManager.currentTime();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java
index 4b10110..ef3511c 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/DummyAsyncClusterConnection.java
@@ -126,12 +126,6 @@ public class DummyAsyncClusterConnection implements AsyncClusterConnection {
}
@Override
- public CompletableFuture<Long> replay(TableName tableName, byte[] encodedRegionName, byte[] row,
- List<Entry> entries, int replicaId, int numRetries, long operationTimeoutNs) {
- return null;
- }
-
- @Override
public CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
boolean reload) {
return null;
@@ -169,4 +163,11 @@ public class DummyAsyncClusterConnection implements AsyncClusterConnection {
public CompletableFuture<List<ServerName>> getAllBootstrapNodes(ServerName regionServer) {
return null;
}
+
+ @Override
+ public CompletableFuture<Void> replicate(RegionInfo replica,
+ List<Entry> entries, int numRetries, long rpcTimeoutNs,
+ long operationTimeoutNs) {
+ return null;
+ }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
index a5584ff..187dca5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java
@@ -996,7 +996,7 @@ public class TestHRegion {
region.getRegionFileSystem().getStoreDir(Bytes.toString(family)));
WALUtil.writeCompactionMarker(region.getWAL(), this.region.getReplicationScope(),
- this.region.getRegionInfo(), compactionDescriptor, region.getMVCC());
+ this.region.getRegionInfo(), compactionDescriptor, region.getMVCC(), null);
Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir);
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicaFailover.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicaFailover.java
index ea6589d..ba7e9d1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicaFailover.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionReplicaFailover.java
@@ -38,7 +38,7 @@ import org.apache.hadoop.hbase.client.Consistency;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.replication.regionserver.TestRegionReplicaReplicationEndpoint;
+import org.apache.hadoop.hbase.replication.regionserver.TestRegionReplicaReplication;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
@@ -64,7 +64,7 @@ public class TestRegionReplicaFailover {
HBaseClassTestRule.forClass(TestRegionReplicaFailover.class);
private static final Logger LOG =
- LoggerFactory.getLogger(TestRegionReplicaReplicationEndpoint.class);
+ LoggerFactory.getLogger(TestRegionReplicaReplication.class);
private static final HBaseTestingUtil HTU = new HBaseTestingUtil();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplication.java
similarity index 74%
rename from hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java
rename to hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplication.java
index 5a06a11..b501ab2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestMetaRegionReplicaReplication.java
@@ -20,14 +20,13 @@ package org.apache.hadoop.hbase.replication.regionserver;
import static org.apache.hadoop.hbase.client.RegionLocator.LOCATOR_META_REPLICAS_MODE;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
+
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
-import java.util.Objects;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellScanner;
@@ -36,7 +35,6 @@ import org.apache.hadoop.hbase.ClientMetaTableAccessor;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HBaseTestingUtil;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.MetaTableAccessor;
import org.apache.hadoop.hbase.SingleProcessHBaseCluster;
import org.apache.hadoop.hbase.TableName;
@@ -45,7 +43,6 @@ import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
@@ -67,18 +64,17 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Tests RegionReplicaReplicationEndpoint class for hbase:meta by setting up region replicas and
- * verifying async wal replication replays the edits to the secondary region in various scenarios.
- *
- * @see TestRegionReplicaReplicationEndpoint
+ * Tests region replication for hbase:meta by setting up region replicas and verifying async wal
+ * replication replays the edits to the secondary region in various scenarios.
+ * @see TestRegionReplicaReplication
*/
-@Category({LargeTests.class})
-public class TestMetaRegionReplicaReplicationEndpoint {
+@Category({ LargeTests.class })
+public class TestMetaRegionReplicaReplication {
+
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestMetaRegionReplicaReplicationEndpoint.class);
- private static final Logger LOG =
- LoggerFactory.getLogger(TestMetaRegionReplicaReplicationEndpoint.class);
+ HBaseClassTestRule.forClass(TestMetaRegionReplicaReplication.class);
+ private static final Logger LOG = LoggerFactory.getLogger(TestMetaRegionReplicaReplication.class);
private static final int NB_SERVERS = 4;
private final HBaseTestingUtil HTU = new HBaseTestingUtil();
private int numOfMetaReplica = NB_SERVERS - 1;
@@ -102,17 +98,15 @@ public class TestMetaRegionReplicaReplicationEndpoint {
conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
// Enable hbase:meta replication.
- conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CATALOG_CONF_KEY,
- true);
+ conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CATALOG_CONF_KEY, true);
// Set hbase:meta replicas to be 3.
// conf.setInt(HConstants.META_REPLICAS_NUM, numOfMetaReplica);
HTU.startMiniCluster(NB_SERVERS);
// Enable hbase:meta replication.
HBaseTestingUtil.setReplicas(HTU.getAdmin(), TableName.META_TABLE_NAME, numOfMetaReplica);
- HTU.waitFor(30000,
- () -> HTU.getMiniHBaseCluster().getRegions(TableName.META_TABLE_NAME).size()
- >= numOfMetaReplica);
+ HTU.waitFor(30000, () -> HTU.getMiniHBaseCluster().getRegions(TableName.META_TABLE_NAME)
+ .size() >= numOfMetaReplica);
}
@After
@@ -121,83 +115,19 @@ public class TestMetaRegionReplicaReplicationEndpoint {
}
/**
- * Assert that the ReplicationSource for hbase:meta gets created when hbase:meta is opened.
- */
- @Test
- public void testHBaseMetaReplicationSourceCreatedOnOpen() throws Exception {
- SingleProcessHBaseCluster cluster = HTU.getMiniHBaseCluster();
- HRegionServer hrs = cluster.getRegionServer(cluster.getServerHoldingMeta());
- // Replicate a row to prove all working.
- testHBaseMetaReplicatesOneRow(0);
- assertTrue(isMetaRegionReplicaReplicationSource(hrs));
- // Now move the hbase:meta and make sure the ReplicationSource is in both places.
- HRegionServer hrsOther = null;
- for (int i = 0; i < cluster.getNumLiveRegionServers(); i++) {
- hrsOther = cluster.getRegionServer(i);
- if (hrsOther.getServerName().equals(hrs.getServerName())) {
- hrsOther = null;
- continue;
- }
- break;
- }
- assertNotNull(hrsOther);
- assertFalse(isMetaRegionReplicaReplicationSource(hrsOther));
- Region meta = null;
- for (Region region : hrs.getOnlineRegionsLocalContext()) {
- if (region.getRegionInfo().isMetaRegion()) {
- meta = region;
- break;
- }
- }
- assertNotNull(meta);
- HTU.moveRegionAndWait(meta.getRegionInfo(), hrsOther.getServerName());
- // Assert that there is a ReplicationSource in both places now.
- assertTrue(isMetaRegionReplicaReplicationSource(hrs));
- assertTrue(isMetaRegionReplicaReplicationSource(hrsOther));
- // Replicate to show stuff still works.
- testHBaseMetaReplicatesOneRow(1);
- // Now pretend a few hours have gone by... roll the meta WAL in original location... Move the
- // meta back and retry replication. See if it works.
- hrs.getWAL(meta.getRegionInfo()).rollWriter(true);
- testHBaseMetaReplicatesOneRow(2);
- hrs.getWAL(meta.getRegionInfo()).rollWriter(true);
- testHBaseMetaReplicatesOneRow(3);
- }
-
- /**
- * Test meta region replica replication. Create some tables and see if replicas pick up the
- * additions.
- */
- private void testHBaseMetaReplicatesOneRow(int i) throws Exception {
- waitForMetaReplicasToOnline();
- try (Table table = HTU.createTable(TableName.valueOf(this.name.getMethodName() + "_" + i),
- HConstants.CATALOG_FAMILY)) {
- verifyReplication(TableName.META_TABLE_NAME, numOfMetaReplica, getMetaCells(table.getName()));
- }
- }
-
- /**
- * @return Whether the special meta region replica peer is enabled on <code>hrs</code>
- */
- private boolean isMetaRegionReplicaReplicationSource(HRegionServer hrs) {
- return hrs.getReplicationSourceService().getReplicationManager().
- catalogReplicationSource.get() != null;
- }
-
- /**
* Test meta region replica replication. Create some tables and see if replicas pick up the
* additions.
*/
@Test
public void testHBaseMetaReplicates() throws Exception {
- try (Table table = HTU
- .createTable(TableName.valueOf(this.name.getMethodName() + "_0"), HConstants.CATALOG_FAMILY,
- Arrays.copyOfRange(HBaseTestingUtil.KEYS, 1, HBaseTestingUtil.KEYS.length))) {
+ try (Table table = HTU.createTable(TableName.valueOf(this.name.getMethodName() + "_0"),
+ HConstants.CATALOG_FAMILY,
+ Arrays.copyOfRange(HBaseTestingUtil.KEYS, 1, HBaseTestingUtil.KEYS.length))) {
verifyReplication(TableName.META_TABLE_NAME, numOfMetaReplica, getMetaCells(table.getName()));
}
- try (Table table = HTU
- .createTable(TableName.valueOf(this.name.getMethodName() + "_1"), HConstants.CATALOG_FAMILY,
- Arrays.copyOfRange(HBaseTestingUtil.KEYS, 1, HBaseTestingUtil.KEYS.length))) {
+ try (Table table = HTU.createTable(TableName.valueOf(this.name.getMethodName() + "_1"),
+ HConstants.CATALOG_FAMILY,
+ Arrays.copyOfRange(HBaseTestingUtil.KEYS, 1, HBaseTestingUtil.KEYS.length))) {
verifyReplication(TableName.META_TABLE_NAME, numOfMetaReplica, getMetaCells(table.getName()));
// Try delete.
HTU.deleteTableIfAny(table.getName());
@@ -207,26 +137,22 @@ public class TestMetaRegionReplicaReplicationEndpoint {
@Test
public void testCatalogReplicaReplicationWithFlushAndCompaction() throws Exception {
- Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
- TableName tableName = TableName.valueOf("hbase:meta");
- Table table = connection.getTable(tableName);
- try {
+ try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
+ Table table = connection.getTable(TableName.META_TABLE_NAME)) {
// load the data to the table
for (int i = 0; i < 5; i++) {
LOG.info("Writing data from " + i * 1000 + " to " + (i * 1000 + 1000));
HTU.loadNumericRows(table, HConstants.CATALOG_FAMILY, i * 1000, i * 1000 + 1000);
LOG.info("flushing table");
- HTU.flush(tableName);
+ HTU.flush(TableName.META_TABLE_NAME);
LOG.info("compacting table");
if (i < 4) {
- HTU.compact(tableName, false);
+ HTU.compact(TableName.META_TABLE_NAME, false);
}
}
- verifyReplication(tableName, numOfMetaReplica, 0, 5000, HConstants.CATALOG_FAMILY);
- } finally {
- table.close();
- connection.close();
+ verifyReplication(TableName.META_TABLE_NAME, numOfMetaReplica, 0, 5000,
+ HConstants.CATALOG_FAMILY);
}
}
@@ -235,7 +161,6 @@ public class TestMetaRegionReplicaReplicationEndpoint {
SingleProcessHBaseCluster cluster = HTU.getMiniHBaseCluster();
HRegionServer hrs = cluster.getRegionServer(cluster.getServerHoldingMeta());
- HRegionServer hrsMetaReplica = null;
HRegionServer hrsNoMetaReplica = null;
HRegionServer server = null;
Region metaReplica = null;
@@ -260,11 +185,8 @@ public class TestMetaRegionReplicaReplicationEndpoint {
hrsNoMetaReplica = server;
}
}
-
- Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
- TableName tableName = TableName.valueOf("hbase:meta");
- Table table = connection.getTable(tableName);
- try {
+ try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
+ Table table = connection.getTable(TableName.META_TABLE_NAME)) {
// load the data to the table
for (int i = 0; i < 5; i++) {
LOG.info("Writing data from " + i * 1000 + " to " + (i * 1000 + 1000));
@@ -274,10 +196,8 @@ public class TestMetaRegionReplicaReplicationEndpoint {
}
}
- verifyReplication(tableName, numOfMetaReplica, 0, 5000, HConstants.CATALOG_FAMILY);
- } finally {
- table.close();
- connection.close();
+ verifyReplication(TableName.META_TABLE_NAME, numOfMetaReplica, 0, 5000,
+ HConstants.CATALOG_FAMILY);
}
}
@@ -324,22 +244,6 @@ public class TestMetaRegionReplicaReplicationEndpoint {
}
/**
- * Replicas come online after primary.
- */
- private void waitForMetaReplicasToOnline() throws IOException {
- final RegionLocator regionLocator =
- HTU.getConnection().getRegionLocator(TableName.META_TABLE_NAME);
- HTU.waitFor(10000,
- // getRegionLocations returns an entry for each replica but if unassigned, entry is null.
- // Pass reload to force us to skip cache else it just keeps returning default.
- () -> regionLocator.getRegionLocations(HConstants.EMPTY_START_ROW, true).stream().
- filter(Objects::nonNull).count() >= numOfMetaReplica);
- List<HRegionLocation> locations = regionLocator.getRegionLocations(HConstants.EMPTY_START_ROW);
- LOG.info("Found locations {}", locations);
- assertEquals(numOfMetaReplica, locations.size());
- }
-
- /**
* Scan hbase:meta for <code>tableName</code> content.
*/
private List<Result> getMetaCells(TableName tableName) throws IOException {
@@ -373,20 +277,9 @@ public class TestMetaRegionReplicaReplicationEndpoint {
return regions;
}
- private Region getOneRegion(TableName tableName) {
- for (int i = 0; i < NB_SERVERS; i++) {
- HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i);
- List<HRegion> onlineRegions = rs.getRegions(tableName);
- if (onlineRegions.size() > 1) {
- return onlineRegions.get(0);
- }
- }
- return null;
- }
-
/**
- * Verify when a Table is deleted from primary, then there are no references in replicas
- * (because they get the delete of the table rows too).
+ * Verify when a Table is deleted from primary, then there are no references in replicas (because
+ * they get the delete of the table rows too).
*/
private void verifyDeletedReplication(TableName tableName, int regionReplication,
final TableName deletedTableName) {
@@ -417,8 +310,8 @@ public class TestMetaRegionReplicaReplicationEndpoint {
}
/**
- * Cells are from hbase:meta replica so will start w/ 'tableName,'; i.e. the tablename followed
- * by HConstants.DELIMITER. Make sure the deleted table is no longer present in passed
+ * Cells are from hbase:meta replica so will start w/ 'tableName,'; i.e. the tablename followed by
+ * HConstants.DELIMITER. Make sure the deleted table is no longer present in passed
* <code>cells</code>.
*/
private boolean doesNotContain(List<Cell> cells, TableName tableName) {
@@ -491,21 +384,19 @@ public class TestMetaRegionReplicaReplicationEndpoint {
}
private void primaryNoChangeReplicaIncrease(final long[] before, final long[] after) {
- assertEquals(before[RegionInfo.DEFAULT_REPLICA_ID],
- after[RegionInfo.DEFAULT_REPLICA_ID]);
+ assertEquals(before[RegionInfo.DEFAULT_REPLICA_ID], after[RegionInfo.DEFAULT_REPLICA_ID]);
- for (int i = 1; i < after.length; i ++) {
+ for (int i = 1; i < after.length; i++) {
assertTrue(after[i] > before[i]);
}
}
private void primaryIncreaseReplicaNoChange(final long[] before, final long[] after) {
// There are read requests increase for primary meta replica.
- assertTrue(after[RegionInfo.DEFAULT_REPLICA_ID] >
- before[RegionInfo.DEFAULT_REPLICA_ID]);
+ assertTrue(after[RegionInfo.DEFAULT_REPLICA_ID] > before[RegionInfo.DEFAULT_REPLICA_ID]);
// No change for replica regions
- for (int i = 1; i < after.length; i ++) {
+ for (int i = 1; i < after.length; i++) {
assertEquals(before[i], after[i]);
}
}
@@ -515,13 +406,12 @@ public class TestMetaRegionReplicaReplicationEndpoint {
for (Region r : metaRegions) {
LOG.info("read request for region {} is {}", r, r.getReadRequestsCount());
counters[i] = r.getReadRequestsCount();
- i ++;
+ i++;
}
}
@Test
public void testHBaseMetaReplicaGets() throws Exception {
-
TableName tn = TableName.valueOf(this.name.getMethodName());
final Region[] metaRegions = getAllRegions(TableName.META_TABLE_NAME, numOfMetaReplica);
long[] readReqsForMetaReplicas = new long[numOfMetaReplica];
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplication.java
new file mode 100644
index 0000000..7dd4255
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplication.java
@@ -0,0 +1,273 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import static org.junit.Assert.assertNotNull;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.regionserver.HRegion;
+import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.testclassification.FlakeyTests;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
+
+/**
+ * Tests RegionReplicaReplicationEndpoint class by setting up region replicas and verifying
+ * async wal replication replays the edits to the secondary region in various scenarios.
+ */
+@Category({FlakeyTests.class, LargeTests.class})
+public class TestRegionReplicaReplication {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestRegionReplicaReplication.class);
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(TestRegionReplicaReplication.class);
+
+ private static final int NB_SERVERS = 2;
+
+ private static final HBaseTestingUtil HTU = new HBaseTestingUtil();
+
+ @Rule
+ public TestName name = new TestName();
+
+ @BeforeClass
+ public static void beforeClass() throws Exception {
+ Configuration conf = HTU.getConfiguration();
+ conf.setFloat("hbase.regionserver.logroll.multiplier", 0.0003f);
+ conf.setInt("replication.source.size.capacity", 10240);
+ conf.setLong("replication.source.sleepforretries", 100);
+ conf.setInt("hbase.regionserver.maxlogs", 10);
+ conf.setLong("hbase.master.logcleaner.ttl", 10);
+ conf.setInt("zookeeper.recovery.retry", 1);
+ conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
+ conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
+ conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
+ conf.setInt("replication.stats.thread.period.seconds", 5);
+ conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
+ conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5); // less number of retries is needed
+ conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
+
+ HTU.startMiniCluster(NB_SERVERS);
+ }
+
+ @AfterClass
+ public static void afterClass() throws Exception {
+ HTU.shutdownMiniCluster();
+ }
+
+ private void testRegionReplicaReplication(int regionReplication) throws Exception {
+ // test region replica replication. Create a table with single region, write some data
+ // ensure that data is replicated to the secondary region
+ TableName tableName = TableName.valueOf("testRegionReplicaReplicationWithReplicas_"
+ + regionReplication);
+ TableDescriptor htd = HTU
+ .createModifyableTableDescriptor(TableName.valueOf(tableName.toString()),
+ ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
+ ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED)
+ .setRegionReplication(regionReplication).build();
+ createOrEnableTableWithRetries(htd, true);
+ TableName tableNameNoReplicas =
+ TableName.valueOf("testRegionReplicaReplicationWithReplicas_NO_REPLICAS");
+ HTU.deleteTableIfAny(tableNameNoReplicas);
+ HTU.createTable(tableNameNoReplicas, HBaseTestingUtil.fam1);
+
+ try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
+ Table table = connection.getTable(tableName);
+ Table tableNoReplicas = connection.getTable(tableNameNoReplicas)) {
+ // load some data to the non-replicated table
+ HTU.loadNumericRows(tableNoReplicas, HBaseTestingUtil.fam1, 6000, 7000);
+
+ // load the data to the table
+ HTU.loadNumericRows(table, HBaseTestingUtil.fam1, 0, 1000);
+
+ verifyReplication(tableName, regionReplication, 0, 1000);
+ } finally {
+ HTU.deleteTableIfAny(tableNameNoReplicas);
+ }
+ }
+
+ private void verifyReplication(TableName tableName, int regionReplication,
+ final int startRow, final int endRow) throws Exception {
+ verifyReplication(tableName, regionReplication, startRow, endRow, true);
+ }
+
+ private void verifyReplication(TableName tableName, int regionReplication,
+ final int startRow, final int endRow, final boolean present) throws Exception {
+ // find the regions
+ final Region[] regions = new Region[regionReplication];
+
+ for (int i=0; i < NB_SERVERS; i++) {
+ HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i);
+ List<HRegion> onlineRegions = rs.getRegions(tableName);
+ for (HRegion region : onlineRegions) {
+ regions[region.getRegionInfo().getReplicaId()] = region;
+ }
+ }
+
+ for (Region region : regions) {
+ assertNotNull(region);
+ }
+
+ for (int i = 1; i < regionReplication; i++) {
+ final Region region = regions[i];
+ // wait until all the data is replicated to all secondary regions
+ Waiter.waitFor(HTU.getConfiguration(), 90000, 1000, new Waiter.Predicate<Exception>() {
+ @Override
+ public boolean evaluate() throws Exception {
+ LOG.info("verifying replication for region replica:" + region.getRegionInfo());
+ try {
+ HTU.verifyNumericRows(region, HBaseTestingUtil.fam1, startRow, endRow, present);
+ } catch(Throwable ex) {
+ LOG.warn("Verification from secondary region is not complete yet", ex);
+ // still wait
+ return false;
+ }
+ return true;
+ }
+ });
+ }
+ }
+
+ @Test
+ public void testRegionReplicaReplicationWith2Replicas() throws Exception {
+ testRegionReplicaReplication(2);
+ }
+
+ @Test
+ public void testRegionReplicaReplicationWith3Replicas() throws Exception {
+ testRegionReplicaReplication(3);
+ }
+
+ @Test
+ public void testRegionReplicaReplicationWith10Replicas() throws Exception {
+ testRegionReplicaReplication(10);
+ }
+
+ @Test
+ public void testRegionReplicaWithoutMemstoreReplication() throws Exception {
+ int regionReplication = 3;
+ TableDescriptor htd = HTU.createModifyableTableDescriptor(name.getMethodName())
+ .setRegionReplication(regionReplication).setRegionMemStoreReplication(false).build();
+ createOrEnableTableWithRetries(htd, true);
+ final TableName tableName = htd.getTableName();
+ Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
+ Table table = connection.getTable(tableName);
+ try {
+ // write data to the primary. The replicas should not receive the data
+ final int STEP = 100;
+ for (int i = 0; i < 3; ++i) {
+ final int startRow = i * STEP;
+ final int endRow = (i + 1) * STEP;
+ LOG.info("Writing data from " + startRow + " to " + endRow);
+ HTU.loadNumericRows(table, HBaseTestingUtil.fam1, startRow, endRow);
+ verifyReplication(tableName, regionReplication, startRow, endRow, false);
+
+ // Flush the table, now the data should show up in the replicas
+ LOG.info("flushing table");
+ HTU.flush(tableName);
+ verifyReplication(tableName, regionReplication, 0, endRow, true);
+ }
+ } finally {
+ table.close();
+ connection.close();
+ }
+ }
+
+ @Test
+ public void testRegionReplicaReplicationForFlushAndCompaction() throws Exception {
+ // Tests a table with region replication 3. Writes some data, and causes flushes and
+ // compactions. Verifies that the data is readable from the replicas. Note that this
+ // does not test whether the replicas actually pick up flushed files and apply compaction
+ // to their stores
+ int regionReplication = 3;
+ TableDescriptor htd = HTU.createModifyableTableDescriptor(name.getMethodName())
+ .setRegionReplication(regionReplication).build();
+ createOrEnableTableWithRetries(htd, true);
+ final TableName tableName = htd.getTableName();
+
+ Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
+ Table table = connection.getTable(tableName);
+ try {
+ // load the data to the table
+
+ for (int i = 0; i < 6000; i += 1000) {
+ LOG.info("Writing data from " + i + " to " + (i+1000));
+ HTU.loadNumericRows(table, HBaseTestingUtil.fam1, i, i+1000);
+ LOG.info("flushing table");
+ HTU.flush(tableName);
+ LOG.info("compacting table");
+ HTU.compact(tableName, false);
+ }
+
+ verifyReplication(tableName, regionReplication, 0, 1000);
+ } finally {
+ table.close();
+ connection.close();
+ }
+ }
+
+ private void createOrEnableTableWithRetries(TableDescriptor htd, boolean createTableOperation) {
+ // Helper function to run create/enable table operations with a retry feature
+ boolean continueToRetry = true;
+ int tries = 0;
+ while (continueToRetry && tries < 50) {
+ try {
+ continueToRetry = false;
+ if (createTableOperation) {
+ HTU.getAdmin().createTable(htd);
+ } else {
+ HTU.getAdmin().enableTable(htd.getTableName());
+ }
+ } catch (IOException e) {
+ if (e.getCause() instanceof ReplicationException) {
+ continueToRetry = true;
+ tries++;
+ Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
+ }
+ }
+ }
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java
deleted file mode 100644
index d238e09..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java
+++ /dev/null
@@ -1,515 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication.regionserver;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.ArgumentMatchers.eq;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-import java.io.IOException;
-import java.util.List;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.Cell.Type;
-import org.apache.hadoop.hbase.CellBuilderFactory;
-import org.apache.hadoop.hbase.CellBuilderType;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseTestingUtil;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.Waiter;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.RegionLocator;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
-import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.Region;
-import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
-import org.apache.hadoop.hbase.replication.ReplicationException;
-import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
-import org.apache.hadoop.hbase.testclassification.FlakeyTests;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.hadoop.hbase.wal.WALEdit;
-import org.apache.hadoop.hbase.wal.WALKeyImpl;
-import org.apache.hadoop.hbase.zookeeper.ZKConfig;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
-import org.apache.hbase.thirdparty.com.google.common.util.concurrent.Uninterruptibles;
-
-/**
- * Tests RegionReplicaReplicationEndpoint class by setting up region replicas and verifying
- * async wal replication replays the edits to the secondary region in various scenarios.
- */
-@Category({FlakeyTests.class, LargeTests.class})
-public class TestRegionReplicaReplicationEndpoint {
-
- @ClassRule
- public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestRegionReplicaReplicationEndpoint.class);
-
- private static final Logger LOG =
- LoggerFactory.getLogger(TestRegionReplicaReplicationEndpoint.class);
-
- private static final int NB_SERVERS = 2;
-
- private static final HBaseTestingUtil HTU = new HBaseTestingUtil();
-
- @Rule
- public TestName name = new TestName();
-
- @BeforeClass
- public static void beforeClass() throws Exception {
- Configuration conf = HTU.getConfiguration();
- conf.setFloat("hbase.regionserver.logroll.multiplier", 0.0003f);
- conf.setInt("replication.source.size.capacity", 10240);
- conf.setLong("replication.source.sleepforretries", 100);
- conf.setInt("hbase.regionserver.maxlogs", 10);
- conf.setLong("hbase.master.logcleaner.ttl", 10);
- conf.setInt("zookeeper.recovery.retry", 1);
- conf.setInt("zookeeper.recovery.retry.intervalmill", 10);
- conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
- conf.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
- conf.setInt("replication.stats.thread.period.seconds", 5);
- conf.setBoolean("hbase.tests.use.shortcircuit.reads", false);
- conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5); // less number of retries is needed
- conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
-
- HTU.startMiniCluster(NB_SERVERS);
- }
-
- @AfterClass
- public static void afterClass() throws Exception {
- HTU.shutdownMiniCluster();
- }
-
- @Test
- public void testRegionReplicaReplicationPeerIsCreated() throws IOException {
- // create a table with region replicas. Check whether the replication peer is created
- // and replication started.
- try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
- Admin admin = connection.getAdmin()) {
- String peerId = ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER;
-
- ReplicationPeerConfig peerConfig = null;
- try {
- peerConfig = admin.getReplicationPeerConfig(peerId);
- } catch (ReplicationPeerNotFoundException e) {
- LOG.warn("Region replica replication peer id=" + peerId + " not exist", e);
- }
-
- if (peerConfig != null) {
- admin.removeReplicationPeer(peerId);
- peerConfig = null;
- }
-
- TableDescriptor htd = HTU
- .createTableDescriptor(TableName.valueOf("testReplicationPeerIsCreated_no_region_replicas"),
- ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
- ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
- createOrEnableTableWithRetries(htd, true);
- try {
- peerConfig = admin.getReplicationPeerConfig(peerId);
- fail("Should throw ReplicationException, because replication peer id=" + peerId
- + " not exist");
- } catch (ReplicationPeerNotFoundException e) {
- }
- assertNull(peerConfig);
-
- htd = HTU.createModifyableTableDescriptor(TableName.valueOf("testReplicationPeerIsCreated"),
- ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
- ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED).setRegionReplication(2).build();
- createOrEnableTableWithRetries(htd, true);
-
- // assert peer configuration is correct
- peerConfig = admin.getReplicationPeerConfig(peerId);
- assertNotNull(peerConfig);
- assertEquals(peerConfig.getClusterKey(),
- ZKConfig.getZooKeeperClusterKey(HTU.getConfiguration()));
- assertEquals(RegionReplicaReplicationEndpoint.class.getName(),
- peerConfig.getReplicationEndpointImpl());
- }
- }
-
- @Test
- public void testRegionReplicaReplicationPeerIsCreatedForModifyTable() throws Exception {
- // modify a table by adding region replicas. Check whether the replication peer is created
- // and replication started.
- try (Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
- Admin admin = connection.getAdmin()) {
- String peerId = "region_replica_replication";
-
- ReplicationPeerConfig peerConfig = null;
- try {
- peerConfig = admin.getReplicationPeerConfig(peerId);
- } catch (ReplicationPeerNotFoundException e) {
- LOG.warn("Region replica replication peer id=" + peerId + " not exist", e);
- }
-
- if (peerConfig != null) {
- admin.removeReplicationPeer(peerId);
- peerConfig = null;
- }
-
- TableDescriptor htd = HTU.createTableDescriptor(
- TableName.valueOf("testRegionReplicaReplicationPeerIsCreatedForModifyTable"),
- ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
- ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
- createOrEnableTableWithRetries(htd, true);
-
- // assert that replication peer is not created yet
- try {
- peerConfig = admin.getReplicationPeerConfig(peerId);
- fail("Should throw ReplicationException, because replication peer id=" + peerId
- + " not exist");
- } catch (ReplicationPeerNotFoundException e) {
- }
- assertNull(peerConfig);
-
- HTU.getAdmin().disableTable(htd.getTableName());
- htd = TableDescriptorBuilder.newBuilder(htd).setRegionReplication(2).build();
- HTU.getAdmin().modifyTable(htd);
- createOrEnableTableWithRetries(htd, false);
-
- // assert peer configuration is correct
- peerConfig = admin.getReplicationPeerConfig(peerId);
- assertNotNull(peerConfig);
- assertEquals(peerConfig.getClusterKey(),
- ZKConfig.getZooKeeperClusterKey(HTU.getConfiguration()));
- assertEquals(RegionReplicaReplicationEndpoint.class.getName(),
- peerConfig.getReplicationEndpointImpl());
- }
- }
-
- public void testRegionReplicaReplication(int regionReplication) throws Exception {
- // test region replica replication. Create a table with single region, write some data
- // ensure that data is replicated to the secondary region
- TableName tableName = TableName.valueOf("testRegionReplicaReplicationWithReplicas_"
- + regionReplication);
- TableDescriptor htd = HTU
- .createModifyableTableDescriptor(TableName.valueOf(tableName.toString()),
- ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
- ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED)
- .setRegionReplication(regionReplication).build();
- createOrEnableTableWithRetries(htd, true);
- TableName tableNameNoReplicas =
- TableName.valueOf("testRegionReplicaReplicationWithReplicas_NO_REPLICAS");
- HTU.deleteTableIfAny(tableNameNoReplicas);
- HTU.createTable(tableNameNoReplicas, HBaseTestingUtil.fam1);
-
- Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
- Table table = connection.getTable(tableName);
- Table tableNoReplicas = connection.getTable(tableNameNoReplicas);
-
- try {
- // load some data to the non-replicated table
- HTU.loadNumericRows(tableNoReplicas, HBaseTestingUtil.fam1, 6000, 7000);
-
- // load the data to the table
- HTU.loadNumericRows(table, HBaseTestingUtil.fam1, 0, 1000);
-
- verifyReplication(tableName, regionReplication, 0, 1000);
-
- } finally {
- table.close();
- tableNoReplicas.close();
- HTU.deleteTableIfAny(tableNameNoReplicas);
- connection.close();
- }
- }
-
- private void verifyReplication(TableName tableName, int regionReplication,
- final int startRow, final int endRow) throws Exception {
- verifyReplication(tableName, regionReplication, startRow, endRow, true);
- }
-
- private void verifyReplication(TableName tableName, int regionReplication,
- final int startRow, final int endRow, final boolean present) throws Exception {
- // find the regions
- final Region[] regions = new Region[regionReplication];
-
- for (int i=0; i < NB_SERVERS; i++) {
- HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(i);
- List<HRegion> onlineRegions = rs.getRegions(tableName);
- for (HRegion region : onlineRegions) {
- regions[region.getRegionInfo().getReplicaId()] = region;
- }
- }
-
- for (Region region : regions) {
- assertNotNull(region);
- }
-
- for (int i = 1; i < regionReplication; i++) {
- final Region region = regions[i];
- // wait until all the data is replicated to all secondary regions
- Waiter.waitFor(HTU.getConfiguration(), 90000, 1000, new Waiter.Predicate<Exception>() {
- @Override
- public boolean evaluate() throws Exception {
- LOG.info("verifying replication for region replica:" + region.getRegionInfo());
- try {
- HTU.verifyNumericRows(region, HBaseTestingUtil.fam1, startRow, endRow, present);
- } catch(Throwable ex) {
- LOG.warn("Verification from secondary region is not complete yet", ex);
- // still wait
- return false;
- }
- return true;
- }
- });
- }
- }
-
- @Test
- public void testRegionReplicaReplicationWith2Replicas() throws Exception {
- testRegionReplicaReplication(2);
- }
-
- @Test
- public void testRegionReplicaReplicationWith3Replicas() throws Exception {
- testRegionReplicaReplication(3);
- }
-
- @Test
- public void testRegionReplicaReplicationWith10Replicas() throws Exception {
- testRegionReplicaReplication(10);
- }
-
- @Test
- public void testRegionReplicaWithoutMemstoreReplication() throws Exception {
- int regionReplication = 3;
- TableDescriptor htd = HTU.createModifyableTableDescriptor(name.getMethodName())
- .setRegionReplication(regionReplication).setRegionMemStoreReplication(false).build();
- createOrEnableTableWithRetries(htd, true);
- final TableName tableName = htd.getTableName();
- Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
- Table table = connection.getTable(tableName);
- try {
- // write data to the primary. The replicas should not receive the data
- final int STEP = 100;
- for (int i = 0; i < 3; ++i) {
- final int startRow = i * STEP;
- final int endRow = (i + 1) * STEP;
- LOG.info("Writing data from " + startRow + " to " + endRow);
- HTU.loadNumericRows(table, HBaseTestingUtil.fam1, startRow, endRow);
- verifyReplication(tableName, regionReplication, startRow, endRow, false);
-
- // Flush the table, now the data should show up in the replicas
- LOG.info("flushing table");
- HTU.flush(tableName);
- verifyReplication(tableName, regionReplication, 0, endRow, true);
- }
- } finally {
- table.close();
- connection.close();
- }
- }
-
- @Test
- public void testRegionReplicaReplicationForFlushAndCompaction() throws Exception {
- // Tests a table with region replication 3. Writes some data, and causes flushes and
- // compactions. Verifies that the data is readable from the replicas. Note that this
- // does not test whether the replicas actually pick up flushed files and apply compaction
- // to their stores
- int regionReplication = 3;
- TableDescriptor htd = HTU.createModifyableTableDescriptor(name.getMethodName())
- .setRegionReplication(regionReplication).build();
- createOrEnableTableWithRetries(htd, true);
- final TableName tableName = htd.getTableName();
-
- Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
- Table table = connection.getTable(tableName);
- try {
- // load the data to the table
-
- for (int i = 0; i < 6000; i += 1000) {
- LOG.info("Writing data from " + i + " to " + (i+1000));
- HTU.loadNumericRows(table, HBaseTestingUtil.fam1, i, i+1000);
- LOG.info("flushing table");
- HTU.flush(tableName);
- LOG.info("compacting table");
- HTU.compact(tableName, false);
- }
-
- verifyReplication(tableName, regionReplication, 0, 1000);
- } finally {
- table.close();
- connection.close();
- }
- }
-
- @Test
- public void testRegionReplicaReplicationIgnoresDisabledTables() throws Exception {
- testRegionReplicaReplicationIgnores(false, false);
- }
-
- @Test
- public void testRegionReplicaReplicationIgnoresDroppedTables() throws Exception {
- testRegionReplicaReplicationIgnores(true, false);
- }
-
- @Test
- public void testRegionReplicaReplicationIgnoresNonReplicatedTables() throws Exception {
- testRegionReplicaReplicationIgnores(false, true);
- }
-
- private void testRegionReplicaReplicationIgnores(boolean dropTable, boolean disableReplication)
- throws Exception {
- // tests having edits from a disabled or dropped table is handled correctly by skipping those
- // entries and further edits after the edits from dropped/disabled table can be replicated
- // without problems.
- int regionReplication = 3;
- TableDescriptor htd = HTU
- .createModifyableTableDescriptor(
- name.getMethodName() + "_drop_" + dropTable + "_disabledReplication_" + disableReplication)
- .setRegionReplication(regionReplication).build();
- final TableName tableName = htd.getTableName();
- HTU.deleteTableIfAny(tableName);
-
- createOrEnableTableWithRetries(htd, true);
- TableName toBeDisabledTable = TableName.valueOf(
- dropTable ? "droppedTable" : (disableReplication ? "disableReplication" : "disabledTable"));
- HTU.deleteTableIfAny(toBeDisabledTable);
- htd = HTU
- .createModifyableTableDescriptor(toBeDisabledTable,
- ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
- ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED)
- .setRegionReplication(regionReplication).build();
- createOrEnableTableWithRetries(htd, true);
-
- // both tables are created, now pause replication
- HTU.getAdmin().disableReplicationPeer(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER);
-
- // now that the replication is disabled, write to the table to be dropped, then drop the table.
-
- Connection connection = ConnectionFactory.createConnection(HTU.getConfiguration());
- Table table = connection.getTable(tableName);
- Table tableToBeDisabled = connection.getTable(toBeDisabledTable);
-
- HTU.loadNumericRows(tableToBeDisabled, HBaseTestingUtil.fam1, 6000, 7000);
-
- RegionLocator rl = connection.getRegionLocator(toBeDisabledTable);
- HRegionLocation hrl = rl.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY);
- byte[] encodedRegionName = hrl.getRegion().getEncodedNameAsBytes();
-
- Cell cell = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(Bytes.toBytes("A"))
- .setFamily(HTU.fam1).setValue(Bytes.toBytes("VAL")).setType(Type.Put).build();
- Entry entry = new Entry(
- new WALKeyImpl(encodedRegionName, toBeDisabledTable, 1),
- new WALEdit()
- .add(cell));
- HTU.getAdmin().disableTable(toBeDisabledTable); // disable the table
- if (dropTable) {
- HTU.getAdmin().deleteTable(toBeDisabledTable);
- } else if (disableReplication) {
- htd =
- TableDescriptorBuilder.newBuilder(htd).setRegionReplication(regionReplication - 2).build();
- HTU.getAdmin().modifyTable(htd);
- createOrEnableTableWithRetries(htd, false);
- }
-
- HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(0);
- MetricsSource metrics = mock(MetricsSource.class);
- ReplicationEndpoint.Context ctx =
- new ReplicationEndpoint.Context(rs, HTU.getConfiguration(), HTU.getConfiguration(),
- HTU.getTestFileSystem(), ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER,
- UUID.fromString(rs.getClusterId()), rs.getReplicationSourceService().getReplicationPeers()
- .getPeer(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER),
- metrics, rs.getTableDescriptors(), rs);
- RegionReplicaReplicationEndpoint rrpe = new RegionReplicaReplicationEndpoint();
- rrpe.init(ctx);
- rrpe.start();
- ReplicationEndpoint.ReplicateContext repCtx = new ReplicationEndpoint.ReplicateContext();
- repCtx.setEntries(Lists.newArrayList(entry, entry));
- assertTrue(rrpe.replicate(repCtx));
- verify(metrics, times(1)).incrLogEditsFiltered(eq(2L));
- rrpe.stop();
- if (disableReplication) {
- // enable replication again so that we can verify replication
- HTU.getAdmin().disableTable(toBeDisabledTable); // disable the table
- htd = TableDescriptorBuilder.newBuilder(htd).setRegionReplication(regionReplication).build();
- HTU.getAdmin().modifyTable(htd);
- createOrEnableTableWithRetries(htd, false);
- }
-
- try {
- // load some data to the to-be-dropped table
- // load the data to the table
- HTU.loadNumericRows(table, HBaseTestingUtil.fam1, 0, 1000);
-
- // now enable the replication
- HTU.getAdmin().enableReplicationPeer(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_PEER);
-
- verifyReplication(tableName, regionReplication, 0, 1000);
- } finally {
- table.close();
- rl.close();
- tableToBeDisabled.close();
- HTU.deleteTableIfAny(toBeDisabledTable);
- connection.close();
- }
- }
-
- private void createOrEnableTableWithRetries(TableDescriptor htd, boolean createTableOperation) {
- // Helper function to run create/enable table operations with a retry feature
- boolean continueToRetry = true;
- int tries = 0;
- while (continueToRetry && tries < 50) {
- try {
- continueToRetry = false;
- if (createTableOperation) {
- HTU.getAdmin().createTable(htd);
- } else {
- HTU.getAdmin().enableTable(htd.getTableName());
- }
- } catch (IOException e) {
- if (e.getCause() instanceof ReplicationException) {
- continueToRetry = true;
- tries++;
- Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);
- }
- }
- }
- }
-}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
deleted file mode 100644
index a37ab5f..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
+++ /dev/null
@@ -1,281 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication.regionserver;
-
-import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.closeRegion;
-import static org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster.openRegion;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.IOException;
-import java.util.Collections;
-import java.util.Optional;
-import java.util.Queue;
-import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseTestingUtil;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.RegionLocations;
-import org.apache.hadoop.hbase.StartTestingClusterOption;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.AsyncClusterConnection;
-import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
-import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.client.RegionLocator;
-import org.apache.hadoop.hbase.client.RegionReplicaUtil;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
-import org.apache.hadoop.hbase.coprocessor.ObserverContext;
-import org.apache.hadoop.hbase.coprocessor.WALCoprocessor;
-import org.apache.hadoop.hbase.coprocessor.WALCoprocessorEnvironment;
-import org.apache.hadoop.hbase.coprocessor.WALObserver;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.regionserver.Region;
-import org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster;
-import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
-import org.apache.hadoop.hbase.replication.ReplicationEndpoint.ReplicateContext;
-import org.apache.hadoop.hbase.security.User;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.testclassification.ReplicationTests;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
-import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.hadoop.hbase.wal.WALEdit;
-import org.apache.hadoop.hbase.wal.WALKey;
-import org.apache.hadoop.hbase.wal.WALKeyImpl;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
-
-/**
- * Tests RegionReplicaReplicationEndpoint. Unlike TestRegionReplicaReplicationEndpoint this
- * class contains lower level tests using callables.
- */
-@Category({ReplicationTests.class, MediumTests.class})
-public class TestRegionReplicaReplicationEndpointNoMaster {
-
- @ClassRule
- public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestRegionReplicaReplicationEndpointNoMaster.class);
-
- private static final int NB_SERVERS = 2;
- private static TableName tableName = TableName.valueOf(
- TestRegionReplicaReplicationEndpointNoMaster.class.getSimpleName());
- private static Table table;
- private static final byte[] row = Bytes.toBytes("TestRegionReplicaReplicator");
-
- private static HRegionServer rs0;
- private static HRegionServer rs1;
-
- private static RegionInfo hriPrimary;
- private static RegionInfo hriSecondary;
-
- private static final HBaseTestingUtil HTU = new HBaseTestingUtil();
- private static final byte[] f = HConstants.CATALOG_FAMILY;
-
- @BeforeClass
- public static void beforeClass() throws Exception {
- Configuration conf = HTU.getConfiguration();
- conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_REPLICATION_CONF_KEY, true);
- conf.setBoolean(ServerRegionReplicaUtil.REGION_REPLICA_WAIT_FOR_PRIMARY_FLUSH_CONF_KEY, false);
-
- // install WALObserver coprocessor for tests
- String walCoprocs = HTU.getConfiguration().get(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY);
- if (walCoprocs == null) {
- walCoprocs = WALEditCopro.class.getName();
- } else {
- walCoprocs += "," + WALEditCopro.class.getName();
- }
- HTU.getConfiguration().set(CoprocessorHost.WAL_COPROCESSOR_CONF_KEY,
- walCoprocs);
- StartTestingClusterOption option = StartTestingClusterOption.builder()
- .numAlwaysStandByMasters(1).numRegionServers(NB_SERVERS).numDataNodes(NB_SERVERS).build();
- HTU.startMiniCluster(option);
-
- // Create table then get the single region for our new table.
- TableDescriptor htd = HTU.createTableDescriptor(TableName.valueOf(tableName.getNameAsString()),
- ColumnFamilyDescriptorBuilder.DEFAULT_MIN_VERSIONS, 3, HConstants.FOREVER,
- ColumnFamilyDescriptorBuilder.DEFAULT_KEEP_DELETED);
- table = HTU.createTable(htd, new byte[][]{f}, null);
-
- try (RegionLocator locator = HTU.getConnection().getRegionLocator(tableName)) {
- hriPrimary = locator.getRegionLocation(row, false).getRegion();
- }
-
- // mock a secondary region info to open
- hriSecondary = RegionReplicaUtil.getRegionInfoForReplica(hriPrimary, 1);
-
- // No master
- TestRegionServerNoMaster.stopMasterAndCacheMetaLocation(HTU);
- rs0 = HTU.getMiniHBaseCluster().getRegionServer(0);
- rs1 = HTU.getMiniHBaseCluster().getRegionServer(1);
- }
-
- @AfterClass
- public static void afterClass() throws Exception {
- HRegionServer.TEST_SKIP_REPORTING_TRANSITION = false;
- table.close();
- HTU.shutdownMiniCluster();
- }
-
- @Before
- public void before() throws Exception {
- entries.clear();
- }
-
- @After
- public void after() throws Exception {
- }
-
- static ConcurrentLinkedQueue<Entry> entries = new ConcurrentLinkedQueue<>();
-
- public static class WALEditCopro implements WALCoprocessor, WALObserver {
- public WALEditCopro() {
- entries.clear();
- }
-
- @Override
- public Optional<WALObserver> getWALObserver() {
- return Optional.of(this);
- }
-
- @Override
- public void postWALWrite(ObserverContext<? extends WALCoprocessorEnvironment> ctx,
- RegionInfo info, WALKey logKey, WALEdit logEdit) throws IOException {
- // only keep primary region's edits
- if (logKey.getTableName().equals(tableName) && info.getReplicaId() == 0) {
- // Presume type is a WALKeyImpl
- entries.add(new Entry((WALKeyImpl)logKey, logEdit));
- }
- }
- }
-
- @Test
- public void testReplayCallable() throws Exception {
- // tests replaying the edits to a secondary region replica using the Callable directly
- openRegion(HTU, rs0, hriSecondary);
-
- // load some data to primary
- HTU.loadNumericRows(table, f, 0, 1000);
-
- Assert.assertEquals(1000, entries.size());
- try (AsyncClusterConnection conn = ClusterConnectionFactory
- .createAsyncClusterConnection(HTU.getConfiguration(), null, User.getCurrent())) {
- // replay the edits to the secondary using replay callable
- replicateUsingCallable(conn, entries);
- }
-
- Region region = rs0.getRegion(hriSecondary.getEncodedName());
- HTU.verifyNumericRows(region, f, 0, 1000);
-
- HTU.deleteNumericRows(table, f, 0, 1000);
- closeRegion(HTU, rs0, hriSecondary);
- }
-
- private void replicateUsingCallable(AsyncClusterConnection connection, Queue<Entry> entries)
- throws IOException, ExecutionException, InterruptedException {
- Entry entry;
- while ((entry = entries.poll()) != null) {
- byte[] row = CellUtil.cloneRow(entry.getEdit().getCells().get(0));
- RegionLocations locations = connection.getRegionLocations(tableName, row, true).get();
- connection
- .replay(tableName, locations.getRegionLocation(1).getRegion().getEncodedNameAsBytes(), row,
- Collections.singletonList(entry), 1, Integer.MAX_VALUE, TimeUnit.SECONDS.toNanos(10))
- .get();
- }
- }
-
- @Test
- public void testReplayCallableWithRegionMove() throws Exception {
- // tests replaying the edits to a secondary region replica using the Callable directly while
- // the region is moved to another location.It tests handling of RME.
- try (AsyncClusterConnection conn = ClusterConnectionFactory
- .createAsyncClusterConnection(HTU.getConfiguration(), null, User.getCurrent())) {
- openRegion(HTU, rs0, hriSecondary);
- // load some data to primary
- HTU.loadNumericRows(table, f, 0, 1000);
-
- Assert.assertEquals(1000, entries.size());
-
- // replay the edits to the secondary using replay callable
- replicateUsingCallable(conn, entries);
-
- Region region = rs0.getRegion(hriSecondary.getEncodedName());
- HTU.verifyNumericRows(region, f, 0, 1000);
-
- HTU.loadNumericRows(table, f, 1000, 2000); // load some more data to primary
-
- // move the secondary region from RS0 to RS1
- closeRegion(HTU, rs0, hriSecondary);
- openRegion(HTU, rs1, hriSecondary);
-
- // replicate the new data
- replicateUsingCallable(conn, entries);
-
- region = rs1.getRegion(hriSecondary.getEncodedName());
- // verify the new data. old data may or may not be there
- HTU.verifyNumericRows(region, f, 1000, 2000);
-
- HTU.deleteNumericRows(table, f, 0, 2000);
- closeRegion(HTU, rs1, hriSecondary);
- }
- }
-
- @Test
- public void testRegionReplicaReplicationEndpointReplicate() throws Exception {
- // tests replaying the edits to a secondary region replica using the RRRE.replicate()
- openRegion(HTU, rs0, hriSecondary);
- RegionReplicaReplicationEndpoint replicator = new RegionReplicaReplicationEndpoint();
-
- ReplicationEndpoint.Context context = mock(ReplicationEndpoint.Context.class);
- when(context.getConfiguration()).thenReturn(HTU.getConfiguration());
- when(context.getMetrics()).thenReturn(mock(MetricsSource.class));
- when(context.getServer()).thenReturn(rs0);
- when(context.getTableDescriptors()).thenReturn(rs0.getTableDescriptors());
- replicator.init(context);
- replicator.startAsync();
-
- //load some data to primary
- HTU.loadNumericRows(table, f, 0, 1000);
-
- Assert.assertEquals(1000, entries.size());
- // replay the edits to the secondary using replay callable
- final String fakeWalGroupId = "fakeWALGroup";
- replicator.replicate(new ReplicateContext().setEntries(Lists.newArrayList(entries))
- .setWalGroupId(fakeWalGroupId));
- replicator.stop();
- Region region = rs0.getRegion(hriSecondary.getEncodedName());
- HTU.verifyNumericRows(region, f, 0, 1000);
-
- HTU.deleteNumericRows(table, f, 0, 1000);
- closeRegion(HTU, rs0, hriSecondary);
- }
-}