You are viewing a plain text version of this content. The canonical link for it is the commit URL listed below.
Posted to commits@hbase.apache.org by la...@apache.org on 2015/09/04 22:29:25 UTC
[2/3] hbase git commit: HBASE-12988 [Replication] Parallel apply edits across regions.
HBASE-12988 [Replication] Parallel apply edits across regions.
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/16d4ed63
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/16d4ed63
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/16d4ed63
Branch: refs/heads/branch-1
Commit: 16d4ed63371e5c519314e94e055f9e6e13dc1e81
Parents: 1ac42c9
Author: Lars Hofhansl <la...@apache.org>
Authored: Fri Sep 4 13:22:14 2015 -0700
Committer: Lars Hofhansl <la...@apache.org>
Committed: Fri Sep 4 13:23:36 2015 -0700
----------------------------------------------------------------------
.../org/apache/hadoop/hbase/HConstants.java | 6 +
.../src/main/resources/hbase-default.xml | 11 ++
.../HBaseInterClusterReplicationEndpoint.java | 118 ++++++++++++++++---
3 files changed, 120 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/16d4ed63/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
index e2eb6e0..6af2faa 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/HConstants.java
@@ -1133,6 +1133,12 @@ public final class HConstants {
/** Configuration key for setting replication codec class name */
public static final String REPLICATION_CODEC_CONF_KEY = "hbase.replication.rpc.codec";
+ /** Maximum number of threads used by the replication source for shipping edits to the sinks */
+ public static final String REPLICATION_SOURCE_MAXTHREADS_KEY =
+ "hbase.replication.source.maxthreads";
+
+ public static final int REPLICATION_SOURCE_MAXTHREADS_DEFAULT = 10;
+
/** Config for pluggable consensus provider */
public static final String HBASE_COORDINATED_STATE_MANAGER_CLASS =
"hbase.coordinated.state.manager.class";
http://git-wip-us.apache.org/repos/asf/hbase/blob/16d4ed63/hbase-common/src/main/resources/hbase-default.xml
----------------------------------------------------------------------
diff --git a/hbase-common/src/main/resources/hbase-default.xml b/hbase-common/src/main/resources/hbase-default.xml
index f643cbb..3ea6d60 100644
--- a/hbase-common/src/main/resources/hbase-default.xml
+++ b/hbase-common/src/main/resources/hbase-default.xml
@@ -1423,6 +1423,17 @@ possible configurations would overwhelm and obscure the important.
using KeyValueCodecWithTags for replication when there are no tags causes no harm.
</description>
</property>
+ <property>
+ <name>hbase.replication.source.maxthreads</name>
+ <value>10</value>
+ <description>
+ The maximum number of threads any replication source will use for
+ shipping edits to the sinks in parallel. This also limits the number of
+ chunks each replication batch is broken into.
+ Larger values can improve the replication throughput between the master and
+ slave clusters. The default of 10 will rarely need to be changed.
+ </description>
+ </property>
<!-- Static Web User Filter properties. -->
<property>
<description>
http://git-wip-us.apache.org/repos/asf/hbase/blob/16d4ed63/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
index 62b7963..eb42e69 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/HBaseInterClusterReplicationEndpoint.java
@@ -21,7 +21,15 @@ package org.apache.hadoop.hbase.replication.regionserver;
import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
+import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.SynchronousQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -34,6 +42,7 @@ import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
+import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.wal.WAL.Entry;
import org.apache.hadoop.hbase.replication.HBaseReplicationEndpoint;
import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
@@ -71,6 +80,8 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
// Handles connecting to peer region servers
private ReplicationSinkManager replicationSinkMgr;
private boolean peersSelected = false;
+ private ThreadPoolExecutor exec;
+ private int maxThreads;
@Override
public void init(Context context) throws IOException {
@@ -89,6 +100,11 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
this.metrics = context.getMetrics();
// ReplicationQueueInfo parses the peerId out of the znode for us
this.replicationSinkMgr = new ReplicationSinkManager(conn, ctx.getPeerId(), this, this.conf);
+ // per sink thread pool
+ this.maxThreads = this.conf.getInt(HConstants.REPLICATION_SOURCE_MAXTHREADS_KEY,
+ HConstants.REPLICATION_SOURCE_MAXTHREADS_DEFAULT);
+ this.exec = new ThreadPoolExecutor(1, maxThreads, 60, TimeUnit.SECONDS,
+ new SynchronousQueue<Runnable>());
}
private void decorateConf() {
@@ -139,32 +155,71 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
public boolean replicate(ReplicateContext replicateContext) {
List<Entry> entries = replicateContext.getEntries();
int sleepMultiplier = 1;
- while (this.isRunning()) {
- if (!peersSelected) {
- connectToPeers();
- peersSelected = true;
- }
+ if (!peersSelected && this.isRunning()) {
+ connectToPeers();
+ peersSelected = true;
+ }
+
+ // minimum of: configured threads, number of 100-waledit batches,
+ // and number of current sinks
+ int n = Math.min(Math.min(this.maxThreads, entries.size()/100+1),
+ replicationSinkMgr.getSinks().size());
+ List<List<Entry>> entryLists = new ArrayList<List<Entry>>(n);
+ if (n == 1) {
+ entryLists.add(entries);
+ } else {
+ for (int i=0; i<n; i++) {
+ entryLists.add(new ArrayList<Entry>(entries.size()/n+1));
+ }
+ // now group by region
+ for (Entry e : entries) {
+ entryLists.get(Math.abs(Bytes.hashCode(e.getKey().getEncodedRegionName())%n)).add(e);
+ }
+ }
+ while (this.isRunning()) {
if (!isPeerEnabled()) {
if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
sleepMultiplier++;
}
continue;
}
- SinkPeer sinkPeer = null;
try {
- sinkPeer = replicationSinkMgr.getReplicationSink();
- BlockingInterface rrs = sinkPeer.getRegionServer();
if (LOG.isTraceEnabled()) {
LOG.trace("Replicating " + entries.size() +
" entries of total size " + replicateContext.getSize());
}
- ReplicationProtbufUtil.replicateWALEntry(rrs,
- entries.toArray(new Entry[entries.size()]));
+ List<Future<Integer>> futures = new ArrayList<Future<Integer>>(entryLists.size());
+ for (int i=0; i<entryLists.size(); i++) {
+ if (!entryLists.get(i).isEmpty()) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Submitting " + entryLists.get(i).size() +
+ " entries of total size " + replicateContext.getSize());
+ }
+ // RuntimeExceptions encountered here bubble up and are handled in ReplicationSource
+ futures.add(exec.submit(new Replicator(entryLists.get(i), i)));
+ }
+ }
+ IOException iox = null;
+ for (Future<Integer> f : futures) {
+ try {
+ // wait for all futures, remove successful parts
+ // (only the remaining parts will be retried)
+ entryLists.remove(f.get());
+ } catch (InterruptedException ie) {
+ iox = new IOException(ie);
+ } catch (ExecutionException ee) {
+ // cause must be an IOException
+ iox = (IOException)ee.getCause();
+ }
+ }
+ if (iox != null) {
+ // if we had any exceptions, try again
+ throw iox;
+ }
// update metrics
this.metrics.setAgeOfLastShippedOp(entries.get(entries.size()-1).getKey().getWriteTime());
- replicationSinkMgr.reportSinkSuccess(sinkPeer);
return true;
} catch (IOException ioe) {
@@ -195,10 +250,6 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
LOG.warn("Can't replicate because of a local or network error: ", ioe);
}
}
-
- if (sinkPeer != null) {
- replicationSinkMgr.reportBadSink(sinkPeer);
- }
if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) {
sleepMultiplier++;
}
@@ -222,6 +273,43 @@ public class HBaseInterClusterReplicationEndpoint extends HBaseReplicationEndpoi
LOG.warn("Failed to close the connection");
}
}
+ exec.shutdownNow();
notifyStopped();
}
+
+ // is this needed? Nobody else will call doStop() otherwise
+ @Override
+ public State stopAndWait() {
+ doStop();
+ return super.stopAndWait();
+ }
+
+ private class Replicator implements Callable<Integer> {
+ private List<Entry> entries;
+ private int ordinal;
+ public Replicator(List<Entry> entries, int ordinal) {
+ this.entries = entries;
+ this.ordinal = ordinal;
+ }
+
+ @Override
+ public Integer call() throws IOException {
+ SinkPeer sinkPeer = null;
+ try {
+ sinkPeer = replicationSinkMgr.getReplicationSink();
+ BlockingInterface rrs = sinkPeer.getRegionServer();
+ ReplicationProtbufUtil.replicateWALEntry(rrs,
+ entries.toArray(new Entry[entries.size()]));
+ replicationSinkMgr.reportSinkSuccess(sinkPeer);
+ return ordinal;
+
+ } catch (IOException ioe) {
+ if (sinkPeer != null) {
+ replicationSinkMgr.reportBadSink(sinkPeer);
+ }
+ throw ioe;
+ }
+ }
+
+ }
}