You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2017/05/30 22:01:03 UTC
[1/2] hbase git commit: HBASE-18027
HBaseInterClusterReplicationEndpoint should respect RPC limits when batching
edits
Repository: hbase
Updated Branches:
refs/heads/branch-1 1a37f3be8 -> 140c559a3
refs/heads/master 6846b0394 -> d547feac6
HBASE-18027 HBaseInterClusterReplicationEndpoint should respect RPC limits when batching edits
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d547feac
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d547feac
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d547feac
Branch: refs/heads/master
Commit: d547feac6b673d59703e1a0ef46db38b26046e4c
Parents: 6846b03
Author: Andrew Purtell <ap...@apache.org>
Authored: Tue May 30 14:24:51 2017 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Tue May 30 14:24:51 2017 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hbase/ipc/RpcServer.java | 4 +-
.../hadoop/hbase/regionserver/wal/WALEdit.java | 8 +
.../HBaseInterClusterReplicationEndpoint.java | 130 ++++--
.../org/apache/hadoop/hbase/wal/WALKey.java | 28 +-
.../regionserver/TestReplicator.java | 427 +++++++++++++++++++
5 files changed, 551 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/d547feac/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index d553647..0a3db40 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -171,7 +171,7 @@ public abstract class RpcServer implements RpcServerInterface,
protected HBaseRPCErrorHandler errorHandler = null;
- protected static final String MAX_REQUEST_SIZE = "hbase.ipc.max.request.size";
+ public static final String MAX_REQUEST_SIZE = "hbase.ipc.max.request.size";
protected static final RequestTooBigException REQUEST_TOO_BIG_EXCEPTION =
new RequestTooBigException();
@@ -186,7 +186,7 @@ public abstract class RpcServer implements RpcServerInterface,
protected static final int DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT = 20;
/** Default value for above params */
- protected static final int DEFAULT_MAX_REQUEST_SIZE = DEFAULT_MAX_CALLQUEUE_SIZE / 4; // 256M
+ public static final int DEFAULT_MAX_REQUEST_SIZE = DEFAULT_MAX_CALLQUEUE_SIZE / 4; // 256M
protected static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds
protected static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;
http://git-wip-us.apache.org/repos/asf/hbase/blob/d547feac/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
index d5b95ee..0f32a1d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
@@ -171,6 +171,14 @@ public class WALEdit implements HeapSize {
return ret;
}
+  /**
+   * Estimates how many bytes the cells of this edit occupy once serialized, by summing
+   * {@link CellUtil#estimatedSerializedSizeOf(Cell)} over every cell in the edit.
+   *
+   * @return the summed per-cell size estimate, in bytes
+   */
+  public long estimatedSerializedSizeOf() {
+    long total = 0L;
+    for (Cell c : cells) {
+      total += CellUtil.estimatedSerializedSizeOf(c);
+    }
+    return total;
+  }
+
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
http://git-wip-us.apache.org/repos/asf/hbase/blob/d547feac/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index 97f28b4..30cec5b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -25,7 +25,9 @@ import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
@@ -47,6 +49,7 @@ import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
@@ -56,7 +59,6 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.ipc.RemoteException;
-import javax.security.sasl.SaslException;
/**
* A {@link org.apache.hadoop.hbase.replication.ReplicationEndpoint}
@@ -86,6 +88,8 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
private int socketTimeoutMultiplier;
// Amount of time for shutdown to wait for all tasks to complete
private long maxTerminationWait;
+ // Size limit for replication RPCs, in bytes
+ private int replicationRpcLimit;
//Metrics for this source
private MetricsSource metrics;
// Handles connecting to peer region servers
@@ -130,6 +134,11 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
new LinkedBlockingQueue<>());
this.exec.allowCoreThreadTimeOut(true);
this.abortable = ctx.getAbortable();
+ // Set the size limit for replication RPCs to 95% of the max request size.
+ // We could do with less slop if we have an accurate estimate of encoded size. Being
+ // conservative for now.
+ this.replicationRpcLimit = (int)(0.95 * (double)conf.getLong(RpcServer.MAX_REQUEST_SIZE,
+ RpcServer.DEFAULT_MAX_REQUEST_SIZE));
this.replicationBulkLoadDataEnabled =
conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
@@ -185,16 +194,46 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
return sleepMultiplier < maxRetriesMultiplier;
}
+  /**
+   * Splits the entries of one replicate() call into batches that can be shipped in parallel.
+   * <p>
+   * Entries are first partitioned by a hash of their encoded region name, so all edits of one
+   * region fall into the same partition and keep their relative order within it. Within a
+   * partition, the current batch is closed and a new one started whenever adding the next
+   * entry would push the batch's estimated serialized size past {@link #replicationRpcLimit}.
+   * A batch always receives at least one entry, so an entry that is larger than the limit on
+   * its own still gets shipped, alone, in its own batch.
+   * <p>
+   * NOTE(review): batches cut from the same partition are returned as separate elements and
+   * the caller submits every non-empty batch to the executor at once, so ordering between
+   * them is not guaranteed — confirm this is acceptable for the sink.
+   *
+   * @param entries WAL entries to replicate, in the order they were read
+   * @return a mutable list of batches; it may contain empty lists for partitions that
+   *         received no entries
+   */
+  private List<List<Entry>> createBatches(final List<Entry> entries) {
+    // Partition count: minimum of configured threads, number of 100-waledit batches and
+    // number of current sinks, but never less than one (it is used as a modulus below).
+    int numSinks = Math.max(replicationSinkMgr.getNumSinks(), 1);
+    int n = Math.max(1, Math.min(Math.min(this.maxThreads, entries.size() / 100 + 1), numSinks));
+    // The still-open batch of each partition, and its running estimated size in bytes.
+    List<List<Entry>> openBatches = new ArrayList<>(n);
+    long[] openSizes = new long[n];
+    for (int i = 0; i < n; i++) {
+      openBatches.add(new ArrayList<Entry>(entries.size() / n + 1));
+    }
+    List<List<Entry>> batches = new ArrayList<>();
+
+    for (Entry e : entries) {
+      int index = Math.abs(Bytes.hashCode(e.getKey().getEncodedRegionName()) % n);
+      // Keep sizes in long: truncating each estimate to int could overflow the running
+      // total for a very large entry and silently skip the limit check.
+      long entrySize = e.getKey().estimatedSerializedSizeOf()
+          + e.getEdit().estimatedSerializedSizeOf();
+      List<Entry> open = openBatches.get(index);
+      // Close the open batch if this entry would overflow it; an empty batch always
+      // accepts the entry so oversized entries still make progress.
+      if (!open.isEmpty() && openSizes[index] + entrySize > replicationRpcLimit) {
+        batches.add(open);
+        open = new ArrayList<Entry>();
+        openBatches.set(index, open);
+        openSizes[index] = 0;
+      }
+      open.add(e);
+      openSizes[index] += entrySize;
+    }
+
+    batches.addAll(openBatches);
+    return batches;
+  }
+
/**
* Do the shipping logic
*/
@Override
public boolean replicate(ReplicateContext replicateContext) {
CompletionService<Integer> pool = new ExecutorCompletionService<>(this.exec);
- List<Entry> entries = replicateContext.getEntries();
+ List<List<Entry>> batches;
String walGroupId = replicateContext.getWalGroupId();
int sleepMultiplier = 1;
- int numReplicated = 0;
if (!peersSelected && this.isRunning()) {
connectToPeers();
@@ -208,22 +247,8 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
return false;
}
- // minimum of: configured threads, number of 100-waledit batches,
- // and number of current sinks
- int n = Math.min(Math.min(this.maxThreads, entries.size()/100+1), numSinks);
+ batches = createBatches(replicateContext.getEntries());
- List<List<Entry>> entryLists = new ArrayList<>(n);
- if (n == 1) {
- entryLists.add(entries);
- } else {
- for (int i=0; i<n; i++) {
- entryLists.add(new ArrayList<>(entries.size()/n+1));
- }
- // now group by region
- for (Entry e : entries) {
- entryLists.get(Math.abs(Bytes.hashCode(e.getKey().getEncodedRegionName())%n)).add(e);
- }
- }
while (this.isRunning() && !exec.isShutdown()) {
if (!isPeerEnabled()) {
if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
@@ -232,35 +257,35 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
continue;
}
try {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Replicating " + entries.size() +
- " entries of total size " + replicateContext.getSize());
- }
-
int futures = 0;
- for (int i=0; i<entryLists.size(); i++) {
- if (!entryLists.get(i).isEmpty()) {
+ for (int i=0; i<batches.size(); i++) {
+ List<Entry> entries = batches.get(i);
+ if (!entries.isEmpty()) {
if (LOG.isTraceEnabled()) {
- LOG.trace("Submitting " + entryLists.get(i).size() +
+ LOG.trace("Submitting " + entries.size() +
" entries of total size " + replicateContext.getSize());
}
// RuntimeExceptions encountered here bubble up and are handled in ReplicationSource
- pool.submit(createReplicator(entryLists.get(i), i));
+ pool.submit(createReplicator(entries, i));
futures++;
}
}
IOException iox = null;
+ long lastWriteTime = 0;
for (int i=0; i<futures; i++) {
try {
// wait for all futures, remove successful parts
// (only the remaining parts will be retried)
Future<Integer> f = pool.take();
int index = f.get().intValue();
- int batchSize = entryLists.get(index).size();
- entryLists.set(index, Collections.<Entry>emptyList());
- // Now, we have marked the batch as done replicating, record its size
- numReplicated += batchSize;
+ List<Entry> batch = batches.get(index);
+ batches.set(index, Collections.<Entry>emptyList()); // remove successful batch
+ // Find the most recent write time in the batch
+ long writeTime = batch.get(batch.size() - 1).getKey().getWriteTime();
+ if (writeTime > lastWriteTime) {
+ lastWriteTime = writeTime;
+ }
} catch (InterruptedException ie) {
iox = new IOException(ie);
} catch (ExecutionException ee) {
@@ -272,15 +297,10 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
// if we had any exceptions, try again
throw iox;
}
- if (numReplicated != entries.size()) {
- // Something went wrong here and we don't know what, let's just fail and retry.
- LOG.warn("The number of edits replicated is different from the number received,"
- + " failing for now.");
- return false;
- }
// update metrics
- this.metrics.setAgeOfLastShippedOp(entries.get(entries.size() - 1).getKey().getWriteTime(),
- walGroupId);
+ if (lastWriteTime > 0) {
+ this.metrics.setAgeOfLastShippedOp(lastWriteTime, walGroupId);
+ }
return true;
} catch (IOException ioe) {
@@ -374,17 +394,42 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
this.ordinal = ordinal;
}
+    /**
+     * Ships one batch of WAL entries to a single peer region server via
+     * {@link ReplicationProtbufUtil#replicateWALEntry}.
+     * <p>
+     * All logging refers to {@code batch}, the list actually sent, rather than this
+     * Replicator's own {@code entries} field, so the log stays accurate when a caller or
+     * subclass ships a different list.
+     *
+     * @param rrs admin interface of the sink region server that receives the batch
+     * @param batch the WAL entries to ship in one replicateWALEntry call
+     * @param replicationClusterId cluster id passed through to the sink
+     * @param baseNamespaceDir passed through to ReplicationProtbufUtil.replicateWALEntry
+     * @param hfileArchiveDir passed through to ReplicationProtbufUtil.replicateWALEntry
+     * @throws IOException if shipping fails; {@code call()} then reports the sink as bad
+     *         and rethrows
+     */
+    protected void replicateEntries(BlockingInterface rrs, final List<Entry> batch,
+        String replicationClusterId, Path baseNamespaceDir, Path hfileArchiveDir)
+        throws IOException {
+      final int batchId = System.identityHashCode(batch);
+      if (LOG.isTraceEnabled()) {
+        long size = 0;
+        for (Entry e : batch) {
+          size += e.getKey().estimatedSerializedSizeOf();
+          size += e.getEdit().estimatedSerializedSizeOf();
+        }
+        LOG.trace("Replicating batch " + batchId + " of " +
+            batch.size() + " entries with total size " + size + " bytes to " +
+            replicationClusterId);
+      }
+      try {
+        ReplicationProtbufUtil.replicateWALEntry(rrs, batch.toArray(new Entry[batch.size()]),
+          replicationClusterId, baseNamespaceDir, hfileArchiveDir);
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Completed replicating batch " + batchId);
+        }
+      } catch (IOException e) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Failed replicating batch " + batchId, e);
+        }
+        throw e;
+      }
+    }
+
@Override
public Integer call() throws IOException {
SinkPeer sinkPeer = null;
try {
sinkPeer = replicationSinkMgr.getReplicationSink();
BlockingInterface rrs = sinkPeer.getRegionServer();
- ReplicationProtbufUtil.replicateWALEntry(rrs, entries.toArray(new Entry[entries.size()]),
- replicationClusterId, baseNamespaceDir, hfileArchiveDir);
+ replicateEntries(rrs, entries, replicationClusterId, baseNamespaceDir, hfileArchiveDir);
replicationSinkMgr.reportSinkSuccess(sinkPeer);
return ordinal;
-
} catch (IOException ioe) {
if (sinkPeer != null) {
replicationSinkMgr.reportBadSink(sinkPeer);
@@ -392,6 +437,5 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
throw ioe;
}
}
-
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d547feac/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
index bd03e4d..1d84c4b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
@@ -618,4 +618,30 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
this.origLogSeqNum = walKey.getOrigSequenceNumber();
}
}
-}
\ No newline at end of file
+
+  /**
+   * Returns a rough estimate of the serialized size of this key, in bytes.
+   * <p>
+   * The estimate adds the raw lengths of the region name, the table name and the replication
+   * scope family names. It adds 16 bytes per cluster id, an int per replication scope, and a
+   * long for each numeric field that is present. Encoding overhead such as field tags is not
+   * counted.
+   *
+   * @return estimated serialized size of this key
+   */
+  public long estimatedSerializedSizeOf() {
+    long estimate = 0;
+    if (encodedRegionName != null) {
+      estimate += encodedRegionName.length;
+    }
+    if (tablename != null) {
+      estimate += tablename.toBytes().length;
+    }
+    // 16 bytes per cluster id (presumably the two longs of a UUID)
+    estimate += (clusterIds == null) ? 0 : 16 * clusterIds.size();
+    // Nonces only count when they are actually set.
+    estimate += (nonceGroup == HConstants.NO_NONCE) ? 0 : Bytes.SIZEOF_LONG;
+    estimate += (nonce == HConstants.NO_NONCE) ? 0 : Bytes.SIZEOF_LONG;
+    if (replicationScope != null) {
+      // family name bytes plus one int scope value per entry
+      for (byte[] family : replicationScope.keySet()) {
+        estimate += family.length;
+        estimate += Bytes.SIZEOF_INT;
+      }
+    }
+    // sequence number and write time are always present
+    estimate += Bytes.SIZEOF_LONG + Bytes.SIZEOF_LONG;
+    estimate += (origLogSeqNum > 0) ? Bytes.SIZEOF_LONG : 0;
+    return estimate;
+  }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/d547feac/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java
new file mode 100644
index 0000000..7da56a3
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java
@@ -0,0 +1,427 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.*;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.RpcController;
+import org.apache.hadoop.hbase.shaded.com.google.protobuf.ServiceException;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.TestReplicationBase;
+import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+
+@Category(MediumTests.class)
+public class TestReplicator extends TestReplicationBase {
+
+  static final Log LOG = LogFactory.getLog(TestReplicator.class);
+  static final int NUM_ROWS = 10;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    // Set RPC size limit to 10kb (will be applied to both source and sink clusters)
+    conf1.setInt(RpcServer.MAX_REQUEST_SIZE, 1024 * 10);
+    TestReplicationBase.setUpBeforeClass();
+    admin.removePeer("2"); // Remove the peer set up for us by base class
+  }
+
+  @Test
+  public void testReplicatorBatching() throws Exception {
+    // Clear the tables
+    truncateTable(utility1, tableName);
+    truncateTable(utility2, tableName);
+
+    // Replace the peer set up for us by the base class with a wrapper for this test
+    admin.addPeer("testReplicatorBatching",
+      new ReplicationPeerConfig().setClusterKey(utility2.getClusterKey())
+        .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName()), null);
+
+    ReplicationEndpointForTest.setBatchCount(0);
+    ReplicationEndpointForTest.setEntriesCount(0);
+    try {
+      ReplicationEndpointForTest.pause();
+      try {
+        // Queue up a bunch of cells of size 8K. Because of RPC size limits, they will all
+        // have to be replicated separately.
+        final byte[] valueBytes = new byte[8 *1024];
+        for (int i = 0; i < NUM_ROWS; i++) {
+          htable1.put(new Put(("row"+Integer.toString(i)).getBytes())
+            .addColumn(famName, null, valueBytes)
+          );
+        }
+      } finally {
+        ReplicationEndpointForTest.resume();
+      }
+
+      // Wait for replication to complete.
+      Waiter.waitFor(conf1, 60000, new Waiter.ExplainingPredicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          return ReplicationEndpointForTest.getBatchCount() >= NUM_ROWS;
+        }
+
+        @Override
+        public String explainFailure() throws Exception {
+          return "We waited too long for expected replication of " + NUM_ROWS + " entries";
+        }
+      });
+
+      assertEquals("We sent an incorrect number of batches", NUM_ROWS,
+        ReplicationEndpointForTest.getBatchCount());
+      assertEquals("We did not replicate enough rows", NUM_ROWS,
+        utility2.countRows(htable2));
+    } finally {
+      admin.removePeer("testReplicatorBatching");
+    }
+  }
+
+  @Test
+  public void testReplicatorWithErrors() throws Exception {
+    // Clear the tables
+    truncateTable(utility1, tableName);
+    truncateTable(utility2, tableName);
+
+    // Replace the peer set up for us by the base class with a wrapper for this test
+    admin.addPeer("testReplicatorWithErrors",
+      new ReplicationPeerConfig().setClusterKey(utility2.getClusterKey())
+          .setReplicationEndpointImpl(FailureInjectingReplicationEndpointForTest.class.getName()),
+        null);
+
+    FailureInjectingReplicationEndpointForTest.setBatchCount(0);
+    FailureInjectingReplicationEndpointForTest.setEntriesCount(0);
+    try {
+      FailureInjectingReplicationEndpointForTest.pause();
+      try {
+        // Queue up a bunch of cells of size 8K. Because of RPC size limits, they will all
+        // have to be replicated separately.
+        final byte[] valueBytes = new byte[8 *1024];
+        for (int i = 0; i < NUM_ROWS; i++) {
+          htable1.put(new Put(("row"+Integer.toString(i)).getBytes())
+            .addColumn(famName, null, valueBytes)
+          );
+        }
+      } finally {
+        FailureInjectingReplicationEndpointForTest.resume();
+      }
+
+      // Wait for replication to complete. Failed batches are retried, so the number of
+      // batches sent is not fixed; wait on the number of entries replicated instead.
+      Waiter.waitFor(conf1, 60000, new Waiter.ExplainingPredicate<Exception>() {
+        @Override
+        public boolean evaluate() throws Exception {
+          return FailureInjectingReplicationEndpointForTest.getEntriesCount() >= NUM_ROWS;
+        }
+
+        @Override
+        public String explainFailure() throws Exception {
+          return "We waited too long for expected replication of " + NUM_ROWS + " entries";
+        }
+      });
+
+      assertEquals("We did not replicate enough rows", NUM_ROWS,
+        utility2.countRows(htable2));
+    } finally {
+      admin.removePeer("testReplicatorWithErrors");
+    }
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TestReplicationBase.tearDownAfterClass();
+  }
+
+  /** Disables and truncates {@code tablename} on the cluster managed by {@code util}. */
+  private void truncateTable(HBaseTestingUtility util, TableName tablename) throws IOException {
+    // Use a distinct local name so the inherited static 'admin' is not shadowed, and operate
+    // on the parameter rather than the inherited 'tableName' field.
+    HBaseAdmin utilAdmin = util.getHBaseAdmin();
+    utilAdmin.disableTable(tablename);
+    utilAdmin.truncateTable(tablename, false);
+  }
+
+  /**
+   * Replication endpoint that counts shipped batches and entries, and can be paused so edits
+   * accumulate before shipping starts.
+   */
+  public static class ReplicationEndpointForTest extends HBaseInterClusterReplicationEndpoint {
+
+    // Updated from the endpoint's replicator threads and read by the test thread; every
+    // access goes through the static synchronized accessors below.
+    private static int batchCount;
+    private static int entriesCount;
+    private static final Object latch = new Object();
+    private static AtomicBoolean useLatch = new AtomicBoolean(false);
+
+    public static void resume() {
+      synchronized (latch) {
+        useLatch.set(false);
+        latch.notifyAll();
+      }
+    }
+
+    public static void pause() {
+      useLatch.set(true);
+    }
+
+    /** Blocks the calling thread while the endpoint is paused. */
+    public static void await() throws InterruptedException {
+      // Object.wait() requires holding the monitor; loop to survive spurious wakeups.
+      synchronized (latch) {
+        if (useLatch.get()) {
+          LOG.info("Waiting on latch");
+          while (useLatch.get()) {
+            latch.wait();
+          }
+          LOG.info("Waited on latch, now proceeding");
+        }
+      }
+    }
+
+    public static synchronized int getBatchCount() {
+      return batchCount;
+    }
+
+    public static synchronized void setBatchCount(int i) {
+      batchCount = i;
+    }
+
+    public static synchronized int getEntriesCount() {
+      return entriesCount;
+    }
+
+    public static synchronized void setEntriesCount(int i) {
+      entriesCount = i;
+    }
+
+    /** Atomically records one successfully shipped batch of {@code numEntries} entries. */
+    private static synchronized void recordReplicatedBatch(int numEntries) {
+      entriesCount += numEntries;
+      batchCount++;
+    }
+
+    public class ReplicatorForTest extends Replicator {
+
+      public ReplicatorForTest(List<Entry> entries, int ordinal) {
+        super(entries, ordinal);
+      }
+
+      @Override
+      protected void replicateEntries(BlockingInterface rrs, final List<Entry> entries,
+          String replicationClusterId, Path baseNamespaceDir, Path hfileArchiveDir)
+          throws IOException {
+        try {
+          long size = 0;
+          for (Entry e: entries) {
+            size += e.getKey().estimatedSerializedSizeOf();
+            size += e.getEdit().estimatedSerializedSizeOf();
+          }
+          LOG.info("Replicating batch " + System.identityHashCode(entries) + " of " +
+              entries.size() + " entries with total size " + size + " bytes to " +
+              replicationClusterId);
+          super.replicateEntries(rrs, entries, replicationClusterId, baseNamespaceDir,
+            hfileArchiveDir);
+          recordReplicatedBatch(entries.size());
+          LOG.info("Completed replicating batch " + System.identityHashCode(entries));
+        } catch (IOException e) {
+          LOG.info("Failed to replicate batch " + System.identityHashCode(entries), e);
+          throw e;
+        }
+      }
+    }
+
+    @Override
+    public boolean replicate(ReplicateContext replicateContext) {
+      try {
+        await();
+      } catch (InterruptedException e) {
+        LOG.warn("Interrupted waiting for latch", e);
+        // Preserve the interrupt and report "not replicated" so the source can retry.
+        Thread.currentThread().interrupt();
+        return false;
+      }
+      return super.replicate(replicateContext);
+    }
+
+    @Override
+    protected Replicator createReplicator(List<Entry> entries, int ordinal) {
+      return new ReplicatorForTest(entries, ordinal);
+    }
+  }
+
+  /** Endpoint whose sink RPCs alternately succeed and fail with an injected error. */
+  public static class FailureInjectingReplicationEndpointForTest
+      extends ReplicationEndpointForTest {
+
+    static class FailureInjectingBlockingInterface implements BlockingInterface {
+
+      // Shared by all wrapper instances: a new wrapper is created for every batch, so a
+      // per-instance flag would start out false each time and never inject a failure.
+      private static final AtomicBoolean failNext = new AtomicBoolean(false);
+
+      private final BlockingInterface delegate;
+
+      public FailureInjectingBlockingInterface(BlockingInterface delegate) {
+        this.delegate = delegate;
+      }
+
+      @Override
+      public GetRegionInfoResponse getRegionInfo(RpcController controller,
+          GetRegionInfoRequest request) throws ServiceException {
+        return delegate.getRegionInfo(controller, request);
+      }
+
+      @Override
+      public GetStoreFileResponse getStoreFile(RpcController controller,
+          GetStoreFileRequest request) throws ServiceException {
+        return delegate.getStoreFile(controller, request);
+      }
+
+      @Override
+      public GetOnlineRegionResponse getOnlineRegion(RpcController controller,
+          GetOnlineRegionRequest request) throws ServiceException {
+        return delegate.getOnlineRegion(controller, request);
+      }
+
+      @Override
+      public OpenRegionResponse openRegion(RpcController controller, OpenRegionRequest request)
+          throws ServiceException {
+        return delegate.openRegion(controller, request);
+      }
+
+      @Override
+      public WarmupRegionResponse warmupRegion(RpcController controller,
+          WarmupRegionRequest request) throws ServiceException {
+        return delegate.warmupRegion(controller, request);
+      }
+
+      @Override
+      public CloseRegionResponse closeRegion(RpcController controller, CloseRegionRequest request)
+          throws ServiceException {
+        return delegate.closeRegion(controller, request);
+      }
+
+      @Override
+      public FlushRegionResponse flushRegion(RpcController controller, FlushRegionRequest request)
+          throws ServiceException {
+        return delegate.flushRegion(controller, request);
+      }
+
+      @Override
+      public SplitRegionResponse splitRegion(RpcController controller, SplitRegionRequest request)
+          throws ServiceException {
+        return delegate.splitRegion(controller, request);
+      }
+
+      @Override
+      public CompactRegionResponse compactRegion(RpcController controller,
+          CompactRegionRequest request) throws ServiceException {
+        return delegate.compactRegion(controller, request);
+      }
+
+      @Override
+      public ReplicateWALEntryResponse replicateWALEntry(RpcController controller,
+          ReplicateWALEntryRequest request) throws ServiceException {
+        // Alternate success and failure across all batches shipped by this endpoint.
+        if (failNext.compareAndSet(true, false)) {
+          throw new ServiceException("Injected failure");
+        }
+        failNext.set(true);
+        return delegate.replicateWALEntry(controller, request);
+      }
+
+      @Override
+      public ReplicateWALEntryResponse replay(RpcController controller,
+          ReplicateWALEntryRequest request) throws ServiceException {
+        return delegate.replay(controller, request);
+      }
+
+      @Override
+      public RollWALWriterResponse rollWALWriter(RpcController controller,
+          RollWALWriterRequest request) throws ServiceException {
+        return delegate.rollWALWriter(controller, request);
+      }
+
+      @Override
+      public GetServerInfoResponse getServerInfo(RpcController controller,
+          GetServerInfoRequest request) throws ServiceException {
+        return delegate.getServerInfo(controller, request);
+      }
+
+      @Override
+      public StopServerResponse stopServer(RpcController controller, StopServerRequest request)
+          throws ServiceException {
+        return delegate.stopServer(controller, request);
+      }
+
+      @Override
+      public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
+          UpdateFavoredNodesRequest request) throws ServiceException {
+        return delegate.updateFavoredNodes(controller, request);
+      }
+
+      @Override
+      public UpdateConfigurationResponse updateConfiguration(RpcController controller,
+          UpdateConfigurationRequest request) throws ServiceException {
+        return delegate.updateConfiguration(controller, request);
+      }
+
+      @Override
+      public CloseRegionForSplitOrMergeResponse closeRegionForSplitOrMerge(RpcController controller,
+          CloseRegionForSplitOrMergeRequest request) throws ServiceException {
+        return delegate.closeRegionForSplitOrMerge(controller, request);
+      }
+
+      @Override
+      public GetRegionLoadResponse getRegionLoad(RpcController controller,
+          GetRegionLoadRequest request) throws ServiceException {
+        return delegate.getRegionLoad(controller, request);
+      }
+
+      @Override
+      public ClearCompactionQueuesResponse clearCompactionQueues(RpcController controller,
+          ClearCompactionQueuesRequest request) throws ServiceException {
+        return delegate.clearCompactionQueues(controller, request);
+      }
+
+      @Override
+      public GetSpaceQuotaSnapshotsResponse getSpaceQuotaSnapshots(RpcController controller,
+          GetSpaceQuotaSnapshotsRequest request) throws ServiceException {
+        return delegate.getSpaceQuotaSnapshots(controller, request);
+      }
+    }
+
+    public class FailureInjectingReplicatorForTest extends ReplicatorForTest {
+
+      public FailureInjectingReplicatorForTest(List<Entry> entries, int ordinal) {
+        super(entries, ordinal);
+      }
+
+      @Override
+      protected void replicateEntries(BlockingInterface rrs, List<Entry> entries,
+          String replicationClusterId, Path baseNamespaceDir, Path hfileArchiveDir)
+          throws IOException {
+        super.replicateEntries(new FailureInjectingBlockingInterface(rrs), entries,
+          replicationClusterId, baseNamespaceDir, hfileArchiveDir);
+      }
+    }
+
+    @Override
+    protected Replicator createReplicator(List<Entry> entries, int ordinal) {
+      return new FailureInjectingReplicatorForTest(entries, ordinal);
+    }
+  }
+
+}
[2/2] hbase git commit: HBASE-18027
HBaseInterClusterReplicationEndpoint should respect RPC limits when batching
edits
Posted by ap...@apache.org.
HBASE-18027 HBaseInterClusterReplicationEndpoint should respect RPC limits when batching edits
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/140c559a
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/140c559a
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/140c559a
Branch: refs/heads/branch-1
Commit: 140c559a3a2528bc1485852bb9b5251901d58798
Parents: 1a37f3b
Author: Andrew Purtell <ap...@apache.org>
Authored: Tue May 30 14:25:36 2017 -0700
Committer: Andrew Purtell <ap...@apache.org>
Committed: Tue May 30 14:25:36 2017 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hbase/ipc/RpcServer.java | 4 +-
.../hadoop/hbase/regionserver/wal/WALEdit.java | 8 +
.../HBaseInterClusterReplicationEndpoint.java | 130 ++++--
.../org/apache/hadoop/hbase/wal/WALKey.java | 27 +-
.../regionserver/TestReplicator.java | 408 +++++++++++++++++++
5 files changed, 531 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/140c559a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
index 232b0e8..5617acb 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcServer.java
@@ -258,7 +258,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
protected HBaseRPCErrorHandler errorHandler = null;
- static final String MAX_REQUEST_SIZE = "hbase.ipc.max.request.size";
+ public static final String MAX_REQUEST_SIZE = "hbase.ipc.max.request.size";
private static final RequestTooBigException REQUEST_TOO_BIG_EXCEPTION =
new RequestTooBigException();
@@ -274,7 +274,7 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
private static final int DEFAULT_MIN_CLIENT_REQUEST_TIMEOUT = 20;
/** Default value for above params */
- private static final int DEFAULT_MAX_REQUEST_SIZE = DEFAULT_MAX_CALLQUEUE_SIZE / 4; // 256M
+ public static final int DEFAULT_MAX_REQUEST_SIZE = DEFAULT_MAX_CALLQUEUE_SIZE / 4; // 256M
private static final int DEFAULT_WARN_RESPONSE_TIME = 10000; // milliseconds
private static final int DEFAULT_WARN_RESPONSE_SIZE = 100 * 1024 * 1024;
http://git-wip-us.apache.org/repos/asf/hbase/blob/140c559a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
index 4fd1c41..832c3fc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALEdit.java
@@ -292,6 +292,14 @@ public class WALEdit implements Writable, HeapSize {
return ret;
}
+ /**
+ * Returns a rough estimate, in bytes, of the serialized size of the cells in this edit:
+ * the sum of {@link CellUtil#estimatedSerializedSizeOf(Cell)} over every cell held here.
+ * @return the summed per-cell serialized size estimate
+ */
+ public long estimatedSerializedSizeOf() {
+ long totalBytes = 0L;
+ for (Cell current : cells) {
+ totalBytes += CellUtil.estimatedSerializedSizeOf(current);
+ }
+ return totalBytes;
+ }
+
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
http://git-wip-us.apache.org/repos/asf/hbase/blob/140c559a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index 0acb33a..85bd11a 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -23,7 +23,9 @@ import java.net.ConnectException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
@@ -45,6 +47,7 @@ import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.ipc.RpcServer;
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
@@ -54,7 +57,6 @@ import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.ipc.RemoteException;
-import javax.security.sasl.SaslException;
import com.google.common.annotations.VisibleForTesting;
@@ -86,6 +88,8 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
private int socketTimeoutMultiplier;
// Amount of time for shutdown to wait for all tasks to complete
private long maxTerminationWait;
+ // Size limit for replication RPCs, in bytes
+ private int replicationRpcLimit;
//Metrics for this source
private MetricsSource metrics;
// Handles connecting to peer region servers
@@ -130,6 +134,11 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
new LinkedBlockingQueue<Runnable>());
this.exec.allowCoreThreadTimeOut(true);
this.abortable = ctx.getAbortable();
+ // Set the size limit for replication RPCs to 95% of the max request size.
+ // We could do with less slop if we have an accurate estimate of encoded size. Being
+ // conservative for now.
+ this.replicationRpcLimit = (int)(0.95 * (double)conf.getLong(RpcServer.MAX_REQUEST_SIZE,
+ RpcServer.DEFAULT_MAX_REQUEST_SIZE));
this.replicationBulkLoadDataEnabled =
conf.getBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY,
@@ -185,16 +194,46 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
return sleepMultiplier < maxRetriesMultiplier;
}
+ /**
+ * Splits the entries to ship into batches that can be replicated in parallel.
+ * <p>
+ * Entries are spread over {@code n} partitions by hashing their encoded region name. Here
+ * {@code n} is the smallest of: the configured thread count, the number of 100-entry groups
+ * ({@code entries.size()/100+1}), and the number of known sinks (treated as at least one).
+ * Within a partition, entries keep their input order.
+ * <p>
+ * A partition's current batch is closed, and a fresh one started, whenever adding the next
+ * entry would push the batch's estimated serialized size past {@code replicationRpcLimit}.
+ * A batch always receives at least one entry, so an entry that is over the limit by itself
+ * still ships, alone.
+ * <p>
+ * NOTE(review): replicate() submits every returned batch to the executor at once. So when a
+ * partition is split, batches carrying edits of the same region may be shipped concurrently
+ * and are not ordered relative to each other. Confirm this is acceptable.
+ * @param entries the WAL entries to ship
+ * @return the closed batches, followed by the last batch of every partition (possibly empty)
+ */
+ private List<List<Entry>> createBatches(final List<Entry> entries) {
+ int numSinks = Math.max(replicationSinkMgr.getNumSinks(), 1);
+ // minimum of: configured threads, number of 100-waledit batches, and number of current sinks
+ int n = Math.min(Math.min(this.maxThreads, entries.size()/100+1), numSinks);
+ // Maintains the current batch for a given partition index
+ Map<Integer, List<Entry>> entryMap = new HashMap<>(n);
+ List<List<Entry>> entryLists = new ArrayList<>();
+ // Running estimated size of each partition's current batch. Kept as long: the per-entry
+ // estimates are longs, and narrowing them (or the running sum) to int could wrap to a
+ // negative value that slips past the limit check below.
+ long[] sizes = new long[n];
+
+ for (int i = 0; i < n; i++) {
+ entryMap.put(i, new ArrayList<Entry>(entries.size()/n+1));
+ }
+
+ for (Entry e: entries) {
+ int index = Math.abs(Bytes.hashCode(e.getKey().getEncodedRegionName())%n);
+ long entrySize = e.getKey().estimatedSerializedSizeOf() +
+ e.getEdit().estimatedSerializedSizeOf();
+ List<Entry> current = entryMap.get(index);
+ // If this batch is non-empty and would become oversized, add it to the final list and
+ // start a new empty batch; an empty batch always takes the entry, whatever its size
+ if (!current.isEmpty() && sizes[index] + entrySize > replicationRpcLimit) {
+ entryLists.add(current);
+ current = new ArrayList<Entry>();
+ entryMap.put(index, current);
+ sizes[index] = 0;
+ }
+ current.add(e);
+ sizes[index] += entrySize;
+ }
+
+ entryLists.addAll(entryMap.values());
+ return entryLists;
+ }
+
/**
* Do the shipping logic
*/
@Override
public boolean replicate(ReplicateContext replicateContext) {
CompletionService<Integer> pool = new ExecutorCompletionService<Integer>(this.exec);
- List<Entry> entries = replicateContext.getEntries();
+ List<List<Entry>> batches;
String walGroupId = replicateContext.getWalGroupId();
int sleepMultiplier = 1;
- int numReplicated = 0;
if (!peersSelected && this.isRunning()) {
connectToPeers();
@@ -208,22 +247,8 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
return false;
}
- // minimum of: configured threads, number of 100-waledit batches,
- // and number of current sinks
- int n = Math.min(Math.min(this.maxThreads, entries.size()/100+1), numSinks);
+ batches = createBatches(replicateContext.getEntries());
- List<List<Entry>> entryLists = new ArrayList<List<Entry>>(n);
- if (n == 1) {
- entryLists.add(entries);
- } else {
- for (int i=0; i<n; i++) {
- entryLists.add(new ArrayList<Entry>(entries.size()/n+1));
- }
- // now group by region
- for (Entry e : entries) {
- entryLists.get(Math.abs(Bytes.hashCode(e.getKey().getEncodedRegionName())%n)).add(e);
- }
- }
while (this.isRunning() && !exec.isShutdown()) {
if (!isPeerEnabled()) {
if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
@@ -232,35 +257,35 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
continue;
}
try {
- if (LOG.isTraceEnabled()) {
- LOG.trace("Replicating " + entries.size() +
- " entries of total size " + replicateContext.getSize());
- }
-
int futures = 0;
- for (int i=0; i<entryLists.size(); i++) {
- if (!entryLists.get(i).isEmpty()) {
+ for (int i=0; i<batches.size(); i++) {
+ List<Entry> entries = batches.get(i);
+ if (!entries.isEmpty()) {
if (LOG.isTraceEnabled()) {
- LOG.trace("Submitting " + entryLists.get(i).size() +
+ LOG.trace("Submitting " + entries.size() +
" entries of total size " + replicateContext.getSize());
}
// RuntimeExceptions encountered here bubble up and are handled in ReplicationSource
- pool.submit(createReplicator(entryLists.get(i), i));
+ pool.submit(createReplicator(entries, i));
futures++;
}
}
IOException iox = null;
+ long lastWriteTime = 0;
for (int i=0; i<futures; i++) {
try {
// wait for all futures, remove successful parts
// (only the remaining parts will be retried)
Future<Integer> f = pool.take();
int index = f.get().intValue();
- int batchSize = entryLists.get(index).size();
- entryLists.set(index, Collections.<Entry>emptyList());
- // Now, we have marked the batch as done replicating, record its size
- numReplicated += batchSize;
+ List<Entry> batch = batches.get(index);
+ batches.set(index, Collections.<Entry>emptyList()); // remove successful batch
+ // Find the most recent write time in the batch
+ long writeTime = batch.get(batch.size() - 1).getKey().getWriteTime();
+ if (writeTime > lastWriteTime) {
+ lastWriteTime = writeTime;
+ }
} catch (InterruptedException ie) {
iox = new IOException(ie);
} catch (ExecutionException ee) {
@@ -272,15 +297,10 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
// if we had any exceptions, try again
throw iox;
}
- if (numReplicated != entries.size()) {
- // Something went wrong here and we don't know what, let's just fail and retry.
- LOG.warn("The number of edits replicated is different from the number received,"
- + " failing for now.");
- return false;
- }
// update metrics
- this.metrics.setAgeOfLastShippedOp(entries.get(entries.size() - 1).getKey().getWriteTime(),
- walGroupId);
+ if (lastWriteTime > 0) {
+ this.metrics.setAgeOfLastShippedOp(lastWriteTime, walGroupId);
+ }
return true;
} catch (IOException ioe) {
@@ -374,17 +394,42 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
this.ordinal = ordinal;
}
+ /**
+ * Ships one batch of WAL entries to a peer region server with a single
+ * ReplicationProtbufUtil.replicateWALEntry call. At TRACE level, the batch's estimated size
+ * and the call's outcome are logged, with the batch identified by
+ * {@link System#identityHashCode(Object)} of the list.
+ * @param rrs the peer region server to send to
+ * @param batch the entries to ship. This list, not the Replicator's {@code entries} field,
+ * is what gets logged and sent, so overriding code may pass a different list.
+ * @param replicationClusterId passed through unchanged to replicateWALEntry
+ * @param baseNamespaceDir passed through unchanged to replicateWALEntry
+ * @param hfileArchiveDir passed through unchanged to replicateWALEntry
+ * @throws IOException if the call fails; the failure is trace-logged and rethrown
+ */
+ protected void replicateEntries(BlockingInterface rrs, final List<Entry> batch,
+ String replicationClusterId, Path baseNamespaceDir, Path hfileArchiveDir)
+ throws IOException {
+ if (LOG.isTraceEnabled()) {
+ long size = 0;
+ for (Entry e: batch) {
+ size += e.getKey().estimatedSerializedSizeOf();
+ size += e.getEdit().estimatedSerializedSizeOf();
+ }
+ LOG.trace("Replicating batch " + System.identityHashCode(batch) + " of " +
+ batch.size() + " entries with total size " + size + " bytes to " +
+ replicationClusterId);
+ }
+ try {
+ ReplicationProtbufUtil.replicateWALEntry(rrs, batch.toArray(new Entry[batch.size()]),
+ replicationClusterId, baseNamespaceDir, hfileArchiveDir);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Completed replicating batch " + System.identityHashCode(batch));
+ }
+ } catch (IOException e) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Failed replicating batch " + System.identityHashCode(batch), e);
+ }
+ throw e;
+ }
+ }
+
@Override
public Integer call() throws IOException {
SinkPeer sinkPeer = null;
try {
sinkPeer = replicationSinkMgr.getReplicationSink();
BlockingInterface rrs = sinkPeer.getRegionServer();
- ReplicationProtbufUtil.replicateWALEntry(rrs, entries.toArray(new Entry[entries.size()]),
- replicationClusterId, baseNamespaceDir, hfileArchiveDir);
+ replicateEntries(rrs, entries, replicationClusterId, baseNamespaceDir, hfileArchiveDir);
replicationSinkMgr.reportSinkSuccess(sinkPeer);
return ordinal;
-
} catch (IOException ioe) {
if (sinkPeer != null) {
replicationSinkMgr.reportBadSink(sinkPeer);
@@ -392,6 +437,5 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
throw ioe;
}
}
-
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/140c559a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
index 01420d7..5cc7567 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALKey.java
@@ -614,4 +614,29 @@ public class WALKey implements SequenceId, Comparable<WALKey> {
}
}
-}
\ No newline at end of file
+ /**
+ * Estimates how many bytes this key contributes to a serialized WAL entry. The estimate is
+ * the sum of:
+ * <ul>
+ * <li>the encoded region name bytes and the table name bytes, when present;</li>
+ * <li>16 bytes per cluster id;</li>
+ * <li>8 bytes each for the nonce group and the nonce, when they differ from
+ * {@link HConstants#NO_NONCE};</li>
+ * <li>the key length plus 4 bytes for every replication scope entry;</li>
+ * <li>8 bytes each for the sequence number and the write time;</li>
+ * <li>8 more bytes for the original sequence number, when it is positive.</li>
+ * </ul>
+ * @return the estimated serialized size in bytes
+ */
+ public long estimatedSerializedSizeOf() {
+ // Sequence number and write time are always counted
+ long size = Bytes.SIZEOF_LONG + Bytes.SIZEOF_LONG;
+ if (encodedRegionName != null) {
+ size += encodedRegionName.length;
+ }
+ if (tablename != null) {
+ size += tablename.toBytes().length;
+ }
+ if (clusterIds != null) {
+ // 16 bytes per cluster id (presumably a UUID's two longs; confirm)
+ size += 16 * clusterIds.size();
+ }
+ if (nonceGroup != HConstants.NO_NONCE) {
+ size += Bytes.SIZEOF_LONG;
+ }
+ if (nonce != HConstants.NO_NONCE) {
+ size += Bytes.SIZEOF_LONG;
+ }
+ if (scopes != null) {
+ for (byte[] scopeKey : scopes.keySet()) {
+ size += scopeKey.length + Bytes.SIZEOF_INT;
+ }
+ }
+ if (origLogSeqNum > 0) {
+ size += Bytes.SIZEOF_LONG;
+ }
+ return size;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/140c559a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java
new file mode 100644
index 0000000..6d15a1b
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java
@@ -0,0 +1,408 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.*;
+import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.TestReplicationBase;
+import org.apache.hadoop.hbase.replication.regionserver.HBaseInterClusterReplicationEndpoint;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.wal.WAL.Entry;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.protobuf.RpcController;
+import com.google.protobuf.ServiceException;
+
+@Category(MediumTests.class)
+public class TestReplicator extends TestReplicationBase {
+
+ static final Log LOG = LogFactory.getLog(TestReplicator.class);
+ static final int NUM_ROWS = 10;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ // Set RPC size limit to 10kb (will be applied to both source and sink clusters)
+ conf1.setInt(RpcServer.MAX_REQUEST_SIZE, 1024 * 10);
+ TestReplicationBase.setUpBeforeClass();
+ admin.removePeer("2"); // Remove the peer set up for us by base class
+ }
+
+ /**
+ * Checks that batching honours the RPC size limit. NUM_ROWS rows, each carrying an 8KB
+ * value, are written while shipping is paused. With the 10KB limit set at class setup,
+ * every row has to travel in a batch of its own, and all rows must reach the sink.
+ */
+ @Test
+ public void testReplicatorBatching() throws Exception {
+ final String peerId = "testReplicatorBatching";
+
+ // Start from empty tables on both clusters
+ truncateTable(utility1, tableName);
+ truncateTable(utility2, tableName);
+
+ // Ship through the counting test endpoint instead of the base class peer
+ ReplicationPeerConfig peerConfig = new ReplicationPeerConfig()
+ .setClusterKey(utility2.getClusterKey())
+ .setReplicationEndpointImpl(ReplicationEndpointForTest.class.getName());
+ admin.addPeer(peerId, peerConfig, null);
+
+ ReplicationEndpointForTest.setBatchCount(0);
+ ReplicationEndpointForTest.setEntriesCount(0);
+ try {
+ // Hold shipping back until every row has been written
+ ReplicationEndpointForTest.pause();
+ try {
+ final byte[] payload = new byte[8 * 1024];
+ for (int row = 0; row < NUM_ROWS; row++) {
+ Put put = new Put(("row" + row).getBytes());
+ put.addColumn(famName, null, payload);
+ htable1.put(put);
+ }
+ } finally {
+ ReplicationEndpointForTest.resume();
+ }
+
+ // One batch per row is expected
+ Waiter.ExplainingPredicate<Exception> allBatchesShipped =
+ new Waiter.ExplainingPredicate<Exception>() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return ReplicationEndpointForTest.getBatchCount() >= NUM_ROWS;
+ }
+
+ @Override
+ public String explainFailure() throws Exception {
+ return "We waited too long for expected replication of " + NUM_ROWS + " entries";
+ }
+ };
+ Waiter.waitFor(conf1, 60000, allBatchesShipped);
+
+ assertEquals("We sent an incorrect number of batches", NUM_ROWS,
+ ReplicationEndpointForTest.getBatchCount());
+ assertEquals("We did not replicate enough rows", NUM_ROWS,
+ utility2.countRows(htable2));
+ } finally {
+ admin.removePeer(peerId);
+ }
+ }
+
+ /**
+ * Writes NUM_ROWS rows of 8KB each while shipping is paused. The rows are then shipped
+ * through an endpoint whose sink stub injects replicateWALEntry failures. The test checks
+ * that every entry is eventually shipped and that all rows reach the sink.
+ */
+ @Test
+ public void testReplicatorWithErrors() throws Exception {
+ final String peerId = "testReplicatorWithErrors";
+
+ // Start from empty tables on both clusters
+ truncateTable(utility1, tableName);
+ truncateTable(utility2, tableName);
+
+ // Ship through the failure-injecting test endpoint instead of the base class peer
+ ReplicationPeerConfig peerConfig = new ReplicationPeerConfig()
+ .setClusterKey(utility2.getClusterKey())
+ .setReplicationEndpointImpl(FailureInjectingReplicationEndpointForTest.class.getName());
+ admin.addPeer(peerId, peerConfig, null);
+
+ FailureInjectingReplicationEndpointForTest.setBatchCount(0);
+ FailureInjectingReplicationEndpointForTest.setEntriesCount(0);
+ try {
+ // Hold shipping back until every row has been written
+ FailureInjectingReplicationEndpointForTest.pause();
+ try {
+ final byte[] payload = new byte[8 * 1024];
+ for (int row = 0; row < NUM_ROWS; row++) {
+ Put put = new Put(("row" + row).getBytes());
+ put.addColumn(famName, null, payload);
+ htable1.put(put);
+ }
+ } finally {
+ FailureInjectingReplicationEndpointForTest.resume();
+ }
+
+ // Wait until every written entry has been counted as shipped, retries included
+ Waiter.ExplainingPredicate<Exception> allEntriesShipped =
+ new Waiter.ExplainingPredicate<Exception>() {
+ @Override
+ public boolean evaluate() throws Exception {
+ return FailureInjectingReplicationEndpointForTest.getEntriesCount() >= NUM_ROWS;
+ }
+
+ @Override
+ public String explainFailure() throws Exception {
+ return "We waited too long for expected replication of " + NUM_ROWS + " entries";
+ }
+ };
+ Waiter.waitFor(conf1, 60000, allEntriesShipped);
+
+ assertEquals("We did not replicate enough rows", NUM_ROWS,
+ utility2.countRows(htable2));
+ } finally {
+ admin.removePeer(peerId);
+ }
+ }
+
+ /**
+ * Delegates class-level cleanup to {@link TestReplicationBase#tearDownAfterClass()}.
+ */
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TestReplicationBase.tearDownAfterClass();
+ }
+
+ /**
+ * Disables the given table and then truncates it (with preserveSplits=false) on the cluster
+ * of the given testing utility.
+ * @param util the testing utility of the cluster that owns the table
+ * @param tablename the table to truncate
+ * @throws IOException if an admin operation fails
+ */
+ private void truncateTable(HBaseTestingUtility util, TableName tablename) throws IOException {
+ // Named hbaseAdmin so it does not shadow the static replication admin of the test class
+ HBaseAdmin hbaseAdmin = util.getHBaseAdmin();
+ // Disable the table named by the parameter, the same one that is truncated next
+ hbaseAdmin.disableTable(tablename);
+ hbaseAdmin.truncateTable(tablename, false);
+ }
+
+ /**
+ * Test endpoint that counts the batches and entries it ships successfully. It can also hold
+ * back shipping until the test has queued up all of its edits (see {@link #pause()} and
+ * {@link #resume()}). The counters and the pause flag are static, so every instance shares
+ * them, including instances of subclasses.
+ */
+ public static class ReplicationEndpointForTest extends HBaseInterClusterReplicationEndpoint {
+
+ // Updated from the executor threads that run the Replicators and read from the test
+ // thread, so they must be atomic: plain static ints can lose increments
+ private static final AtomicInteger batchCount = new AtomicInteger();
+ private static final AtomicInteger entriesCount = new AtomicInteger();
+ // Monitor that paused callers wait on; useLatch is the pause flag
+ private static final Object latch = new Object();
+ private static final AtomicBoolean useLatch = new AtomicBoolean(false);
+
+ /** Lifts a pause and wakes every caller blocked in {@link #await()}. */
+ public static void resume() {
+ synchronized (latch) {
+ useLatch.set(false);
+ latch.notifyAll();
+ }
+ }
+
+ /** Makes later {@link #await()} calls, and thus replicate(), block until resume(). */
+ public static void pause() {
+ useLatch.set(true);
+ }
+
+ /**
+ * Blocks while shipping is paused.
+ * <p>
+ * The pause flag is checked, and Object.wait() is called, while holding the latch monitor.
+ * wait() throws IllegalMonitorStateException without the monitor, and checking under the
+ * monitor means a concurrent resume() cannot be missed. The flag is re-checked after every
+ * wake-up, to tolerate spurious wake-ups.
+ * @throws InterruptedException if the thread is interrupted while waiting
+ */
+ public static void await() throws InterruptedException {
+ synchronized (latch) {
+ if (useLatch.get()) {
+ LOG.info("Waiting on latch");
+ while (useLatch.get()) {
+ latch.wait();
+ }
+ LOG.info("Waited on latch, now proceeding");
+ }
+ }
+ }
+
+ /** @return the number of batches shipped successfully since the last reset */
+ public static int getBatchCount() {
+ return batchCount.get();
+ }
+
+ /** Resets the shipped-batch counter to {@code i}. */
+ public static void setBatchCount(int i) {
+ batchCount.set(i);
+ }
+
+ /** @return the number of entries shipped successfully since the last reset */
+ public static int getEntriesCount() {
+ return entriesCount.get();
+ }
+
+ /** Resets the shipped-entry counter to {@code i}. */
+ public static void setEntriesCount(int i) {
+ entriesCount.set(i);
+ }
+
+ /**
+ * Replicator that logs each batch at INFO level. When the batch ships without an
+ * IOException, it adds the batch to the shared batch and entry counters.
+ */
+ public class ReplicatorForTest extends Replicator {
+
+ public ReplicatorForTest(List<Entry> entries, int ordinal) {
+ super(entries, ordinal);
+ }
+
+ @Override
+ protected void replicateEntries(BlockingInterface rrs, final List<Entry> entries,
+ String replicationClusterId, Path baseNamespaceDir, Path hfileArchiveDir)
+ throws IOException {
+ try {
+ long size = 0;
+ for (Entry e: entries) {
+ size += e.getKey().estimatedSerializedSizeOf();
+ size += e.getEdit().estimatedSerializedSizeOf();
+ }
+ LOG.info("Replicating batch " + System.identityHashCode(entries) + " of " +
+ entries.size() + " entries with total size " + size + " bytes to " +
+ replicationClusterId);
+ super.replicateEntries(rrs, entries, replicationClusterId, baseNamespaceDir,
+ hfileArchiveDir);
+ entriesCount.addAndGet(entries.size());
+ batchCount.incrementAndGet();
+ LOG.info("Completed replicating batch " + System.identityHashCode(entries));
+ } catch (IOException e) {
+ LOG.info("Failed to replicate batch " + System.identityHashCode(entries), e);
+ throw e;
+ }
+ }
+ }
+
+ /**
+ * Waits out any pause, then delegates to the real shipping logic.
+ * @return the superclass result, or false if interrupted while paused
+ */
+ @Override
+ public boolean replicate(ReplicateContext replicateContext) {
+ try {
+ await();
+ } catch (InterruptedException e) {
+ LOG.warn("Interrupted waiting for latch", e);
+ // Keep the interrupt status for the caller and report that nothing was shipped
+ Thread.currentThread().interrupt();
+ return false;
+ }
+ return super.replicate(replicateContext);
+ }
+
+ @Override
+ protected Replicator createReplicator(List<Entry> entries, int ordinal) {
+ return new ReplicatorForTest(entries, ordinal);
+ }
+ }
+
+ public static class FailureInjectingReplicationEndpointForTest
+ extends ReplicationEndpointForTest {
+
+ static class FailureInjectingBlockingInterface implements BlockingInterface {
+
+ private final BlockingInterface delegate;
+ private volatile boolean failNext;
+
+ public FailureInjectingBlockingInterface(BlockingInterface delegate) {
+ this.delegate = delegate;
+ }
+
+ @Override
+ public ReplicateWALEntryResponse replicateWALEntry(RpcController controller,
+ ReplicateWALEntryRequest request) throws ServiceException {
+ if (!failNext) {
+ failNext = true;
+ return delegate.replicateWALEntry(controller, request);
+ } else {
+ failNext = false;
+ throw new ServiceException("Injected failure");
+ }
+ }
+
+ @Override
+ public GetRegionInfoResponse getRegionInfo(RpcController controller,
+ GetRegionInfoRequest request) throws ServiceException {
+ return delegate.getRegionInfo(controller, request);
+ }
+
+ @Override
+ public GetStoreFileResponse getStoreFile(RpcController controller,
+ GetStoreFileRequest request) throws ServiceException {
+ return delegate.getStoreFile(controller, request);
+ }
+
+ @Override
+ public GetOnlineRegionResponse getOnlineRegion(RpcController controller,
+ GetOnlineRegionRequest request) throws ServiceException {
+ return delegate.getOnlineRegion(controller, request);
+ }
+
+ @Override
+ public OpenRegionResponse openRegion(RpcController controller, OpenRegionRequest request)
+ throws ServiceException {
+ return delegate.openRegion(controller, request);
+ }
+
+ @Override
+ public WarmupRegionResponse warmupRegion(RpcController controller,
+ WarmupRegionRequest request) throws ServiceException {
+ return delegate.warmupRegion(controller, request);
+ }
+
+ @Override
+ public CloseRegionResponse closeRegion(RpcController controller, CloseRegionRequest request)
+ throws ServiceException {
+ return delegate.closeRegion(controller, request);
+ }
+
+ @Override
+ public FlushRegionResponse flushRegion(RpcController controller, FlushRegionRequest request)
+ throws ServiceException {
+ return delegate.flushRegion(controller, request);
+ }
+
+ @Override
+ public SplitRegionResponse splitRegion(RpcController controller, SplitRegionRequest request)
+ throws ServiceException {
+ return delegate.splitRegion(controller, request);
+ }
+
+ @Override
+ public CompactRegionResponse compactRegion(RpcController controller,
+ CompactRegionRequest request) throws ServiceException {
+ return delegate.compactRegion(controller, request);
+ }
+
+ @Override
+ public MergeRegionsResponse mergeRegions(RpcController controller,
+ MergeRegionsRequest request) throws ServiceException {
+ return delegate.mergeRegions(controller, request);
+ }
+
+ @Override
+ public ReplicateWALEntryResponse replay(RpcController controller,
+ ReplicateWALEntryRequest request) throws ServiceException {
+ return delegate.replay(controller, request);
+ }
+
+ @Override
+ public RollWALWriterResponse rollWALWriter(RpcController controller,
+ RollWALWriterRequest request) throws ServiceException {
+ return delegate.rollWALWriter(controller, request);
+ }
+
+ @Override
+ public GetServerInfoResponse getServerInfo(RpcController controller,
+ GetServerInfoRequest request) throws ServiceException {
+ return delegate.getServerInfo(controller, request);
+ }
+
+ @Override
+ public StopServerResponse stopServer(RpcController controller, StopServerRequest request)
+ throws ServiceException {
+ return delegate.stopServer(controller, request);
+ }
+
+ @Override
+ public UpdateFavoredNodesResponse updateFavoredNodes(RpcController controller,
+ UpdateFavoredNodesRequest request) throws ServiceException {
+ return delegate.updateFavoredNodes(controller, request);
+ }
+
+ @Override
+ public UpdateConfigurationResponse updateConfiguration(RpcController controller,
+ UpdateConfigurationRequest request) throws ServiceException {
+ return delegate.updateConfiguration(controller, request);
+ }
+ }
+
+ public class FailureInjectingReplicatorForTest extends ReplicatorForTest {
+
+ public FailureInjectingReplicatorForTest(List<Entry> entries, int ordinal) {
+ super(entries, ordinal);
+ }
+
+ @Override
+ protected void replicateEntries(BlockingInterface rrs, List<Entry> entries,
+ String replicationClusterId, Path baseNamespaceDir, Path hfileArchiveDir)
+ throws IOException {
+ super.replicateEntries(new FailureInjectingBlockingInterface(rrs), entries,
+ replicationClusterId, baseNamespaceDir, hfileArchiveDir);
+ }
+ }
+
+ @Override
+ protected Replicator createReplicator(List<Entry> entries, int ordinal) {
+ return new FailureInjectingReplicatorForTest(entries, ordinal);
+ }
+ }
+
+}