You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by gx...@apache.org on 2019/11/28 03:01:28 UTC

[hbase] branch branch-2 updated: HBASE-23293 [REPLICATION] make ship edits timeout configurable (#882)

This is an automated email from the ASF dual-hosted git repository.

gxcheng pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2 by this push:
     new 46c090c  HBASE-23293 [REPLICATION] make ship edits timeout configurable (#882)
46c090c is described below

commit 46c090ce453dd8e05d04619e6b8a4a80c9ffa887
Author: chenxu14 <47...@users.noreply.github.com>
AuthorDate: Thu Nov 28 11:01:18 2019 +0800

    HBASE-23293 [REPLICATION] make ship edits timeout configurable (#882)
    
    Signed-off-by: Guangxu Cheng <gx...@apache.org>
---
 .../main/java/org/apache/hadoop/hbase/HConstants.java |  4 ++++
 .../hadoop/hbase/replication/ReplicationUtils.java    | 14 ++++++++++++++
 .../hadoop/hbase/protobuf/ReplicationProtbufUtil.java |  3 ++-
 .../hadoop/hbase/replication/ReplicationEndpoint.java |  7 +++++++
 .../HBaseInterClusterReplicationEndpoint.java         | 19 ++++++++++---------
 .../regionserver/ReplicationSourceShipper.java        |  6 ++++++
 .../hbase/replication/TestReplicationEndpoint.java    |  2 +-
 .../replication/regionserver/TestReplicator.java      |  8 ++++----
 .../regionserver/TestSerialReplicationEndpoint.java   |  2 +-
 9 files changed, 49 insertions(+), 16 deletions(-)

diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index a1cf8cc..6469227 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -977,6 +977,10 @@ public final class HConstants {
 
   public static final int REPLICATION_SOURCE_TOTAL_BUFFER_DFAULT = 256 * 1024 * 1024;
 
+  /** Configuration key for ReplicationSource shipEdits timeout */
+  public static final String REPLICATION_SOURCE_SHIPEDITS_TIMEOUT =
+      "replication.source.shipedits.timeout";
+  public static final int REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT = 60000;
 
   /**
    * Directory where the source cluster file system client configuration are placed which is used by
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
index c7568bb..b2b87a4 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
@@ -171,4 +171,18 @@ public final class ReplicationUtils {
       return tableCFs != null && tableCFs.containsKey(tableName);
     }
   }
+
+  /**
+   * Get the adaptive timeout for a retry: initialValue times the clamped RETRY_BACKOFF entry
+   */
+  public static int getAdaptiveTimeout(final int initialValue, final int retries) {
+    int ntries = retries;
+    if (ntries >= HConstants.RETRY_BACKOFF.length) {
+      ntries = HConstants.RETRY_BACKOFF.length - 1;
+    }
+    if (ntries < 0) {
+      ntries = 0;
+    }
+    return initialValue * HConstants.RETRY_BACKOFF[ntries];
+  }
 }
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
index 157ad1b..6269393 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/protobuf/ReplicationProtbufUtil.java
@@ -57,11 +57,12 @@ public class ReplicationProtbufUtil {
    */
   public static void replicateWALEntry(final AdminService.BlockingInterface admin,
       final Entry[] entries, String replicationClusterId, Path sourceBaseNamespaceDir,
-      Path sourceHFileArchiveDir) throws IOException {
+      Path sourceHFileArchiveDir, int timeout) throws IOException {
     Pair<AdminProtos.ReplicateWALEntryRequest, CellScanner> p =
         buildReplicateWALEntryRequest(entries, null, replicationClusterId, sourceBaseNamespaceDir,
           sourceHFileArchiveDir);
     HBaseRpcController controller = new HBaseRpcControllerImpl(p.getSecond());
+    controller.setCallTimeout(timeout);
     try {
       admin.replicateWALEntry(controller, p.getFirst());
     } catch (org.apache.hbase.thirdparty.com.google.protobuf.ServiceException e) {
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
index f4c37b1..b1d1279 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationEndpoint.java
@@ -148,6 +148,7 @@ public interface ReplicationEndpoint extends ReplicationPeerConfigListener {
     List<Entry> entries;
     int size;
     String walGroupId;
+    int timeout;
     @InterfaceAudience.Private
     public ReplicateContext() {
     }
@@ -173,6 +174,12 @@ public interface ReplicationEndpoint extends ReplicationPeerConfigListener {
     public String getWalGroupId(){
       return walGroupId;
     }
+    public void setTimeout(int timeout) {
+      this.timeout = timeout;
+    }
+    public int getTimeout() {
+      return this.timeout;
+    }
   }
 
   /**
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index e0bb052..db54338 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -309,7 +309,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
             replicateContext.getSize());
         }
         // RuntimeExceptions encountered here bubble up and are handled in ReplicationSource
-        pool.submit(createReplicator(entries, i));
+        pool.submit(createReplicator(entries, i, replicateContext.getTimeout()));
         futures++;
       }
     }
@@ -467,7 +467,8 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
   }
 
   @VisibleForTesting
-  protected int replicateEntries(List<Entry> entries, int batchIndex) throws IOException {
+  protected int replicateEntries(List<Entry> entries, int batchIndex, int timeout)
+      throws IOException {
     SinkPeer sinkPeer = null;
     try {
       int entriesHashCode = System.identityHashCode(entries);
@@ -480,7 +481,7 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
       BlockingInterface rrs = sinkPeer.getRegionServer();
       try {
         ReplicationProtbufUtil.replicateWALEntry(rrs, entries.toArray(new Entry[entries.size()]),
-          replicationClusterId, baseNamespaceDir, hfileArchiveDir);
+          replicationClusterId, baseNamespaceDir, hfileArchiveDir, timeout);
         if (LOG.isTraceEnabled()) {
           LOG.trace("{} Completed replicating batch {}", logPeerId(), entriesHashCode);
         }
@@ -500,14 +501,14 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
     return batchIndex;
   }
 
-  private int serialReplicateRegionEntries(List<Entry> entries, int batchIndex)
+  private int serialReplicateRegionEntries(List<Entry> entries, int batchIndex, int timeout)
       throws IOException {
     int batchSize = 0, index = 0;
     List<Entry> batch = new ArrayList<>();
     for (Entry entry : entries) {
       int entrySize = getEstimatedEntrySize(entry);
       if (batchSize > 0 && batchSize + entrySize > replicationRpcLimit) {
-        replicateEntries(batch, index++);
+        replicateEntries(batch, index++, timeout);
         batch.clear();
         batchSize = 0;
       }
@@ -515,15 +516,15 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
       batchSize += entrySize;
     }
     if (batchSize > 0) {
-      replicateEntries(batch, index);
+      replicateEntries(batch, index, timeout);
     }
     return batchIndex;
   }
 
   @VisibleForTesting
-  protected Callable<Integer> createReplicator(List<Entry> entries, int batchIndex) {
-    return isSerial ? () -> serialReplicateRegionEntries(entries, batchIndex)
-        : () -> replicateEntries(entries, batchIndex);
+  protected Callable<Integer> createReplicator(List<Entry> entries, int batchIndex, int timeout) {
+    return isSerial ? () -> serialReplicateRegionEntries(entries, batchIndex, timeout)
+        : () -> replicateEntries(entries, batchIndex, timeout);
   }
 
   private String logPeerId(){
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
index 5fee659..1e68647 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceShipper.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hbase.replication.regionserver;
 
+import static org.apache.hadoop.hbase.replication.ReplicationUtils.getAdaptiveTimeout;
 import java.io.IOException;
 import java.util.List;
 import java.util.concurrent.PriorityBlockingQueue;
@@ -24,6 +25,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
+import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.replication.ReplicationEndpoint;
 import org.apache.hadoop.hbase.util.Threads;
@@ -71,6 +73,7 @@ public class ReplicationSourceShipper extends Thread {
   protected final int maxRetriesMultiplier;
   private final int DEFAULT_TIMEOUT = 20000;
   private final int getEntriesTimeout;
+  private final int shipEditsTimeout;
 
   public ReplicationSourceShipper(Configuration conf, String walGroupId,
       PriorityBlockingQueue<Path> queue, ReplicationSourceInterface source) {
@@ -84,6 +87,8 @@ public class ReplicationSourceShipper extends Thread {
         this.conf.getInt("replication.source.maxretriesmultiplier", 300); // 5 minutes @ 1 sec per
     this.getEntriesTimeout =
         this.conf.getInt("replication.source.getEntries.timeout", DEFAULT_TIMEOUT); // 20 seconds
+    this.shipEditsTimeout = this.conf.getInt(HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT,
+        HConstants.REPLICATION_SOURCE_SHIPEDITS_TIMEOUT_DFAULT);
   }
 
   @Override
@@ -176,6 +181,7 @@ public class ReplicationSourceShipper extends Thread {
             new ReplicationEndpoint.ReplicateContext();
         replicateContext.setEntries(entries).setSize(currentSize);
         replicateContext.setWalGroupId(walGroupId);
+        replicateContext.setTimeout(getAdaptiveTimeout(this.shipEditsTimeout, sleepMultiplier));
 
         long startTimeNs = System.nanoTime();
         // send the edits to the endpoint. Will block until the edits are shipped and acknowledged
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
index b909d8f..2a32e3d 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationEndpoint.java
@@ -483,7 +483,7 @@ public class TestReplicationEndpoint extends TestReplicationBase {
     }
 
     @Override
-    protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal) {
+    protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) {
       // Fail only once, we don't want to slow down the test.
       if (failedOnce) {
         return () -> ordinal;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java
index bff363f..fad8da9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicator.java
@@ -229,9 +229,9 @@ public class TestReplicator extends TestReplicationBase {
     }
 
     @Override
-    protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal) {
+    protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) {
       return () -> {
-        int batchIndex = replicateEntries(entries, ordinal);
+        int batchIndex = replicateEntries(entries, ordinal, timeout);
         entriesCount += entries.size();
         int count = batchCount.incrementAndGet();
         LOG.info(
@@ -246,10 +246,10 @@ public class TestReplicator extends TestReplicationBase {
     private final AtomicBoolean failNext = new AtomicBoolean(false);
 
     @Override
-    protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal) {
+    protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) {
       return () -> {
         if (failNext.compareAndSet(false, true)) {
-          int batchIndex = replicateEntries(entries, ordinal);
+          int batchIndex = replicateEntries(entries, ordinal, timeout);
           entriesCount += entries.size();
           int count = batchCount.incrementAndGet();
           LOG.info(
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationEndpoint.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationEndpoint.java
index 7d59d38..3c88ab3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationEndpoint.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestSerialReplicationEndpoint.java
@@ -167,7 +167,7 @@ public class TestSerialReplicationEndpoint {
     }
 
     @Override
-    protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal) {
+    protected Callable<Integer> createReplicator(List<Entry> entries, int ordinal, int timeout) {
       return () -> {
         entryQueue.addAll(entries);
         return ordinal;