Posted to commits@hbase.apache.org by la...@apache.org on 2015/09/04 22:29:25 UTC

[2/3] hbase git commit: HBASE-12988 [Replication]Parallel apply edits across regions.

HBASE-12988 [Replication]Parallel apply edits across regions.


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/16d4ed63
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/16d4ed63
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/16d4ed63

Branch: refs/heads/branch-1
Commit: 16d4ed63371e5c519314e94e055f9e6e13dc1e81
Parents: 1ac42c9
Author: Lars Hofhansl <la...@apache.org>
Authored: Fri Sep 4 13:22:14 2015 -0700
Committer: Lars Hofhansl <la...@apache.org>
Committed: Fri Sep 4 13:23:36 2015 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hbase/HConstants.java     |   6 +
 .../src/main/resources/hbase-default.xml        |  11 ++
 .../HBaseInterClusterReplicationEndpoint.java   | 118 ++++++++++++++++---
 3 files changed, 120 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/16d4ed63/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index e2eb6e0..6af2faa 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -1133,6 +1133,12 @@ public final class HConstants {
   /** Configuration key for setting replication codec class name */
   public static final String REPLICATION_CODEC_CONF_KEY = "hbase.replication.rpc.codec";
 
+  /** Maximum number of threads used by the replication source for shipping edits to the sinks */
+  public static final String REPLICATION_SOURCE_MAXTHREADS_KEY =
+      "hbase.replication.source.maxthreads";
+
+  public static final int REPLICATION_SOURCE_MAXTHREADS_DEFAULT = 10;
+
   /** Config for pluggable consensus provider */
   public static final String HBASE_COORDINATED_STATE_MANAGER_CLASS =
     "hbase.coordinated.state.manager.class";

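For reference, the endpoint change further down consumes this key/default pair in the standard Hadoop Configuration fashion; a minimal sketch, assuming any Configuration instance named conf:

  // Reads the new setting; yields 10 unless overridden in hbase-site.xml.
  int maxThreads = conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY,
      HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT);
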
http://git-wip-us.apache.org/repos/asf/hbase/blob/16d4ed63/hbase-common/src/main/resources/hbase-default.xml
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml
index f643cbb..3ea6d60 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -1423,6 +1423,17 @@ possible configurations would overwhelm and obscure the important.
   		using KeyValueCodecWithTags for replication when there are no tags causes no harm.
   	</description>
   </property>
+  <property>
+    <name>hbase.replication.source.maxthreads</name>
+    <value>10</value>
+    <description>
+        The maximum number of threads any replication source will use for
+        shipping edits to the sinks in parallel. This also limits the number of
+        chunks each replication batch is broken into.
+        Larger values can improve the replication throughput between the master and
+        slave clusters. The default of 10 will rarely need to be changed.
+    </description>
+  </property>
   <!-- Static Web User Filter properties. -->
   <property>
     <description>

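An operator who wants more (or less) parallelism would override this in hbase-site.xml rather than editing hbase-default.xml; for example, to allow up to 20 shipping threads per source (the value 20 is purely illustrative):

  <property>
    <name>hbase.replication.source.maxthreads</name>
    <value>20</value>
  </property>
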
http://git-wip-us.apache.org/repos/asf/hbase/blob/16d4ed63/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index 62b7963..eb42e69 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -21,7 +21,15 @@ package org.apache.hadoop.hbase.replication.regionserver;
 import java.io.IOException;
 import java.net.ConnectException;
 import java.net.SocketTimeoutException;
+import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -34,6 +42,7 @@ import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
 import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
 import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
@@ -71,6 +80,8 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
   // Handles connecting to peer region servers
   private ReplicationSinkManager replicationSinkMgr;
   private boolean peersSelected = false;
+  private ThreadPoolExecutor exec;
+  private int maxThreads;
 
   @Override
   public void init(Context context) throws IOException {
@@ -89,6 +100,11 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
     this.metrics = context.getMetrics();
     // ReplicationQueueInfo parses the peerId out of the znode for us
     this.replicationSinkMgr = new ReplicationSinkManager(conn, ctx.getPeerId(), this, this.conf);
+    // per sink thread pool
+    this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY,
+      HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT);
+    this.exec = new ThreadPoolExecutor(1, maxThreads, 60, TimeUnit.SECONDS,
+      new SynchronousQueue<Runnable>());
   }
 
   private void decorateConf() {
@@ -139,32 +155,71 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
   public boolean replicate(ReplicateContext replicateContext) {
     List<Entry> entries = replicateContext.getEntries();
     int sleepMultiplier = 1;
-    while (this.isRunning()) {
-      if (!peersSelected) {
-        connectToPeers();
-        peersSelected = true;
-      }
 
+    if (!peersSelected && this.isRunning()) {
+      connectToPeers();
+      peersSelected = true;
+    }
+
+    // minimum of: configured threads, number of 100-waledit batches,
+    //  and number of current sinks
+    int n = Math.min(Math.min(this.maxThreads, entries.size()/100+1),
+      replicationSinkMgr.getSinks().size());
+    List<List<Entry>> entryLists = new ArrayList<List<Entry>>(n);
+    if (n == 1) {
+      entryLists.add(entries);
+    } else {
+      for (int i=0; i<n; i++) {
+        entryLists.add(new ArrayList<Entry>(entries.size()/n+1));
+      }
+      // now group by region
+      for (Entry e : entries) {
+        entryLists.get(Math.abs(Bytes.hashCode(e.getKey().getEncodedRegionName())%n)).add(e);
+      }
+    }
+    while (this.isRunning()) {
       if (!isPeerEnabled()) {
         if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
           sleepMultiplier++;
         }
         continue;
       }
-      SinkPeer sinkPeer = null;
       try {
-        sinkPeer = replicationSinkMgr.getReplicationSink();
-        BlockingInterface rrs = sinkPeer.getRegionServer();
         if (LOG.isTraceEnabled()) {
           LOG.trace("Replicating " + entries.size() +
               " entries of total size " + replicateContext.getSize());
         }
-        ReplicationProtbufUtil.replicateWALEntry(rrs,
-            entries.toArray(new Entry[entries.size()]));
 
+        List<Future<Integer>> futures = new ArrayList<Future<Integer>>(entryLists.size());
+        for (int i=0; i<entryLists.size(); i++) {
+          if (!entryLists.get(i).isEmpty()) {
+            if (LOG.isTraceEnabled()) {
+              LOG.trace("Submitting " + entryLists.get(i).size() +
+                  " entries of total size " + replicateContext.getSize());
+            }
+            // RuntimeExceptions encountered here bubble up and are handled in ReplicationSource
+            futures.add(exec.submit(new Replicator(entryLists.get(i), i)));
+          }
+        }
+        IOException iox = null;
+        for (Future<Integer> f : futures) {
+          try {
+            // wait for all futures, remove successful parts
+            // (only the remaining parts will be retried)
+            entryLists.remove(f.get());
+          } catch (InterruptedException ie) {
+            iox =  new IOException(ie);
+          } catch (ExecutionException ee) {
+            // cause must be an IOException
+            iox = (IOException)ee.getCause();
+          }
+        }
+        if (iox != null) {
+          // if we had any exceptions, try again
+          throw iox;
+        }
         // update metrics
         this.metrics.setAgeOfLastShippedOp(entries.get(entries.size()-1).getKey().getWriteTime());
-        replicationSinkMgr.reportSinkSuccess(sinkPeer);
         return true;
 
       } catch (IOException ioe) {
@@ -195,10 +250,6 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
             LOG.warn("Can't replicate because of a local or network error: ", ioe);
           }
         }
-
-        if (sinkPeer != null) {
-          replicationSinkMgr.reportBadSink(sinkPeer);
-        }
         if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) {
           sleepMultiplier++;
         }
@@ -222,6 +273,43 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
         LOG.warn("Failed to close the connection");
       }
     }
+    exec.shutdownNow();
     notifyStopped();
   }
+
+  // is this needed? Nobody else will call doStop() otherwise
+  @Override
+  public State stopAndWait() {
+    doStop();
+    return super.stopAndWait();
+  }
+
+  private class Replicator implements Callable<Integer> {
+    private List<Entry> entries;
+    private int ordinal;
+    public Replicator(List<Entry> entries, int ordinal) {
+      this.entries = entries;
+      this.ordinal = ordinal;
+    }
+
+    @Override
+    public Integer call() throws IOException {
+      SinkPeer sinkPeer = null;
+      try {
+        sinkPeer = replicationSinkMgr.getReplicationSink();
+        BlockingInterface rrs = sinkPeer.getRegionServer();
+        ReplicationProtbufUtil.replicateWALEntry(rrs,
+            entries.toArray(new Entry[entries.size()]));
+        replicationSinkMgr.reportSinkSuccess(sinkPeer);
+        return ordinal;
+
+      } catch (IOException ioe) {
+        if (sinkPeer != null) {
+          replicationSinkMgr.reportBadSink(sinkPeer);
+        }
+        throw ioe;
+      }
+    }
+
+  }
 }
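
The shape of the change is easier to see outside the diff: each batch is split into at most min(hbase.replication.source.maxthreads, size/100+1, number of sinks) chunks, entries are assigned to chunks by a hash of their encoded region name (so all edits for a region land in one chunk, preserving per-region ordering), the chunks are shipped in parallel, and only the chunks that failed are retried. Below is a self-contained sketch of that scheme, not the patch itself: ParallelShipSketch, shipChunk(), and the "region:payload" string encoding are hypothetical stand-ins for the WAL Entry plumbing; the sketch removes the successfully shipped chunk object rather than an ordinal, and the real endpoint additionally sleeps with backoff and checks isRunning() between retry passes.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ParallelShipSketch {

  // Hypothetical stand-in for ReplicationProtbufUtil.replicateWALEntry():
  // ships one chunk to a sink, throwing on failure.
  static void shipChunk(List<String> chunk) throws Exception {
    // sink RPC would go here
  }

  public static void main(String[] args) throws Exception {
    List<String> entries = Arrays.asList("region1:put-a", "region2:put-b", "region1:put-c");
    int maxThreads = 10; // hbase.replication.source.maxthreads
    int sinkCount = 3;   // replicationSinkMgr.getSinks().size()

    // Same sizing rule as the patch: configured threads, 100-edit batches, live sinks.
    int n = Math.min(Math.min(maxThreads, entries.size() / 100 + 1), sinkCount);

    // Group by region so each region's edits keep their relative order in one chunk.
    List<List<String>> chunks = new ArrayList<List<String>>(n);
    for (int i = 0; i < n; i++) {
      chunks.add(new ArrayList<String>());
    }
    for (String e : entries) {
      String region = e.split(":")[0]; // hypothetical region key
      chunks.get(Math.abs(region.hashCode() % n)).add(e);
    }

    // As in the patch: core size 1, cap maxThreads, SynchronousQueue. The queue
    // buffers nothing, but at most n <= maxThreads tasks are in flight per batch,
    // so submissions are never rejected.
    ThreadPoolExecutor exec = new ThreadPoolExecutor(1, maxThreads, 60,
        TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
    try {
      while (!chunks.isEmpty()) {
        List<Future<List<String>>> futures = new ArrayList<Future<List<String>>>();
        for (final List<String> chunk : chunks) {
          futures.add(exec.submit(() -> { shipChunk(chunk); return chunk; }));
        }
        for (Future<List<String>> f : futures) {
          try {
            chunks.remove(f.get()); // success: drop the chunk so it is not retried
          } catch (ExecutionException failed) {
            // keep the chunk; the next pass of the while loop re-ships it
            // (the real endpoint sleeps with backoff between passes)
          }
        }
      }
    } finally {
      exec.shutdownNow();
    }
  }
}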