Posted to commits@hbase.apache.org by zh...@apache.org on 2019/05/02 09:05:18 UTC
[hbase] 06/25: HBASE-21671 Rewrite RegionReplicaReplicationEndpoint to use AsyncClusterConnection
This is an automated email from the ASF dual-hosted git repository.
zhangduo pushed a commit to branch HBASE-21512
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit fdd0251f33cd340b472c0ff6d222f3fa651ac60a
Author: Duo Zhang <zh...@apache.org>
AuthorDate: Fri Jan 11 16:22:24 2019 +0800
HBASE-21671 Rewrite RegionReplicaReplicationEndpoint to use AsyncClusterConnection
---
.../hadoop/hbase/client/AsyncConnectionImpl.java | 25 +-
.../hbase/client/AsyncClusterConnection.java | 17 +
.../hbase/client/AsyncClusterConnectionImpl.java | 80 +++
.../AsyncRegionReplicaReplayRetryingCaller.java | 147 ++++
.../hbase/client/AsyncRegionServerAdmin.java | 5 +-
.../hbase/client/ClusterConnectionFactory.java | 2 +-
.../hbase/protobuf/ReplicationProtbufUtil.java | 31 +-
.../handler/RegionReplicaFlushHandler.java | 3 +-
.../hbase/replication/ReplicationEndpoint.java | 35 +-
.../RegionReplicaReplicationEndpoint.java | 782 +++++++--------------
.../regionserver/ReplicationSource.java | 2 +-
.../TestRegionReplicaReplicationEndpoint.java | 56 +-
...stRegionReplicaReplicationEndpointNoMaster.java | 98 ++-
13 files changed, 627 insertions(+), 656 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
index c17cca9..d3d50d7 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncConnectionImpl.java
@@ -60,7 +60,6 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ClientService;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
@@ -69,7 +68,7 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterServ
* The implementation of AsyncConnection.
*/
@InterfaceAudience.Private
-class AsyncConnectionImpl implements AsyncClusterConnection {
+class AsyncConnectionImpl implements AsyncConnection {
private static final Logger LOG = LoggerFactory.getLogger(AsyncConnectionImpl.class);
@@ -89,7 +88,7 @@ class AsyncConnectionImpl implements AsyncClusterConnection {
private final int rpcTimeout;
- private final RpcClient rpcClient;
+ protected final RpcClient rpcClient;
final RpcControllerFactory rpcControllerFactory;
@@ -218,16 +217,10 @@ class AsyncConnectionImpl implements AsyncClusterConnection {
}
// ditto
- @Override
- public NonceGenerator getNonceGenerator() {
+ NonceGenerator getNonceGenerator() {
return nonceGenerator;
}
- @Override
- public RpcClient getRpcClient() {
- return rpcClient;
- }
-
private ClientService.Interface createRegionServerStub(ServerName serverName) throws IOException {
return ClientService.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
}
@@ -380,16 +373,4 @@ class AsyncConnectionImpl implements AsyncClusterConnection {
Optional<MetricsConnection> getConnectionMetrics() {
return metrics;
}
-
- @Override
- public AsyncRegionServerAdmin getRegionServerAdmin(ServerName serverName) {
- return new AsyncRegionServerAdmin(serverName, this);
- }
-
- @Override
- public CompletableFuture<FlushRegionResponse> flush(byte[] regionName,
- boolean writeFlushWALMarker) {
- RawAsyncHBaseAdmin admin = (RawAsyncHBaseAdmin) getAdmin();
- return admin.flushRegionInternal(regionName, writeFlushWALMarker);
- }
}
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
similarity index 72%
rename from hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
rename to hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
index f1f64ca..0ad77ba 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnection.java
@@ -17,9 +17,13 @@
*/
package org.apache.hadoop.hbase.client;
+import java.util.List;
import java.util.concurrent.CompletableFuture;
+import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
@@ -49,4 +53,17 @@ public interface AsyncClusterConnection extends AsyncConnection {
* Flush a region and get the response.
*/
CompletableFuture<FlushRegionResponse> flush(byte[] regionName, boolean writeFlushWALMarker);
+
+ /**
+ * Replicate WAL edits to replica regions. The return value is the number of edits we skipped,
+ * as the original return value is not useful.
+ */
+ CompletableFuture<Long> replay(TableName tableName, byte[] encodedRegionName, byte[] row,
+ List<Entry> entries, int replicaId, int numRetries, long operationTimeoutNs);
+
+ /**
+ * Return all the replicas for a region. Used for region replica replication.
+ */
+ CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
+ boolean reload);
}
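For context, a minimal sketch of how the two new interface methods are meant to be used. This is illustrative only, not part of the commit; the helper name replayToReplica and its parameters are placeholders.

    // Illustrative sketch, assuming the usual imports (TableName, WAL.Entry,
    // CompletableFuture, List). replay() completes with the number of edits
    // that were skipped rather than replayed.
    static CompletableFuture<Long> replayToReplica(AsyncClusterConnection conn,
        TableName tableName, byte[] encodedRegionName, byte[] row, List<Entry> entries,
        int replicaId, int numRetries, long operationTimeoutNs) {
      return conn.replay(tableName, encodedRegionName, row, entries, replicaId,
          numRetries, operationTimeoutNs);
    }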
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
new file mode 100644
index 0000000..d61f01f
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncClusterConnectionImpl.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import java.net.SocketAddress;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.RegionLocations;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.ipc.RpcClient;
+import org.apache.hadoop.hbase.security.User;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.yetus.audience.InterfaceAudience;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.FlushRegionResponse;
+
+/**
+ * The implementation of AsyncClusterConnection.
+ */
+@InterfaceAudience.Private
+class AsyncClusterConnectionImpl extends AsyncConnectionImpl implements AsyncClusterConnection {
+
+ public AsyncClusterConnectionImpl(Configuration conf, AsyncRegistry registry, String clusterId,
+ SocketAddress localAddress, User user) {
+ super(conf, registry, clusterId, localAddress, user);
+ }
+
+ @Override
+ public NonceGenerator getNonceGenerator() {
+ return super.getNonceGenerator();
+ }
+
+ @Override
+ public RpcClient getRpcClient() {
+ return rpcClient;
+ }
+
+ @Override
+ public AsyncRegionServerAdmin getRegionServerAdmin(ServerName serverName) {
+ return new AsyncRegionServerAdmin(serverName, this);
+ }
+
+ @Override
+ public CompletableFuture<FlushRegionResponse> flush(byte[] regionName,
+ boolean writeFlushWALMarker) {
+ RawAsyncHBaseAdmin admin = (RawAsyncHBaseAdmin) getAdmin();
+ return admin.flushRegionInternal(regionName, writeFlushWALMarker);
+ }
+
+ @Override
+ public CompletableFuture<Long> replay(TableName tableName, byte[] encodedRegionName, byte[] row,
+ List<Entry> entries, int replicaId, int retries, long operationTimeoutNs) {
+ return new AsyncRegionReplicaReplayRetryingCaller(RETRY_TIMER, this,
+ ConnectionUtils.retries2Attempts(retries), operationTimeoutNs, tableName, encodedRegionName,
+ row, entries, replicaId).call();
+ }
+
+ @Override
+ public CompletableFuture<RegionLocations> getRegionLocations(TableName tableName, byte[] row,
+ boolean reload) {
+ return getLocator().getRegionLocations(tableName, row, RegionLocateType.CURRENT, reload, -1L);
+ }
+}
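A companion sketch for the location lookup (illustrative only; locateReplicas is a placeholder name). Passing reload=true bypasses the location cache, which the endpoint relies on when a cached location looks stale.

    // Illustrative sketch: returns the locations of the primary region and all
    // of its replicas for the given row.
    static CompletableFuture<RegionLocations> locateReplicas(AsyncClusterConnection conn,
        TableName tableName, byte[] row, boolean reload) {
      return conn.getRegionLocations(tableName, row, reload);
    }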
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicaReplayRetryingCaller.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicaReplayRetryingCaller.java
new file mode 100644
index 0000000..91d9502
--- /dev/null
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionReplicaReplayRetryingCaller.java
@@ -0,0 +1,147 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client;
+
+import static org.apache.hadoop.hbase.util.FutureUtils.addListener;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.hadoop.hbase.CellScanner;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.io.netty.util.HashedWheelTimer;
+
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
+
+/**
+ * For replaying edits for region replica.
+ * <p/>
+ * The main difference here is that, every time after locating, we check whether the located
+ * region name is equal; if not, we give up, as this usually means the region has been split or
+ * merged, and the new region(s) should already have all the data of the parent region(s).
+ * <p/>
+ * Note that the return value is the number of edits we skipped, as the original response
+ * message is not used at the upper layer.
+ */
+@InterfaceAudience.Private
+public class AsyncRegionReplicaReplayRetryingCaller extends AsyncRpcRetryingCaller<Long> {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(AsyncRegionReplicaReplayRetryingCaller.class);
+
+ private final TableName tableName;
+
+ private final byte[] encodedRegionName;
+
+ private final byte[] row;
+
+ private final Entry[] entries;
+
+ private final int replicaId;
+
+ public AsyncRegionReplicaReplayRetryingCaller(HashedWheelTimer retryTimer,
+ AsyncClusterConnectionImpl conn, int maxAttempts, long operationTimeoutNs,
+ TableName tableName, byte[] encodedRegionName, byte[] row, List<Entry> entries,
+ int replicaId) {
+ super(retryTimer, conn, ConnectionUtils.getPriority(tableName), conn.connConf.getPauseNs(),
+ conn.connConf.getPauseForCQTBENs(), maxAttempts, operationTimeoutNs,
+ conn.connConf.getWriteRpcTimeoutNs(), conn.connConf.getStartLogErrorsCnt());
+ this.tableName = tableName;
+ this.encodedRegionName = encodedRegionName;
+ this.row = row;
+ this.entries = entries.toArray(new Entry[0]);
+ this.replicaId = replicaId;
+ }
+
+ private void call(HRegionLocation loc) {
+ if (!Bytes.equals(encodedRegionName, loc.getRegion().getEncodedNameAsBytes())) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(
+ "Skipping {} entries in table {} because located region {} is different than" +
+ " the original region {} from WALEdit",
+ entries.length, tableName, loc.getRegion().getEncodedName(),
+ Bytes.toStringBinary(encodedRegionName));
+ for (Entry entry : entries) {
+ LOG.trace("Skipping : " + entry);
+ }
+ }
+ future.complete(Long.valueOf(entries.length));
+ return;
+ }
+
+ AdminService.Interface stub;
+ try {
+ stub = conn.getAdminStub(loc.getServerName());
+ } catch (IOException e) {
+ onError(e,
+ () -> "Get async admin stub to " + loc.getServerName() + " for '" +
+ Bytes.toStringBinary(row) + "' in " + loc.getRegion().getEncodedName() + " of " +
+ tableName + " failed",
+ err -> conn.getLocator().updateCachedLocationOnError(loc, err));
+ return;
+ }
+ Pair<ReplicateWALEntryRequest, CellScanner> p = ReplicationProtbufUtil
+ .buildReplicateWALEntryRequest(entries, encodedRegionName, null, null, null);
+ resetCallTimeout();
+ controller.setCellScanner(p.getSecond());
+ stub.replay(controller, p.getFirst(), r -> {
+ if (controller.failed()) {
+ onError(controller.getFailed(),
+ () -> "Call to " + loc.getServerName() + " for '" + Bytes.toStringBinary(row) + "' in " +
+ loc.getRegion().getEncodedName() + " of " + tableName + " failed",
+ err -> conn.getLocator().updateCachedLocationOnError(loc, err));
+ } else {
+ future.complete(0L);
+ }
+ });
+
+ }
+
+ @Override
+ protected void doCall() {
+ long locateTimeoutNs;
+ if (operationTimeoutNs > 0) {
+ locateTimeoutNs = remainingTimeNs();
+ if (locateTimeoutNs <= 0) {
+ completeExceptionally();
+ return;
+ }
+ } else {
+ locateTimeoutNs = -1L;
+ }
+ addListener(conn.getLocator().getRegionLocation(tableName, row, replicaId,
+ RegionLocateType.CURRENT, locateTimeoutNs), (loc, error) -> {
+ if (error != null) {
+ onError(error,
+ () -> "Locate '" + Bytes.toStringBinary(row) + "' in " + tableName + " failed", err -> {
+ });
+ return;
+ }
+ call(loc);
+ });
+ }
+}
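Construction of the caller mirrors AsyncClusterConnectionImpl.replay() above; a sketch with placeholder variables, illustrative only:

    // ConnectionUtils.retries2Attempts() converts a retry count into the attempt
    // count the retrying-caller machinery expects. call() kicks off doCall() and
    // returns the future that completes with the skipped-edits count.
    CompletableFuture<Long> skipped = new AsyncRegionReplicaReplayRetryingCaller(
        retryTimer, conn, ConnectionUtils.retries2Attempts(numRetries), operationTimeoutNs,
        tableName, encodedRegionName, row, entries, replicaId).call();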
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java
similarity index 99%
rename from hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java
rename to hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java
index b9141a9..d491890 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/AsyncRegionServerAdmin.java
@@ -164,8 +164,9 @@ public class AsyncRegionServerAdmin {
cellScanner);
}
- public CompletableFuture<ReplicateWALEntryResponse> replay(ReplicateWALEntryRequest request) {
- return call((stub, controller, done) -> stub.replay(controller, request, done));
+ public CompletableFuture<ReplicateWALEntryResponse> replay(ReplicateWALEntryRequest request,
+ CellScanner cellScanner) {
+ return call((stub, controller, done) -> stub.replay(controller, request, done), cellScanner);
}
public CompletableFuture<RollWALWriterResponse> rollWALWriter(RollWALWriterRequest request) {
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java
similarity index 95%
rename from hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java
rename to hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java
index 79484db..2670420 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/client/ClusterConnectionFactory.java
@@ -46,6 +46,6 @@ public final class ClusterConnectionFactory {
SocketAddress localAddress, User user) throws IOException {
AsyncRegistry registry = AsyncRegistryFactory.getRegistry(conf);
String clusterId = FutureUtils.get(registry.getClusterId());
- return new AsyncConnectionImpl(conf, registry, clusterId, localAddress, user);
+ return new AsyncClusterConnectionImpl(conf, registry, clusterId, localAddress, user);
}
}
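Server-side callers obtain the connection through this factory. A sketch, assuming the enclosing factory method shown above is createAsyncClusterConnection(conf, localAddress, user) as on this branch, and that a null local address is acceptable; the call site is illustrative only:

    // Illustrative: after this change the returned object is an
    // AsyncClusterConnectionImpl, so server-only methods such as replay() and
    // getRegionServerAdmin() are available without casting.
    AsyncClusterConnection conn = ClusterConnectionFactory.createAsyncClusterConnection(
        conf, /* localAddress */ null, User.getCurrent());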
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
index 9f41a76..c39c86c 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
@@ -37,7 +37,8 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hbase.thirdparty.com.google.protobuf.UnsafeByteOperations;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
import org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos;
@InterfaceAudience.Private
@@ -55,20 +56,18 @@ public class ReplicationProtbufUtil {
public static void replicateWALEntry(AsyncRegionServerAdmin admin, Entry[] entries,
String replicationClusterId, Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir)
throws IOException {
- Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p = buildReplicateWALEntryRequest(
- entries, null, replicationClusterId, sourceBaseNamespaceDir, sourceHFileArchiveDir);
+ Pair<ReplicateWALEntryRequest, CellScanner> p = buildReplicateWALEntryRequest(entries, null,
+ replicationClusterId, sourceBaseNamespaceDir, sourceHFileArchiveDir);
FutureUtils.get(admin.replicateWALEntry(p.getFirst(), p.getSecond()));
}
/**
* Create a new ReplicateWALEntryRequest from a list of WAL entries
- *
* @param entries the WAL entries to be replicated
- * @return a pair of ReplicateWALEntryRequest and a CellScanner over all the WALEdit values
- * found.
+ * @return a pair of ReplicateWALEntryRequest and a CellScanner over all the WALEdit values found.
*/
- public static Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner>
- buildReplicateWALEntryRequest(final Entry[] entries) throws IOException {
+ public static Pair<ReplicateWALEntryRequest, CellScanner> buildReplicateWALEntryRequest(
+ final Entry[] entries) {
return buildReplicateWALEntryRequest(entries, null, null, null, null);
}
@@ -82,16 +81,14 @@ public class ReplicationProtbufUtil {
* @param sourceHFileArchiveDir Path to the source cluster hfile archive directory
* @return a pair of ReplicateWALEntryRequest and a CellScanner over all the WALEdit values found.
*/
- public static Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner>
- buildReplicateWALEntryRequest(final Entry[] entries, byte[] encodedRegionName,
- String replicationClusterId, Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir)
- throws IOException {
+ public static Pair<ReplicateWALEntryRequest, CellScanner> buildReplicateWALEntryRequest(
+ final Entry[] entries, byte[] encodedRegionName, String replicationClusterId,
+ Path sourceBaseNamespaceDir, Path sourceHFileArchiveDir) {
// Accumulate all the Cells seen in here.
List<List<? extends Cell>> allCells = new ArrayList<>(entries.length);
int size = 0;
- AdminProtos.WALEntry.Builder entryBuilder = AdminProtos.WALEntry.newBuilder();
- AdminProtos.ReplicateWALEntryRequest.Builder builder =
- AdminProtos.ReplicateWALEntryRequest.newBuilder();
+ WALEntry.Builder entryBuilder = WALEntry.newBuilder();
+ ReplicateWALEntryRequest.Builder builder = ReplicateWALEntryRequest.newBuilder();
for (Entry entry: entries) {
entryBuilder.clear();
@@ -99,8 +96,8 @@ public class ReplicationProtbufUtil {
try {
keyBuilder = entry.getKey().getBuilder(WALCellCodec.getNoneCompressor());
} catch (IOException e) {
- throw new IOException(
- "There should not throw exception since NoneCompressor do not throw any exceptions", e);
+ throw new AssertionError(
+ "There should not throw exception since NoneCompressor do not throw any exceptions", e);
}
if(encodedRegionName != null){
keyBuilder.setEncodedRegionName(
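The request/CellScanner pair built here travels together; a usage sketch matching the replicateWALEntry() helper above (illustrative only; admin and entries are placeholders for an AsyncRegionServerAdmin and an Entry[]):

    // Build one ReplicateWALEntryRequest covering all entries, with the cells
    // side-loaded through a CellScanner, then push it synchronously.
    Pair<ReplicateWALEntryRequest, CellScanner> p =
        ReplicationProtbufUtil.buildReplicateWALEntryRequest(entries, null,
            replicationClusterId, sourceBaseNamespaceDir, sourceHFileArchiveDir);
    FutureUtils.get(admin.replicateWALEntry(p.getFirst(), p.getSecond()));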
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java
index 0729203..cc798cc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/handler/RegionReplicaFlushHandler.java
@@ -185,7 +185,6 @@ public class RegionReplicaFlushHandler extends EventHandler {
"Was not able to trigger a flush from primary region due to old server version? " +
"Continuing to open the secondary region replica: " +
region.getRegionInfo().getRegionNameAsString());
- region.setReadsEnabled(true);
break;
}
}
@@ -195,6 +194,6 @@ public class RegionReplicaFlushHandler extends EventHandler {
throw new InterruptedIOException(e.getMessage());
}
}
+ region.setReadsEnabled(true);
}
-
}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
index f4c37b1..ca73663 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
@@ -29,6 +29,7 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
+import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.replication.regionserver.MetricsSource;
@@ -53,6 +54,7 @@ public interface ReplicationEndpoint extends ReplicationPeerConfigListener {
@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
class Context {
+ private final Server server;
private final Configuration localConf;
private final Configuration conf;
private final FileSystem fs;
@@ -64,16 +66,11 @@ public interface ReplicationEndpoint extends ReplicationPeerConfigListener {
private final Abortable abortable;
@InterfaceAudience.Private
- public Context(
- final Configuration localConf,
- final Configuration conf,
- final FileSystem fs,
- final String peerId,
- final UUID clusterId,
- final ReplicationPeer replicationPeer,
- final MetricsSource metrics,
- final TableDescriptors tableDescriptors,
- final Abortable abortable) {
+ public Context(final Server server, final Configuration localConf, final Configuration conf,
+ final FileSystem fs, final String peerId, final UUID clusterId,
+ final ReplicationPeer replicationPeer, final MetricsSource metrics,
+ final TableDescriptors tableDescriptors, final Abortable abortable) {
+ this.server = server;
this.localConf = localConf;
this.conf = conf;
this.fs = fs;
@@ -84,34 +81,50 @@ public interface ReplicationEndpoint extends ReplicationPeerConfigListener {
this.tableDescriptors = tableDescriptors;
this.abortable = abortable;
}
+
+ public Server getServer() {
+ return server;
+ }
+
public Configuration getConfiguration() {
return conf;
}
+
public Configuration getLocalConfiguration() {
return localConf;
}
+
public FileSystem getFilesystem() {
return fs;
}
+
public UUID getClusterId() {
return clusterId;
}
+
public String getPeerId() {
return peerId;
}
+
public ReplicationPeerConfig getPeerConfig() {
return replicationPeer.getPeerConfig();
}
+
public ReplicationPeer getReplicationPeer() {
return replicationPeer;
}
+
public MetricsSource getMetrics() {
return metrics;
}
+
public TableDescriptors getTableDescriptors() {
return tableDescriptors;
}
- public Abortable getAbortable() { return abortable; }
+
+ public Abortable getAbortable() {
+ return abortable;
+ }
}
/**
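With the Server handle now on Context, an endpoint can borrow the server's shared cluster connection during init(); a sketch of the pattern (illustrative only, the connection field is a placeholder), which is exactly how RegionReplicaReplicationEndpoint uses it later in this commit:

    @Override
    public void init(Context context) throws IOException {
      super.init(context);
      // Reuse the region server's shared AsyncClusterConnection instead of
      // creating (and later closing) a private connection per endpoint.
      this.connection = context.getServer().getAsyncClusterConnection();
    }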
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
index f7721e0..65cf9a8 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java
@@ -19,67 +19,47 @@
package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
-import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.Callable;
+import java.util.Optional;
+import java.util.TreeMap;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.CellUtil;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotFoundException;
-import org.apache.hadoop.hbase.client.ClusterConnection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.RegionAdminServiceCallable;
-import org.apache.hadoop.hbase.client.RegionInfo;
-import org.apache.hadoop.hbase.client.RegionReplicaUtil;
-import org.apache.hadoop.hbase.client.RetryingCallable;
-import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.ipc.HBaseRpcController;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
-import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
+import org.apache.hadoop.hbase.util.AtomicUtils;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.util.RetryCounter;
+import org.apache.hadoop.hbase.util.RetryCounterFactory;
import org.apache.hadoop.hbase.wal.WAL.Entry;
-import org.apache.hadoop.hbase.wal.WALSplitter.EntryBuffers;
-import org.apache.hadoop.hbase.wal.WALSplitter.OutputSink;
-import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController;
-import org.apache.hadoop.hbase.wal.WALSplitter.RegionEntryBuffer;
-import org.apache.hadoop.hbase.wal.WALSplitter.SinkWriter;
-import org.apache.hadoop.util.StringUtils;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.cache.Cache;
import org.apache.hbase.thirdparty.com.google.common.cache.CacheBuilder;
-
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
+import org.apache.hbase.thirdparty.com.google.common.cache.CacheLoader;
+import org.apache.hbase.thirdparty.com.google.common.cache.LoadingCache;
/**
- * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} endpoint
- * which receives the WAL edits from the WAL, and sends the edits to replicas
- * of regions.
+ * A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint} which receives WAL edits
+ * from the WAL, and sends them to replicas of regions.
*/
@InterfaceAudience.Private
public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
@@ -87,32 +67,55 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
private static final Logger LOG = LoggerFactory.getLogger(RegionReplicaReplicationEndpoint.class);
// Can be configured differently than hbase.client.retries.number
- private static String CLIENT_RETRIES_NUMBER
- = "hbase.region.replica.replication.client.retries.number";
+ private static String CLIENT_RETRIES_NUMBER =
+ "hbase.region.replica.replication.client.retries.number";
private Configuration conf;
- private ClusterConnection connection;
+ private AsyncClusterConnection connection;
private TableDescriptors tableDescriptors;
- // Reuse WALSplitter constructs as a WAL pipe
- private PipelineController controller;
- private RegionReplicaOutputSink outputSink;
- private EntryBuffers entryBuffers;
+ private int numRetries;
+
+ private long operationTimeoutNs;
- // Number of writer threads
- private int numWriterThreads;
+ private LoadingCache<TableName, Optional<TableDescriptor>> tableDescriptorCache;
- private int operationTimeout;
+ private Cache<TableName, TableName> disabledTableCache;
- private ExecutorService pool;
+ private final RetryCounterFactory retryCounterFactory =
+ new RetryCounterFactory(Integer.MAX_VALUE, 1000, 60000);
@Override
public void init(Context context) throws IOException {
super.init(context);
-
- this.conf = HBaseConfiguration.create(context.getConfiguration());
+ this.conf = context.getConfiguration();
this.tableDescriptors = context.getTableDescriptors();
-
+ int memstoreReplicationEnabledCacheExpiryMs = conf
+ .getInt("hbase.region.replica.replication.cache.memstoreReplicationEnabled.expiryMs", 5000);
+ // A cache for the table "memstore replication enabled" flag.
+ // It has a default expiry of 5 sec. This means that if the table is altered
+ // with a different flag value, we might miss replicating for that amount of
+ // time. But this cache avoids the slow lookup and parsing of the TableDescriptor.
+ tableDescriptorCache = CacheBuilder.newBuilder()
+ .expireAfterWrite(memstoreReplicationEnabledCacheExpiryMs, TimeUnit.MILLISECONDS)
+ .initialCapacity(10).maximumSize(1000)
+ .build(new CacheLoader<TableName, Optional<TableDescriptor>>() {
+
+ @Override
+ public Optional<TableDescriptor> load(TableName tableName) throws Exception {
+ // check if the table requires memstore replication
+ // some unit tests drop the table, so we should do a bypass check and always replicate.
+ return Optional.ofNullable(tableDescriptors.get(tableName));
+ }
+ });
+ int nonExistentTableCacheExpiryMs =
+ conf.getInt("hbase.region.replica.replication.cache.disabledAndDroppedTables.expiryMs", 5000);
+ // A cache for non-existent tables, with a default expiry of 5 sec. This means that if the
+ // table is created again with the same name, we might miss replicating for that amount of
+ // time. But this cache prevents overloading meta with requests for every edit from a
+ // deleted table.
+ disabledTableCache = CacheBuilder.newBuilder()
+ .expireAfterWrite(nonExistentTableCacheExpiryMs, TimeUnit.MILLISECONDS).initialCapacity(10)
+ .maximumSize(1000).build();
// HRS multiplies client retries by 10 globally for meta operations, but we do not want this.
// We are resetting it here because we want default number of retries (35) rather than 10 times
// that which makes very long retries for disabled tables etc.
@@ -123,516 +126,261 @@ public class RegionReplicaReplicationEndpoint extends HBaseReplicationEndpoint {
HConstants.DEFAULT_HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER);
defaultNumRetries = defaultNumRetries / mult; // reset if HRS has multiplied this already
}
-
- conf.setInt(HConstants.HBASE_CLIENT_SERVERSIDE_RETRIES_MULTIPLIER, 1);
- int numRetries = conf.getInt(CLIENT_RETRIES_NUMBER, defaultNumRetries);
- conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, numRetries);
-
- this.numWriterThreads = this.conf.getInt(
- "hbase.region.replica.replication.writer.threads", 3);
- controller = new PipelineController();
- entryBuffers = new EntryBuffers(controller,
- this.conf.getLong("hbase.region.replica.replication.buffersize", 128 * 1024 * 1024));
-
+ this.numRetries = conf.getInt(CLIENT_RETRIES_NUMBER, defaultNumRetries);
// use the regular RPC timeout for replica replication RPC's
- this.operationTimeout = conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
- HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT);
- }
-
- @Override
- protected void doStart() {
- try {
- connection = (ClusterConnection) ConnectionFactory.createConnection(this.conf);
- this.pool = getDefaultThreadPool(conf);
- outputSink = new RegionReplicaOutputSink(controller, tableDescriptors, entryBuffers,
- connection, pool, numWriterThreads, operationTimeout);
- outputSink.startWriterThreads();
- super.doStart();
- } catch (IOException ex) {
- LOG.warn("Received exception while creating connection :" + ex);
- notifyFailed(ex);
- }
- }
-
- @Override
- protected void doStop() {
- if (outputSink != null) {
- try {
- outputSink.finishWritingAndClose();
- } catch (IOException ex) {
- LOG.warn("Got exception while trying to close OutputSink", ex);
- }
- }
- if (this.pool != null) {
- this.pool.shutdownNow();
- try {
- // wait for 10 sec
- boolean shutdown = this.pool.awaitTermination(10000, TimeUnit.MILLISECONDS);
- if (!shutdown) {
- LOG.warn("Failed to shutdown the thread pool after 10 seconds");
- }
- } catch (InterruptedException e) {
- LOG.warn("Got interrupted while waiting for the thread pool to shut down" + e);
- }
- }
- if (connection != null) {
- try {
- connection.close();
- } catch (IOException ex) {
- LOG.warn("Got exception closing connection :" + ex);
- }
- }
- super.doStop();
+ this.operationTimeoutNs =
+ TimeUnit.MILLISECONDS.toNanos(conf.getInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT,
+ HConstants.DEFAULT_HBASE_CLIENT_OPERATION_TIMEOUT));
+ this.connection = context.getServer().getAsyncClusterConnection();
}
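Both knobs read in init() above can be tuned per cluster; a short sketch (illustrative only, not tuning advice):

    // imports: org.apache.hadoop.conf.Configuration,
    //          org.apache.hadoop.hbase.HBaseConfiguration
    Configuration conf = HBaseConfiguration.create();
    // Retries for replica replication, independent of hbase.client.retries.number:
    conf.setInt("hbase.region.replica.replication.client.retries.number", 10);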
/**
- * Returns a Thread pool for the RPC's to region replicas. Similar to
- * Connection's thread pool.
+ * Returns true if the specified entry must be replicated. We should always replicate meta
+ * operations (e.g. flush) and use the user HTD flag to decide whether or not to replicate the
+ * memstore.
*/
- private ExecutorService getDefaultThreadPool(Configuration conf) {
- int maxThreads = conf.getInt("hbase.region.replica.replication.threads.max", 256);
- if (maxThreads == 0) {
- maxThreads = Runtime.getRuntime().availableProcessors() * 8;
+ private boolean requiresReplication(Optional<TableDescriptor> tableDesc, Entry entry) {
+ // empty edit does not need to be replicated
+ if (entry.getEdit().isEmpty() || !tableDesc.isPresent()) {
+ return false;
}
- long keepAliveTime = conf.getLong("hbase.region.replica.replication.threads.keepalivetime", 60);
- LinkedBlockingQueue<Runnable> workQueue =
- new LinkedBlockingQueue<>(maxThreads *
- conf.getInt(HConstants.HBASE_CLIENT_MAX_TOTAL_TASKS,
- HConstants.DEFAULT_HBASE_CLIENT_MAX_TOTAL_TASKS));
- ThreadPoolExecutor tpe = new ThreadPoolExecutor(
- maxThreads,
- maxThreads,
- keepAliveTime,
- TimeUnit.SECONDS,
- workQueue,
- Threads.newDaemonThreadFactory(this.getClass().getSimpleName() + "-rpc-shared-"));
- tpe.allowCoreThreadTimeOut(true);
- return tpe;
+    // meta edits (e.g. flush) must always be replicated
+ return entry.getEdit().isMetaEdit() || tableDesc.get().hasRegionMemStoreReplication();
}
- @Override
- public boolean replicate(ReplicateContext replicateContext) {
- /* A note on batching in RegionReplicaReplicationEndpoint (RRRE):
- *
- * RRRE relies on batching from two different mechanisms. The first is the batching from
- * ReplicationSource since RRRE is a ReplicationEndpoint driven by RS. RS reads from a single
- * WAL file filling up a buffer of heap size "replication.source.size.capacity"(64MB) or at most
- * "replication.source.nb.capacity" entries or until it sees the end of file (in live tailing).
- * Then RS passes all the buffered edits in this replicate() call context. RRRE puts the edits
- * to the WALSplitter.EntryBuffers which is a blocking buffer space of up to
- * "hbase.region.replica.replication.buffersize" (128MB) in size. This buffer splits the edits
- * based on regions.
- *
- * There are "hbase.region.replica.replication.writer.threads"(default 3) writer threads which
- * pick largest per-region buffer and send it to the SinkWriter (see RegionReplicaOutputSink).
- * The SinkWriter in this case will send the wal edits to all secondary region replicas in
- * parallel via a retrying rpc call. EntryBuffers guarantees that while a buffer is
- * being written to the sink, another buffer for the same region will not be made available to
- * writers ensuring regions edits are not replayed out of order.
- *
- * The replicate() call won't return until all the buffers are sent and ack'd by the sinks so
- * that the replication can assume all edits are persisted. We may be able to do a better
- * pipelining between the replication thread and output sinks later if it becomes a bottleneck.
- */
-
- while (this.isRunning()) {
- try {
- for (Entry entry: replicateContext.getEntries()) {
- entryBuffers.appendEntry(entry);
+ private void getRegionLocations(CompletableFuture<RegionLocations> future,
+ TableDescriptor tableDesc, byte[] encodedRegionName, byte[] row, boolean reload) {
+ FutureUtils.addListener(connection.getRegionLocations(tableDesc.getTableName(), row, reload),
+ (r, e) -> {
+ if (e != null) {
+ future.completeExceptionally(e);
+ return;
}
- outputSink.flush(); // make sure everything is flushed
- ctx.getMetrics().incrLogEditsFiltered(
- outputSink.getSkippedEditsCounter().getAndSet(0));
- return true;
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- return false;
- } catch (IOException e) {
- LOG.warn("Received IOException while trying to replicate"
- + StringUtils.stringifyException(e));
- }
- }
-
- return false;
- }
-
- @Override
- public boolean canReplicateToSameCluster() {
- return true;
- }
-
- @Override
- protected WALEntryFilter getScopeWALEntryFilter() {
- // we do not care about scope. We replicate everything.
- return null;
+ // if we are not loading from cache, just return
+ if (reload) {
+ future.complete(r);
+ return;
+ }
+ // check if the number of region replicas is correct, and also the primary region name
+ // matches
+ if (r.size() == tableDesc.getRegionReplication() && Bytes.equals(
+ r.getDefaultRegionLocation().getRegion().getEncodedNameAsBytes(), encodedRegionName)) {
+ future.complete(r);
+ } else {
+          // reload again as the information in the cache may be stale
+ getRegionLocations(future, tableDesc, encodedRegionName, row, true);
+ }
+ });
}
- static class RegionReplicaOutputSink extends OutputSink {
- private final RegionReplicaSinkWriter sinkWriter;
- private final TableDescriptors tableDescriptors;
- private final Cache<TableName, Boolean> memstoreReplicationEnabled;
-
- public RegionReplicaOutputSink(PipelineController controller, TableDescriptors tableDescriptors,
- EntryBuffers entryBuffers, ClusterConnection connection, ExecutorService pool,
- int numWriters, int operationTimeout) {
- super(controller, entryBuffers, numWriters);
- this.sinkWriter =
- new RegionReplicaSinkWriter(this, connection, pool, operationTimeout, tableDescriptors);
- this.tableDescriptors = tableDescriptors;
-
- // A cache for the table "memstore replication enabled" flag.
- // It has a default expiry of 5 sec. This means that if the table is altered
- // with a different flag value, we might miss to replicate for that amount of
- // time. But this cache avoid the slow lookup and parsing of the TableDescriptor.
- int memstoreReplicationEnabledCacheExpiryMs = connection.getConfiguration()
- .getInt("hbase.region.replica.replication.cache.memstoreReplicationEnabled.expiryMs", 5000);
- this.memstoreReplicationEnabled = CacheBuilder.newBuilder()
- .expireAfterWrite(memstoreReplicationEnabledCacheExpiryMs, TimeUnit.MILLISECONDS)
- .initialCapacity(10)
- .maximumSize(1000)
- .build();
+ private void replicate(CompletableFuture<Long> future, RegionLocations locs,
+ TableDescriptor tableDesc, byte[] encodedRegionName, byte[] row, List<Entry> entries) {
+ if (locs.size() == 1) {
+ // Could this happen?
+ future.complete(Long.valueOf(entries.size()));
+ return;
}
-
- @Override
- public void append(RegionEntryBuffer buffer) throws IOException {
- List<Entry> entries = buffer.getEntryBuffer();
-
- if (entries.isEmpty() || entries.get(0).getEdit().getCells().isEmpty()) {
- return;
- }
-
- // meta edits (e.g. flush) are always replicated.
- // data edits (e.g. put) are replicated if the table requires them.
- if (!requiresReplication(buffer.getTableName(), entries)) {
- return;
+ if (!Bytes.equals(locs.getDefaultRegionLocation().getRegion().getEncodedNameAsBytes(),
+ encodedRegionName)) {
+      // the region name is not equal; this usually means the region has been split or merged, so
+ // give up replicating as the new region(s) should already have all the data of the parent
+ // region(s).
+ if (LOG.isTraceEnabled()) {
+ LOG.trace(
+ "Skipping {} entries in table {} because located region {} is different than" +
+ " the original region {} from WALEdit",
+          entries.size(), tableDesc.getTableName(),
+          locs.getDefaultRegionLocation().getRegion().getEncodedName(),
+          Bytes.toStringBinary(encodedRegionName));
}
-
- sinkWriter.append(buffer.getTableName(), buffer.getEncodedRegionName(),
- CellUtil.cloneRow(entries.get(0).getEdit().getCells().get(0)), entries);
- }
-
- @Override
- public boolean flush() throws IOException {
- // nothing much to do for now. Wait for the Writer threads to finish up
- // append()'ing the data.
- entryBuffers.waitUntilDrained();
- return super.flush();
- }
-
- @Override
- public boolean keepRegionEvent(Entry entry) {
- return true;
- }
-
- @Override
- public List<Path> finishWritingAndClose() throws IOException {
- finishWriting(true);
- return null;
- }
-
- @Override
- public Map<byte[], Long> getOutputCounts() {
- return null; // only used in tests
- }
-
- @Override
- public int getNumberOfRecoveredRegions() {
- return 0;
- }
-
- AtomicLong getSkippedEditsCounter() {
- return skippedEdits;
+ future.complete(Long.valueOf(entries.size()));
+ return;
}
-
- /**
- * returns true if the specified entry must be replicated.
- * We should always replicate meta operations (e.g. flush)
- * and use the user HTD flag to decide whether or not replicate the memstore.
- */
- private boolean requiresReplication(final TableName tableName, final List<Entry> entries)
- throws IOException {
- // unit-tests may not the TableDescriptors, bypass the check and always replicate
- if (tableDescriptors == null) return true;
-
- Boolean requiresReplication = memstoreReplicationEnabled.getIfPresent(tableName);
- if (requiresReplication == null) {
- // check if the table requires memstore replication
- // some unit-test drop the table, so we should do a bypass check and always replicate.
- TableDescriptor htd = tableDescriptors.get(tableName);
- requiresReplication = htd == null || htd.hasRegionMemStoreReplication();
- memstoreReplicationEnabled.put(tableName, requiresReplication);
- }
-
- // if memstore replication is not required, check the entries.
- // meta edits (e.g. flush) must be always replicated.
- if (!requiresReplication) {
- int skipEdits = 0;
- java.util.Iterator<Entry> it = entries.iterator();
- while (it.hasNext()) {
- Entry entry = it.next();
- if (entry.getEdit().isMetaEdit()) {
- requiresReplication = true;
+ AtomicReference<Throwable> error = new AtomicReference<>();
+ AtomicInteger remainingTasks = new AtomicInteger(locs.size() - 1);
+ AtomicLong skippedEdits = new AtomicLong(0);
+
+ for (int i = 1, n = locs.size(); i < n; i++) {
+ final int replicaId = i;
+ FutureUtils.addListener(connection.replay(tableDesc.getTableName(),
+ locs.getRegionLocation(replicaId).getRegion().getEncodedNameAsBytes(), row, entries,
+ replicaId, numRetries, operationTimeoutNs), (r, e) -> {
+ if (e != null) {
+ LOG.warn("Failed to replicate to {}", locs.getRegionLocation(replicaId), e);
+ error.compareAndSet(null, e);
} else {
- it.remove();
- skipEdits++;
+ AtomicUtils.updateMax(skippedEdits, r.longValue());
}
- }
- skippedEdits.addAndGet(skipEdits);
- }
- return requiresReplication;
+ if (remainingTasks.decrementAndGet() == 0) {
+ if (error.get() != null) {
+ future.completeExceptionally(error.get());
+ } else {
+ future.complete(skippedEdits.get());
+ }
+ }
+ });
}
}
- static class RegionReplicaSinkWriter extends SinkWriter {
- RegionReplicaOutputSink sink;
- ClusterConnection connection;
- RpcControllerFactory rpcControllerFactory;
- RpcRetryingCallerFactory rpcRetryingCallerFactory;
- int operationTimeout;
- ExecutorService pool;
- Cache<TableName, Boolean> disabledAndDroppedTables;
- TableDescriptors tableDescriptors;
-
- public RegionReplicaSinkWriter(RegionReplicaOutputSink sink, ClusterConnection connection,
- ExecutorService pool, int operationTimeout, TableDescriptors tableDescriptors) {
- this.sink = sink;
- this.connection = connection;
- this.operationTimeout = operationTimeout;
- this.rpcRetryingCallerFactory
- = RpcRetryingCallerFactory.instantiate(connection.getConfiguration());
- this.rpcControllerFactory = RpcControllerFactory.instantiate(connection.getConfiguration());
- this.pool = pool;
- this.tableDescriptors = tableDescriptors;
-
- int nonExistentTableCacheExpiryMs = connection.getConfiguration()
- .getInt("hbase.region.replica.replication.cache.disabledAndDroppedTables.expiryMs", 5000);
- // A cache for non existing tables that have a default expiry of 5 sec. This means that if the
- // table is created again with the same name, we might miss to replicate for that amount of
- // time. But this cache prevents overloading meta requests for every edit from a deleted file.
- disabledAndDroppedTables = CacheBuilder.newBuilder()
- .expireAfterWrite(nonExistentTableCacheExpiryMs, TimeUnit.MILLISECONDS)
- .initialCapacity(10)
- .maximumSize(1000)
- .build();
+ private void logSkipped(TableName tableName, List<Entry> entries, String reason) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Skipping {} entries because table {} is {}", entries.size(), tableName, reason);
+ for (Entry entry : entries) {
+ LOG.trace("Skipping : {}", entry);
+ }
}
+ }
- public void append(TableName tableName, byte[] encodedRegionName, byte[] row,
- List<Entry> entries) throws IOException {
-
- if (disabledAndDroppedTables.getIfPresent(tableName) != null) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Skipping " + entries.size() + " entries because table " + tableName
- + " is cached as a disabled or dropped table");
- for (Entry entry : entries) {
- LOG.trace("Skipping : " + entry);
- }
- }
- sink.getSkippedEditsCounter().addAndGet(entries.size());
- return;
+ private CompletableFuture<Long> replicate(TableDescriptor tableDesc, byte[] encodedRegionName,
+ List<Entry> entries) {
+ if (disabledTableCache.getIfPresent(tableDesc.getTableName()) != null) {
+ logSkipped(tableDesc.getTableName(), entries, "cached as a disabled table");
+ return CompletableFuture.completedFuture(Long.valueOf(entries.size()));
+ }
+ byte[] row = CellUtil.cloneRow(entries.get(0).getEdit().getCells().get(0));
+ CompletableFuture<RegionLocations> locateFuture = new CompletableFuture<>();
+ getRegionLocations(locateFuture, tableDesc, encodedRegionName, row, false);
+ CompletableFuture<Long> future = new CompletableFuture<>();
+ FutureUtils.addListener(locateFuture, (locs, error) -> {
+ if (error != null) {
+ future.completeExceptionally(error);
+ } else {
+ replicate(future, locs, tableDesc, encodedRegionName, row, entries);
}
+ });
+ return future;
+ }
- // If the table is disabled or dropped, we should not replay the entries, and we can skip
- // replaying them. However, we might not know whether the table is disabled until we
- // invalidate the cache and check from meta
- RegionLocations locations = null;
- boolean useCache = true;
- while (true) {
- // get the replicas of the primary region
+ @Override
+ public boolean replicate(ReplicateContext replicateContext) {
+ Map<byte[], Pair<TableDescriptor, List<Entry>>> encodedRegionName2Entries =
+ new TreeMap<>(Bytes.BYTES_COMPARATOR);
+ long skippedEdits = 0;
+ RetryCounter retryCounter = retryCounterFactory.create();
+ outer: while (isRunning()) {
+ encodedRegionName2Entries.clear();
+ skippedEdits = 0;
+ for (Entry entry : replicateContext.getEntries()) {
+ Optional<TableDescriptor> tableDesc;
try {
- locations = RegionReplicaReplayCallable
- .getRegionLocations(connection, tableName, row, useCache, 0);
-
- if (locations == null) {
- throw new HBaseIOException("Cannot locate locations for "
- + tableName + ", row:" + Bytes.toStringBinary(row));
+ tableDesc = tableDescriptorCache.get(entry.getKey().getTableName());
+ } catch (ExecutionException e) {
+ LOG.warn("Failed to load table descriptor for {}, attempts={}",
+ entry.getKey().getTableName(), retryCounter.getAttemptTimes(), e.getCause());
+ if (!retryCounter.shouldRetry()) {
+ return false;
}
- } catch (TableNotFoundException e) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Skipping " + entries.size() + " entries because table " + tableName
- + " is dropped. Adding table to cache.");
- for (Entry entry : entries) {
- LOG.trace("Skipping : " + entry);
- }
+ try {
+ retryCounter.sleepUntilNextRetry();
+ } catch (InterruptedException e1) {
+ // restore the interrupted state
+ Thread.currentThread().interrupt();
+ return false;
}
- disabledAndDroppedTables.put(tableName, Boolean.TRUE); // put to cache. Value ignored
- // skip this entry
- sink.getSkippedEditsCounter().addAndGet(entries.size());
- return;
+ continue outer;
}
-
- // check whether we should still replay this entry. If the regions are changed, or the
- // entry is not coming from the primary region, filter it out.
- HRegionLocation primaryLocation = locations.getDefaultRegionLocation();
- if (!Bytes.equals(primaryLocation.getRegionInfo().getEncodedNameAsBytes(),
- encodedRegionName)) {
- if (useCache) {
- useCache = false;
- continue; // this will retry location lookup
- }
- if (LOG.isTraceEnabled()) {
- LOG.trace("Skipping " + entries.size() + " entries in table " + tableName
- + " because located region " + primaryLocation.getRegionInfo().getEncodedName()
- + " is different than the original region " + Bytes.toStringBinary(encodedRegionName)
- + " from WALEdit");
- for (Entry entry : entries) {
- LOG.trace("Skipping : " + entry);
- }
- }
- sink.getSkippedEditsCounter().addAndGet(entries.size());
- return;
+ if (!requiresReplication(tableDesc, entry)) {
+ skippedEdits++;
+ continue;
}
- break;
+ byte[] encodedRegionName = entry.getKey().getEncodedRegionName();
+ encodedRegionName2Entries
+ .computeIfAbsent(encodedRegionName, k -> Pair.newPair(tableDesc.get(), new ArrayList<>()))
+ .getSecond().add(entry);
}
-
- if (locations.size() == 1) {
- return;
- }
-
- ArrayList<Future<ReplicateWALEntryResponse>> tasks = new ArrayList<>(locations.size() - 1);
-
- // All passed entries should belong to one region because it is coming from the EntryBuffers
- // split per region. But the regions might split and merge (unlike log recovery case).
- for (int replicaId = 0; replicaId < locations.size(); replicaId++) {
- HRegionLocation location = locations.getRegionLocation(replicaId);
- if (!RegionReplicaUtil.isDefaultReplica(replicaId)) {
- RegionInfo regionInfo = location == null
- ? RegionReplicaUtil.getRegionInfoForReplica(
- locations.getDefaultRegionLocation().getRegionInfo(), replicaId)
- : location.getRegionInfo();
- RegionReplicaReplayCallable callable = new RegionReplicaReplayCallable(connection,
- rpcControllerFactory, tableName, location, regionInfo, row, entries,
- sink.getSkippedEditsCounter());
- Future<ReplicateWALEntryResponse> task = pool.submit(
- new RetryingRpcCallable<>(rpcRetryingCallerFactory, callable, operationTimeout));
- tasks.add(task);
- }
+ break;
+ }
+ // send the request to regions
+ retryCounter = retryCounterFactory.create();
+ while (isRunning()) {
+ List<Pair<CompletableFuture<Long>, byte[]>> futureAndEncodedRegionNameList =
+ new ArrayList<Pair<CompletableFuture<Long>, byte[]>>();
+ for (Map.Entry<byte[], Pair<TableDescriptor, List<Entry>>> entry : encodedRegionName2Entries
+ .entrySet()) {
+ CompletableFuture<Long> future =
+ replicate(entry.getValue().getFirst(), entry.getKey(), entry.getValue().getSecond());
+ futureAndEncodedRegionNameList.add(Pair.newPair(future, entry.getKey()));
}
-
- boolean tasksCancelled = false;
- for (int replicaId = 0; replicaId < tasks.size(); replicaId++) {
+ for (Pair<CompletableFuture<Long>, byte[]> pair : futureAndEncodedRegionNameList) {
+ byte[] encodedRegionName = pair.getSecond();
try {
- tasks.get(replicaId).get();
+ skippedEdits += pair.getFirst().get().longValue();
+ encodedRegionName2Entries.remove(encodedRegionName);
} catch (InterruptedException e) {
- throw new InterruptedIOException(e.getMessage());
+ // restore the interrupted state
+ Thread.currentThread().interrupt();
+ return false;
} catch (ExecutionException e) {
+ Pair<TableDescriptor, List<Entry>> tableAndEntries =
+ encodedRegionName2Entries.get(encodedRegionName);
+ TableName tableName = tableAndEntries.getFirst().getTableName();
+ List<Entry> entries = tableAndEntries.getSecond();
Throwable cause = e.getCause();
- boolean canBeSkipped = false;
- if (cause instanceof IOException) {
- // The table can be disabled or dropped at this time. For disabled tables, we have no
- // cheap mechanism to detect this case because meta does not contain this information.
- // ClusterConnection.isTableDisabled() is a zk call which we cannot do for every replay
- // RPC. So instead we start the replay RPC with retries and check whether the table is
- // dropped or disabled which might cause SocketTimeoutException, or
- // RetriesExhaustedException or similar if we get IOE.
- if (cause instanceof TableNotFoundException
- || connection.isTableDisabled(tableName)) {
- disabledAndDroppedTables.put(tableName, Boolean.TRUE); // put to cache for later.
- canBeSkipped = true;
- } else if (tableDescriptors != null) {
- TableDescriptor tableDescriptor = tableDescriptors.get(tableName);
- if (tableDescriptor != null
- //(replicaId + 1) as no task is added for primary replica for replication
- && tableDescriptor.getRegionReplication() <= (replicaId + 1)) {
- canBeSkipped = true;
- }
- }
- if (canBeSkipped) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Skipping " + entries.size() + " entries in table " + tableName
- + " because received exception for dropped or disabled table",
- cause);
- for (Entry entry : entries) {
- LOG.trace("Skipping : " + entry);
- }
- }
- if (!tasksCancelled) {
- sink.getSkippedEditsCounter().addAndGet(entries.size());
- tasksCancelled = true; // so that we do not add to skipped counter again
- }
- continue;
- }
-
- // otherwise rethrow
- throw (IOException)cause;
+ // The table can be disabled or dropped at this time. For disabled tables, we have no
+ // cheap mechanism to detect this case because meta does not contain this information.
+ // ClusterConnection.isTableDisabled() is a zk call which we cannot do for every replay
+ // RPC. So instead we start the replay RPC with retries and check whether the table is
+ // dropped or disabled which might cause SocketTimeoutException, or
+ // RetriesExhaustedException or similar if we get IOE.
+ if (cause instanceof TableNotFoundException) {
+ // add to cache that the table does not exist
+ tableDescriptorCache.put(tableName, Optional.empty());
+ logSkipped(tableName, entries, "dropped");
+ skippedEdits += entries.size();
+ encodedRegionName2Entries.remove(encodedRegionName);
+ continue;
+ }
+ boolean disabled = false;
+ try {
+ disabled = connection.getAdmin().isTableDisabled(tableName).get();
+ } catch (InterruptedException e1) {
+ // restore the interrupted state
+ Thread.currentThread().interrupt();
+ return false;
+ } catch (ExecutionException e1) {
+ LOG.warn("Failed to test whether {} is disabled, assume it is not disabled", tableName,
+ e1.getCause());
+ }
+ if (disabled) {
+ disabledTableCache.put(tableName, tableName);
+ logSkipped(tableName, entries, "disabled");
+ skippedEdits += entries.size();
+ encodedRegionName2Entries.remove(encodedRegionName);
+ continue;
}
- // unexpected exception
- throw new IOException(cause);
+ LOG.warn("Failed to replicate {} entries for region {} of table {}", entries.size(),
+ Bytes.toStringBinary(encodedRegionName), tableName);
+ }
+ }
+ // we are done
+ if (encodedRegionName2Entries.isEmpty()) {
+ ctx.getMetrics().incrLogEditsFiltered(skippedEdits);
+ return true;
+ } else {
+ LOG.warn("Failed to replicate all entris, retry={}", retryCounter.getAttemptTimes());
+ if (!retryCounter.shouldRetry()) {
+ return false;
+ }
+ try {
+ retryCounter.sleepUntilNextRetry();
+ } catch (InterruptedException e) {
+ // restore the interrupted state
+ Thread.currentThread().interrupt();
+ return false;
}
}
}
- }
- static class RetryingRpcCallable<V> implements Callable<V> {
- RpcRetryingCallerFactory factory;
- RetryingCallable<V> callable;
- int timeout;
- public RetryingRpcCallable(RpcRetryingCallerFactory factory, RetryingCallable<V> callable,
- int timeout) {
- this.factory = factory;
- this.callable = callable;
- this.timeout = timeout;
- }
- @Override
- public V call() throws Exception {
- return factory.<V>newCaller().callWithRetries(callable, timeout);
- }
+ return false;
}
- /**
- * Calls replay on the passed edits for the given set of entries belonging to the region. It skips
- * the entry if the region boundaries have changed or the region is gone.
- */
- static class RegionReplicaReplayCallable extends
- RegionAdminServiceCallable<ReplicateWALEntryResponse> {
- private final List<Entry> entries;
- private final byte[] initialEncodedRegionName;
- private final AtomicLong skippedEntries;
-
- public RegionReplicaReplayCallable(ClusterConnection connection,
- RpcControllerFactory rpcControllerFactory, TableName tableName,
- HRegionLocation location, RegionInfo regionInfo, byte[] row,List<Entry> entries,
- AtomicLong skippedEntries) {
- super(connection, rpcControllerFactory, location, tableName, row, regionInfo.getReplicaId());
- this.entries = entries;
- this.skippedEntries = skippedEntries;
- this.initialEncodedRegionName = regionInfo.getEncodedNameAsBytes();
- }
-
- @Override
- public ReplicateWALEntryResponse call(HBaseRpcController controller) throws Exception {
- // Check whether we should still replay this entry. If the regions are changed, or the
- // entry is not coming from the primary region, filter it out because we do not need it.
- // Regions can change because of (1) region split (2) region merge (3) table recreated
- boolean skip = false;
- if (!Bytes.equals(location.getRegionInfo().getEncodedNameAsBytes(),
- initialEncodedRegionName)) {
- skip = true;
- }
- if (!this.entries.isEmpty() && !skip) {
- Entry[] entriesArray = new Entry[this.entries.size()];
- entriesArray = this.entries.toArray(entriesArray);
-
- // set the region name for the target region replica
- Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
- ReplicationProtbufUtil.buildReplicateWALEntryRequest(entriesArray, location
- .getRegionInfo().getEncodedNameAsBytes(), null, null, null);
- controller.setCellScanner(p.getSecond());
- return stub.replay(controller, p.getFirst());
- }
+ @Override
+ public boolean canReplicateToSameCluster() {
+ return true;
+ }
- if (skip) {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Skipping " + entries.size() + " entries in table " + tableName
- + " because located region " + location.getRegionInfo().getEncodedName()
- + " is different than the original region "
- + Bytes.toStringBinary(initialEncodedRegionName) + " from WALEdit");
- for (Entry entry : entries) {
- LOG.trace("Skipping : " + entry);
- }
- }
- skippedEntries.addAndGet(entries.size());
- }
- return ReplicateWALEntryResponse.newBuilder().build();
- }
+ @Override
+ protected WALEntryFilter getScopeWALEntryFilter() {
+ // we do not care about scope. We replicate everything.
+ return null;
}
}
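
The rewritten replicate() above follows a fan-out-and-prune pattern: fan out one
asynchronous replay per region, prune the regions whose future completed, retry
whatever is left after a backoff, and bail out early (restoring the interrupt flag)
on interrupt. Below is a minimal, self-contained Java sketch of that pattern;
sendBatch() and the String/Long types are placeholders standing in for the patch's
per-region replicate(TableDescriptor, byte[], List<Entry>) call, not HBase API.

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;

    public class FanOutRetrySketch {

      // Placeholder for the per-region async replay; in the patch this returns
      // the number of skipped edits for the region's batch.
      static CompletableFuture<Long> sendBatch(String region, List<String> edits) {
        return CompletableFuture.completedFuture((long) edits.size());
      }

      static boolean replicateAll(Map<String, List<String>> pending, int maxAttempts) {
        for (int attempt = 1; !pending.isEmpty(); attempt++) {
          // Fan out: one future per region that still has unreplicated edits.
          Map<String, CompletableFuture<Long>> futures = new HashMap<>();
          pending.forEach((region, edits) -> futures.put(region, sendBatch(region, edits)));
          for (Map.Entry<String, CompletableFuture<Long>> e : futures.entrySet()) {
            try {
              e.getValue().get();
              pending.remove(e.getKey()); // success: prune this region from the work set
            } catch (InterruptedException ie) {
              Thread.currentThread().interrupt(); // restore the flag, as the patch does
              return false;
            } catch (ExecutionException ee) {
              // failure: leave the batch in `pending`; the next round retries it
            }
          }
          if (!pending.isEmpty()) {
            if (attempt >= maxAttempts) {
              return false; // analogous to retryCounter.shouldRetry() returning false
            }
            try {
              Thread.sleep(100L * attempt); // crude stand-in for sleepUntilNextRetry()
            } catch (InterruptedException ie) {
              Thread.currentThread().interrupt();
              return false;
            }
          }
        }
        return true; // every region's batch was acknowledged
      }
    }

A caller would invoke FanOutRetrySketch.replicateAll(pendingByRegion, 5); the key
design point mirrored from the patch is that only failed regions are resent, so a
single slow or dropped table cannot force the whole batch to be replayed.
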
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 8e001e6..b58fce3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -283,7 +283,7 @@ public class ReplicationSource implements ReplicationSourceInterface {
tableDescriptors = ((HRegionServer) server).getTableDescriptors();
}
replicationEndpoint
- .init(new ReplicationEndpoint.Context(conf, replicationPeer.getConfiguration(), fs,
+ .init(new ReplicationEndpoint.Context(server, conf, replicationPeer.getConfiguration(), fs,
replicationPeer.getId(), clusterId, replicationPeer, metrics, tableDescriptors, server));
replicationEndpoint.start();
replicationEndpoint.awaitRunning(waitOnEndpointSeconds, TimeUnit.SECONDS);
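
For readers tracking the signature change: the Context constructor gains the owning
Server as its first argument, which is presumably how endpoints such as
RegionReplicaReplicationEndpoint now reach the shared AsyncClusterConnection (the
test below mocks context.getServer() for exactly this purpose). An annotated copy
of the call follows; the role comments are inferred from the surrounding
ReplicationSource code, not verified javadoc.

    replicationEndpoint.init(new ReplicationEndpoint.Context(
        server,                               // new: the owning Server, the assumed
                                              // route to the AsyncClusterConnection
        conf,                                 // local cluster Configuration
        replicationPeer.getConfiguration(),   // peer cluster Configuration
        fs,                                   // FileSystem
        replicationPeer.getId(),              // peer id
        clusterId,                            // local cluster UUID
        replicationPeer,                      // ReplicationPeer
        metrics,                              // MetricsSource
        tableDescriptors,                     // TableDescriptors
        server));                             // same trailing argument as before
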
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java
index 55da3f4..652d1d6 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpoint.java
@@ -20,16 +20,17 @@ package org.apache.hadoop.hbase.replication.regionserver;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
import java.io.IOException;
import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.Cell.Type;
import org.apache.hadoop.hbase.CellBuilderFactory;
@@ -42,7 +43,6 @@ import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
-import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RegionLocator;
@@ -51,12 +51,12 @@ import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
+import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.testclassification.FlakeyTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.FSTableDescriptors;
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.wal.WALEdit;
@@ -383,9 +383,8 @@ public class TestRegionReplicaReplicationEndpoint {
testRegionReplicaReplicationIgnores(false, true);
}
- public void testRegionReplicaReplicationIgnores(boolean dropTable, boolean disableReplication)
+ private void testRegionReplicaReplicationIgnores(boolean dropTable, boolean disableReplication)
throws Exception {
-
// tests having edits from a disabled or dropped table is handled correctly by skipping those
// entries and further edits after the edits from dropped/disabled table can be replicated
// without problems.
@@ -405,8 +404,7 @@ public class TestRegionReplicaReplicationEndpoint {
HTU.getAdmin().createTable(htd);
// both tables are created, now pause replication
- ReplicationAdmin admin = new ReplicationAdmin(HTU.getConfiguration());
- admin.disablePeer(ServerRegionReplicaUtil.getReplicationPeerId());
+ HTU.getAdmin().disableReplicationPeer(ServerRegionReplicaUtil.getReplicationPeerId());
// now that the replication is disabled, write to the table to be dropped, then drop the table.
@@ -416,19 +414,9 @@ public class TestRegionReplicaReplicationEndpoint {
HTU.loadNumericRows(tableToBeDisabled, HBaseTestingUtility.fam1, 6000, 7000);
- AtomicLong skippedEdits = new AtomicLong();
- RegionReplicaReplicationEndpoint.RegionReplicaOutputSink sink =
- mock(RegionReplicaReplicationEndpoint.RegionReplicaOutputSink.class);
- when(sink.getSkippedEditsCounter()).thenReturn(skippedEdits);
- FSTableDescriptors fstd = new FSTableDescriptors(HTU.getConfiguration(),
- FileSystem.get(HTU.getConfiguration()), HTU.getDefaultRootDirPath());
- RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter sinkWriter =
- new RegionReplicaReplicationEndpoint.RegionReplicaSinkWriter(sink,
- (ClusterConnection) connection, Executors.newSingleThreadExecutor(), Integer.MAX_VALUE,
- fstd);
RegionLocator rl = connection.getRegionLocator(toBeDisabledTable);
HRegionLocation hrl = rl.getRegionLocation(HConstants.EMPTY_BYTE_ARRAY);
- byte[] encodedRegionName = hrl.getRegionInfo().getEncodedNameAsBytes();
+ byte[] encodedRegionName = hrl.getRegion().getEncodedNameAsBytes();
Cell cell = CellBuilderFactory.create(CellBuilderType.DEEP_COPY).setRow(Bytes.toBytes("A"))
.setFamily(HTU.fam1).setValue(Bytes.toBytes("VAL")).setType(Type.Put).build();
@@ -436,7 +424,6 @@ public class TestRegionReplicaReplicationEndpoint {
new WALKeyImpl(encodedRegionName, toBeDisabledTable, 1),
new WALEdit()
.add(cell));
-
HTU.getAdmin().disableTable(toBeDisabledTable); // disable the table
if (dropTable) {
HTU.getAdmin().deleteTable(toBeDisabledTable);
@@ -445,11 +432,23 @@ public class TestRegionReplicaReplicationEndpoint {
HTU.getAdmin().modifyTable(htd);
HTU.getAdmin().enableTable(toBeDisabledTable);
}
- sinkWriter.append(toBeDisabledTable, encodedRegionName,
- HConstants.EMPTY_BYTE_ARRAY, Lists.newArrayList(entry, entry));
-
- assertEquals(2, skippedEdits.get());
+ HRegionServer rs = HTU.getMiniHBaseCluster().getRegionServer(0);
+ MetricsSource metrics = mock(MetricsSource.class);
+ ReplicationEndpoint.Context ctx =
+ new ReplicationEndpoint.Context(rs, HTU.getConfiguration(), HTU.getConfiguration(),
+ HTU.getTestFileSystem(), ServerRegionReplicaUtil.getReplicationPeerId(),
+ UUID.fromString(rs.getClusterId()), rs.getReplicationSourceService().getReplicationPeers()
+ .getPeer(ServerRegionReplicaUtil.getReplicationPeerId()),
+ metrics, rs.getTableDescriptors(), rs);
+ RegionReplicaReplicationEndpoint rrpe = new RegionReplicaReplicationEndpoint();
+ rrpe.init(ctx);
+ rrpe.start();
+ ReplicationEndpoint.ReplicateContext repCtx = new ReplicationEndpoint.ReplicateContext();
+ repCtx.setEntries(Lists.newArrayList(entry, entry));
+ assertTrue(rrpe.replicate(repCtx));
+ verify(metrics, times(1)).incrLogEditsFiltered(eq(2L));
+ rrpe.stop();
if (disableReplication) {
// enable replication again so that we can verify replication
HTU.getAdmin().disableTable(toBeDisabledTable); // disable the table
@@ -460,17 +459,14 @@ public class TestRegionReplicaReplicationEndpoint {
try {
// load some data to the to-be-dropped table
-
// load the data to the table
HTU.loadNumericRows(table, HBaseTestingUtility.fam1, 0, 1000);
// now enable the replication
- admin.enablePeer(ServerRegionReplicaUtil.getReplicationPeerId());
+ HTU.getAdmin().enableReplicationPeer(ServerRegionReplicaUtil.getReplicationPeerId());
verifyReplication(tableName, regionReplication, 0, 1000);
-
} finally {
- admin.close();
table.close();
rl.close();
tableToBeDisabled.close();
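
The test above now asserts through the MetricsSource mock rather than the removed
skipped-edits counter. A self-contained sketch of that Mockito verify idiom is
below, under the assumption of a trimmed-down Metrics interface (the real
MetricsSource has many more methods); it needs only mockito-core on the classpath.

    import static org.mockito.ArgumentMatchers.eq;
    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.times;
    import static org.mockito.Mockito.verify;

    public class MetricsVerifySketch {

      // Stand-in for MetricsSource; only the method the test cares about.
      interface Metrics {
        void incrLogEditsFiltered(long count);
      }

      // Stand-in for the endpoint: filters two edits and reports them.
      static void replicateAndFilter(Metrics metrics) {
        metrics.incrLogEditsFiltered(2L);
      }

      public static void main(String[] args) {
        Metrics metrics = mock(Metrics.class);
        replicateAndFilter(metrics);
        // The assertion style the rewritten test uses: the endpoint must have
        // reported exactly the two skipped edits through the metrics object.
        verify(metrics, times(1)).incrLogEditsFiltered(eq(2L));
      }
    }

Verifying against the metrics mock keeps the test decoupled from the endpoint's
internals, which is what made it possible to delete the sink-writer plumbing above.
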
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
index e91a8bd..0ec7d54 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestRegionReplicaReplicationEndpointNoMaster.java
@@ -23,10 +23,12 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
+import java.util.Collections;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseClassTestRule;
@@ -36,24 +38,22 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.ClusterConnection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.AsyncClusterConnection;
+import org.apache.hadoop.hbase.client.ClusterConnectionFactory;
import org.apache.hadoop.hbase.client.RegionInfo;
import org.apache.hadoop.hbase.client.RegionLocator;
-import org.apache.hadoop.hbase.client.RpcRetryingCallerFactory;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.WALCoprocessor;
import org.apache.hadoop.hbase.coprocessor.WALCoprocessorEnvironment;
import org.apache.hadoop.hbase.coprocessor.WALObserver;
-import org.apache.hadoop.hbase.ipc.RpcControllerFactory;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.TestRegionServerNoMaster;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationEndpoint.ReplicateContext;
-import org.apache.hadoop.hbase.replication.regionserver.RegionReplicaReplicationEndpoint.RegionReplicaReplayCallable;
+import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.util.Bytes;
@@ -73,8 +73,6 @@ import org.junit.experimental.categories.Category;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.ReplicateWALEntryResponse;
-
/**
* Tests RegionReplicaReplicationEndpoint. Unlike TestRegionReplicaReplicationEndpoint this
* class contains lower level tests using callables.
@@ -178,39 +176,34 @@ public class TestRegionReplicaReplicationEndpointNoMaster {
public void testReplayCallable() throws Exception {
// tests replaying the edits to a secondary region replica using the Callable directly
openRegion(HTU, rs0, hriSecondary);
- ClusterConnection connection =
- (ClusterConnection) ConnectionFactory.createConnection(HTU.getConfiguration());
- //load some data to primary
+ // load some data to primary
HTU.loadNumericRows(table, f, 0, 1000);
Assert.assertEquals(1000, entries.size());
- // replay the edits to the secondary using replay callable
- replicateUsingCallable(connection, entries);
+ try (AsyncClusterConnection conn = ClusterConnectionFactory
+ .createAsyncClusterConnection(HTU.getConfiguration(), null, User.getCurrent())) {
+ // replay the edits to the secondary using replay callable
+ replicateUsingCallable(conn, entries);
+ }
Region region = rs0.getRegion(hriSecondary.getEncodedName());
HTU.verifyNumericRows(region, f, 0, 1000);
HTU.deleteNumericRows(table, f, 0, 1000);
closeRegion(HTU, rs0, hriSecondary);
- connection.close();
}
- private void replicateUsingCallable(ClusterConnection connection, Queue<Entry> entries)
- throws IOException, RuntimeException {
+ private void replicateUsingCallable(AsyncClusterConnection connection, Queue<Entry> entries)
+ throws IOException, ExecutionException, InterruptedException {
Entry entry;
while ((entry = entries.poll()) != null) {
byte[] row = CellUtil.cloneRow(entry.getEdit().getCells().get(0));
- RegionLocations locations = connection.locateRegion(tableName, row, true, true);
- RegionReplicaReplayCallable callable = new RegionReplicaReplayCallable(connection,
- RpcControllerFactory.instantiate(connection.getConfiguration()),
- table.getName(), locations.getRegionLocation(1),
- locations.getRegionLocation(1).getRegionInfo(), row, Lists.newArrayList(entry),
- new AtomicLong());
-
- RpcRetryingCallerFactory factory = RpcRetryingCallerFactory.instantiate(
- connection.getConfiguration());
- factory.<ReplicateWALEntryResponse> newCaller().callWithRetries(callable, 10000);
+ RegionLocations locations = connection.getRegionLocations(tableName, row, true).get();
+ connection
+ .replay(tableName, locations.getRegionLocation(1).getRegion().getEncodedNameAsBytes(), row,
+ Collections.singletonList(entry), 1, Integer.MAX_VALUE, TimeUnit.SECONDS.toNanos(10))
+ .get();
}
}
@@ -218,49 +211,49 @@ public class TestRegionReplicaReplicationEndpointNoMaster {
public void testReplayCallableWithRegionMove() throws Exception {
// tests replaying the edits to a secondary region replica using the Callable directly while
// the region is moved to another location.It tests handling of RME.
- openRegion(HTU, rs0, hriSecondary);
- ClusterConnection connection =
- (ClusterConnection) ConnectionFactory.createConnection(HTU.getConfiguration());
- //load some data to primary
- HTU.loadNumericRows(table, f, 0, 1000);
+ try (AsyncClusterConnection conn = ClusterConnectionFactory
+ .createAsyncClusterConnection(HTU.getConfiguration(), null, User.getCurrent())) {
+ openRegion(HTU, rs0, hriSecondary);
+ // load some data to primary
+ HTU.loadNumericRows(table, f, 0, 1000);
- Assert.assertEquals(1000, entries.size());
- // replay the edits to the secondary using replay callable
- replicateUsingCallable(connection, entries);
+ Assert.assertEquals(1000, entries.size());
- Region region = rs0.getRegion(hriSecondary.getEncodedName());
- HTU.verifyNumericRows(region, f, 0, 1000);
+ // replay the edits to the secondary using replay callable
+ replicateUsingCallable(conn, entries);
- HTU.loadNumericRows(table, f, 1000, 2000); // load some more data to primary
+ Region region = rs0.getRegion(hriSecondary.getEncodedName());
+ HTU.verifyNumericRows(region, f, 0, 1000);
- // move the secondary region from RS0 to RS1
- closeRegion(HTU, rs0, hriSecondary);
- openRegion(HTU, rs1, hriSecondary);
+ HTU.loadNumericRows(table, f, 1000, 2000); // load some more data to primary
- // replicate the new data
- replicateUsingCallable(connection, entries);
+ // move the secondary region from RS0 to RS1
+ closeRegion(HTU, rs0, hriSecondary);
+ openRegion(HTU, rs1, hriSecondary);
- region = rs1.getRegion(hriSecondary.getEncodedName());
- // verify the new data. old data may or may not be there
- HTU.verifyNumericRows(region, f, 1000, 2000);
+ // replicate the new data
+ replicateUsingCallable(conn, entries);
- HTU.deleteNumericRows(table, f, 0, 2000);
- closeRegion(HTU, rs1, hriSecondary);
- connection.close();
+ region = rs1.getRegion(hriSecondary.getEncodedName());
+ // verify the new data. old data may or may not be there
+ HTU.verifyNumericRows(region, f, 1000, 2000);
+
+ HTU.deleteNumericRows(table, f, 0, 2000);
+ closeRegion(HTU, rs1, hriSecondary);
+ }
}
@Test
public void testRegionReplicaReplicationEndpointReplicate() throws Exception {
// tests replaying the edits to a secondary region replica using the RRRE.replicate()
openRegion(HTU, rs0, hriSecondary);
- ClusterConnection connection =
- (ClusterConnection) ConnectionFactory.createConnection(HTU.getConfiguration());
RegionReplicaReplicationEndpoint replicator = new RegionReplicaReplicationEndpoint();
ReplicationEndpoint.Context context = mock(ReplicationEndpoint.Context.class);
when(context.getConfiguration()).thenReturn(HTU.getConfiguration());
when(context.getMetrics()).thenReturn(mock(MetricsSource.class));
-
+ when(context.getServer()).thenReturn(rs0);
+ when(context.getTableDescriptors()).thenReturn(rs0.getTableDescriptors());
replicator.init(context);
replicator.startAsync();
@@ -272,12 +265,11 @@ public class TestRegionReplicaReplicationEndpointNoMaster {
final String fakeWalGroupId = "fakeWALGroup";
replicator.replicate(new ReplicateContext().setEntries(Lists.newArrayList(entries))
.setWalGroupId(fakeWalGroupId));
-
+ replicator.stop();
Region region = rs0.getRegion(hriSecondary.getEncodedName());
HTU.verifyNumericRows(region, f, 0, 1000);
HTU.deleteNumericRows(table, f, 0, 1000);
closeRegion(HTU, rs0, hriSecondary);
- connection.close();
}
}
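
The replicateUsingCallable helper above blocks on each replay future with get().
A last sketch of that shape, with placeholder types (AsyncConn and the
String-based entries are illustrative stand-ins, not the AsyncClusterConnection
API; the argument values mirror the test's call):

    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.TimeUnit;

    public class AsyncReplaySketch {

      // Shape mirrors AsyncClusterConnection.replay(...) as used in the test
      // above; this interface is a placeholder for illustration only.
      interface AsyncConn extends AutoCloseable {
        CompletableFuture<Long> replay(String table, byte[] encodedRegionName, byte[] row,
            List<String> entries, int replicaId, int numRetries, long operationTimeoutNs);

        @Override
        void close();
      }

      static long replaySync(AsyncConn conn, String table, byte[] region, byte[] row,
          List<String> entries) throws InterruptedException, ExecutionException {
        // Tests block with get(); asynchronous callers would instead chain
        // thenApply / whenComplete on the returned CompletableFuture.
        return conn.replay(table, region, row, entries, 1, Integer.MAX_VALUE,
            TimeUnit.SECONDS.toNanos(10)).get();
      }
    }
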