You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jd...@apache.org on 2013/08/01 19:03:28 UTC
svn commit: r1509331 - in /hbase/branches/0.95:
hbase-client/src/main/java/org/apache/hadoop/hbase/replication/
hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/
hbase-server/src/test/java/org/apache/hadoop/hbase/replication/...
Author: jdcryans
Date: Thu Aug 1 17:03:28 2013
New Revision: 1509331
URL: http://svn.apache.org/r1509331
Log:
HBASE-7634 Replication handling of changes to peer clusters is inefficient (Gabriel Reid via JD)
Added:
hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java
hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java
hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java
Modified:
hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
Modified: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java?rev=1509331&r1=1509330&r2=1509331&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java (original)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java Thu Aug 1 17:03:28 2013
@@ -18,6 +18,13 @@
*/
package org.apache.hadoop.hbase.replication;
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -35,12 +42,6 @@ import org.apache.zookeeper.KeeperExcept
import com.google.protobuf.InvalidProtocolBufferException;
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-
/**
* This class acts as a wrapper for all the objects used to identify and
* communicate with remote peers and is responsible for answering to expired
@@ -57,9 +58,11 @@ public class ReplicationPeer implements
// Cannot be final since a new object needs to be recreated when session fails
private ZooKeeperWatcher zkw;
private final Configuration conf;
+ private long lastRegionserverUpdate;
private PeerStateTracker peerStateTracker;
+
/**
* Constructor that takes all the objects required to communicate with the
* specified peer, except for the region server addresses.
@@ -130,6 +133,7 @@ public class ReplicationPeer implements
*/
public void setRegionServers(List<ServerName> regionServers) {
this.regionServers = regionServers;
+ lastRegionserverUpdate = System.currentTimeMillis();
}
/**
@@ -141,6 +145,15 @@ public class ReplicationPeer implements
}
/**
+ * Get the timestamp at which the last change occurred to the list of region servers to replicate
+ * to.
+ * @return The System.currentTimeMillis at the last time the list of peer region servers changed.
+ */
+ public long getLastRegionserverUpdate() {
+ return lastRegionserverUpdate;
+ }
+
+ /**
* Get the identifier of this peer
* @return string representation of the id (short)
*/
Modified: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java?rev=1509331&r1=1509330&r2=1509331&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java (original)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java Thu Aug 1 17:03:28 2013
@@ -123,6 +123,14 @@ public interface ReplicationPeers {
List<ServerName> getRegionServersOfConnectedPeer(String peerId);
/**
+ * Get the timestamp of the last change in composition of a given peer cluster.
+ * @param peerId identifier of the peer cluster for which the timestamp is requested
+ * @return the timestamp (in milliseconds) of the last change to the composition of
+ * the peer cluster
+ */
+ long getTimestampOfLastChangeToPeer(String peerId);
+
+ /**
* Returns the UUID of the provided peer id.
* @param peerId the peer's ID that will be converted into a UUID
* @return a UUID or null if the peer cluster does not exist or is not connected.
Modified: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java?rev=1509331&r1=1509330&r2=1509331&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java (original)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java Thu Aug 1 17:03:28 2013
@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.protobuf.
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.ConnectionLossException;
@@ -266,6 +267,14 @@ public class ReplicationPeersZKImpl exte
return ids;
}
+ @Override
+ public long getTimestampOfLastChangeToPeer(String peerId) {
+ if (!peerClusters.containsKey(peerId)) {
+ throw new IllegalArgumentException("Unknown peer id: " + peerId);
+ }
+ return peerClusters.get(peerId).getLastRegionserverUpdate();
+ }
+
/**
* A private method used during initialization. This method attempts to connect to all registered
* peer clusters. This method does not set a watch on the peer cluster znodes.
@@ -291,6 +300,7 @@ public class ReplicationPeersZKImpl exte
LOG.warn("Lost the ZooKeeper connection for peer " + peer.getClusterKey(), ke);
try {
peer.reloadZkWatcher();
+ peer.getZkw().registerListener(new PeerRegionServerListener(peer));
} catch (IOException io) {
LOG.warn("Creation of ZookeeperWatcher failed for peer " + peer.getClusterKey(), io);
}
@@ -304,7 +314,7 @@ public class ReplicationPeersZKImpl exte
*/
private static List<ServerName> fetchSlavesAddresses(ZooKeeperWatcher zkw)
throws KeeperException {
- List<String> children = ZKUtil.listChildrenNoWatch(zkw, zkw.rsZNode);
+ List<String> children = ZKUtil.listChildrenAndWatchForNewChildren(zkw, zkw.rsZNode);
if (children == null) {
return Collections.emptyList();
}
@@ -315,6 +325,7 @@ public class ReplicationPeersZKImpl exte
return addresses;
}
+
private String getPeerStateNode(String id) {
return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.peerStateNodeName));
}
@@ -366,6 +377,7 @@ public class ReplicationPeersZKImpl exte
ReplicationPeer peer =
new ReplicationPeer(peerConf, peerId, ZKUtil.getZooKeeperClusterKey(peerConf));
peer.startStateTracker(this.zookeeper, this.getPeerStateNode(peerId));
+ peer.getZkw().registerListener(new PeerRegionServerListener(peer));
return peer;
}
@@ -406,4 +418,37 @@ public class ReplicationPeersZKImpl exte
.toByteArray();
return ProtobufUtil.prependPBMagic(bytes);
}
+
+ /**
+ * Tracks changes to the list of region servers in a peer's cluster.
+ */
+ public static class PeerRegionServerListener extends ZooKeeperListener {
+
+ private ReplicationPeer peer;
+ private String regionServerListNode;
+
+ public PeerRegionServerListener(ReplicationPeer replicationPeer) {
+ super(replicationPeer.getZkw());
+ this.peer = replicationPeer;
+ this.regionServerListNode = peer.getZkw().rsZNode;
+ }
+
+ public PeerRegionServerListener(String regionServerListNode, ZooKeeperWatcher zkw) {
+ super(zkw);
+ this.regionServerListNode = regionServerListNode;
+ }
+
+ @Override
+ public synchronized void nodeChildrenChanged(String path) {
+ if (path.equals(regionServerListNode)) {
+ try {
+ LOG.info("Detected change to peer regionservers, fetching updated list");
+ peer.setRegionServers(fetchSlavesAddresses(peer.getZkw()));
+ } catch (KeeperException e) {
+ LOG.fatal("Error reading slave addresses", e);
+ }
+ }
+ }
+
+ }
}
\ No newline at end of file
Added: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java?rev=1509331&view=auto
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java (added)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java Thu Aug 1 17:03:28 2013
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.replication.ReplicationPeers;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * Maintains a collection of peers to replicate to, and randomly selects a
+ * single peer to replicate to per set of data to replicate. Also handles
+ * keeping track of peer availability.
+ */
+public class ReplicationSinkManager {
+
+ private static final Log LOG = LogFactory.getLog(ReplicationSinkManager.class);
+
+ /**
+ * Default maximum number of times a replication sink can be reported as bad before
+ * it will no longer be provided as a sink for replication without the pool of
+ * replication sinks being refreshed.
+ */
+ static final int DEFAULT_BAD_SINK_THRESHOLD = 3;
+
+ /**
+ * Default ratio of the total number of peer cluster region servers to consider
+ * replicating to.
+ */
+ static final float DEFAULT_REPLICATION_SOURCE_RATIO = 0.1f;
+
+
+ private final HConnection conn;
+
+ private final String peerClusterId;
+
+ private final ReplicationPeers replicationPeers;
+
+ // Count of "bad replication sink" reports per peer sink
+ private final Map<ServerName, Integer> badReportCounts;
+
+ // Ratio of total number of potential peer region servers to be used
+ private final float ratio;
+
+ // Maximum number of times a sink can be reported as bad before it is removed
+ // from the pool of replication sinks (the pool is re-chosen once it is empty)
+ private final int badSinkThreshold;
+
+ private final Random random;
+
+ // Timestamp of the last time the pool of sinks was chosen for this peer
+ private long lastUpdateToPeers;
+
+ // The current pool of sinks to which replication can be performed
+ private List<ServerName> sinks = Lists.newArrayList();
+
+ /**
+ * Instantiate for a single replication peer cluster.
+ * @param conn connection to the peer cluster
+ * @param peerClusterId identifier of the peer cluster
+ * @param replicationPeers manages peer clusters being replicated to
+ * @param conf HBase configuration, used for determining replication source ratio and bad peer
+ * threshold
+ */
+ public ReplicationSinkManager(HConnection conn, String peerClusterId,
+ ReplicationPeers replicationPeers, Configuration conf) {
+ this.conn = conn;
+ this.peerClusterId = peerClusterId;
+ this.replicationPeers = replicationPeers;
+ this.badReportCounts = Maps.newHashMap();
+ this.ratio = conf.getFloat("replication.source.ratio", DEFAULT_REPLICATION_SOURCE_RATIO);
+ this.badSinkThreshold = conf.getInt("replication.bad.sink.threshold",
+ DEFAULT_BAD_SINK_THRESHOLD);
+ this.random = new Random();
+ }
+
+ /**
+ * Get a randomly-chosen replication sink, re-choosing the pool first if the peer has changed.
+ *
+ * @return a replication sink to replicate to
+ */
+ public SinkPeer getReplicationSink() throws IOException {
+ if (replicationPeers.getTimestampOfLastChangeToPeer(peerClusterId)
+ > this.lastUpdateToPeers) {
+ LOG.info("Current list of sinks is out of date, updating");
+ chooseSinks();
+ }
+
+ if (sinks.isEmpty()) {
+ throw new IOException("No replication sinks are available");
+ }
+ ServerName serverName = sinks.get(random.nextInt(sinks.size()));
+ return new SinkPeer(serverName, conn.getAdmin(serverName));
+ }
+
+ /**
+ * Report a {@code SinkPeer} as being bad (i.e. an attempt to replicate to it
+ * failed). If a single SinkPeer is reported as bad more than
+ * replication.bad.sink.threshold times, it will be removed
+ * from the pool of potential replication targets.
+ *
+ * @param sinkPeer
+ * The SinkPeer that had a failed replication attempt on it
+ */
+ public void reportBadSink(SinkPeer sinkPeer) {
+ ServerName serverName = sinkPeer.getServerName();
+ int badReportCount = (badReportCounts.containsKey(serverName)
+ ? badReportCounts.get(serverName) : 0) + 1;
+ badReportCounts.put(serverName, badReportCount);
+ if (badReportCount > badSinkThreshold) {
+ this.sinks.remove(serverName);
+ if (sinks.isEmpty()) {
+ chooseSinks();
+ }
+ }
+ }
+
+ void chooseSinks() {
+ List<ServerName> slaveAddresses =
+ replicationPeers.getRegionServersOfConnectedPeer(peerClusterId);
+ Collections.shuffle(slaveAddresses, random);
+ int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio);
+ sinks = slaveAddresses.subList(0, numSinks);
+ lastUpdateToPeers = System.currentTimeMillis();
+ badReportCounts.clear();
+ }
+
+ List<ServerName> getSinks() {
+ return sinks;
+ }
+
+ /**
+ * Wraps a replication region server sink to provide the ability to identify
+ * it.
+ */
+ public static class SinkPeer {
+ private ServerName serverName;
+ private AdminService.BlockingInterface regionServer;
+
+ public SinkPeer(ServerName serverName, AdminService.BlockingInterface regionServer) {
+ this.serverName = serverName;
+ this.regionServer = regionServer;
+ }
+
+ ServerName getServerName() {
+ return serverName;
+ }
+
+ public AdminService.BlockingInterface getRegionServer() {
+ return regionServer;
+ }
+
+ }
+
+}
Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=1509331&r1=1509330&r2=1509331&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Thu Aug 1 17:03:28 2013
@@ -23,16 +23,11 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.ConnectException;
import java.net.SocketTimeoutException;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
-import java.util.HashSet;
import java.util.List;
import java.util.NavigableMap;
-import java.util.Random;
-import java.util.Set;
import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -45,20 +40,19 @@ import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.ipc.RemoteException;
@@ -89,9 +83,6 @@ public class ReplicationSource extends T
private ReplicationQueues replicationQueues;
private ReplicationPeers replicationPeers;
private Configuration conf;
- // ratio of region servers to chose from a slave cluster
- private float ratio;
- private Random random;
private ReplicationQueueInfo replicationQueueInfo;
// id of the peer cluster this source replicates to
private String peerId;
@@ -99,8 +90,6 @@ public class ReplicationSource extends T
private ReplicationSourceManager manager;
// Should we stop everything?
private Stoppable stopper;
- // List of chosen sinks (region servers)
- private List<ServerName> currentPeers;
// How long should we sleep for each retry
private long sleepForRetries;
// Max size in bytes of entriesArray
@@ -140,6 +129,8 @@ public class ReplicationSource extends T
private MetricsSource metrics;
// Handle on the log reader helper
private ReplicationHLogReaderManager repLogReader;
+ // Handles connecting to peer region servers
+ private ReplicationSinkManager replicationSinkMgr;
/**
* Instantiation method used by region servers
@@ -178,9 +169,6 @@ public class ReplicationSource extends T
this.conn = HConnectionManager.getConnection(conf);
this.replicationQueues = replicationQueues;
this.replicationPeers = replicationPeers;
- this.ratio = this.conf.getFloat("replication.source.ratio", 0.1f);
- this.currentPeers = new ArrayList<ServerName>();
- this.random = new Random();
this.manager = manager;
this.sleepForRetries =
this.conf.getLong("replication.source.sleepforretries", 1000);
@@ -193,29 +181,9 @@ public class ReplicationSource extends T
this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
// ReplicationQueueInfo parses the peerId out of the znode for us
this.peerId = this.replicationQueueInfo.getPeerId();
+ this.replicationSinkMgr = new ReplicationSinkManager(conn, peerId, replicationPeers, conf);
}
- /**
- * Select a number of peers at random using the ratio. Mininum 1.
- */
- private void chooseSinks() {
- this.currentPeers.clear();
- List<ServerName> addresses = this.replicationPeers.getRegionServersOfConnectedPeer(this.peerId);
- Set<ServerName> setOfAddr = new HashSet<ServerName>();
- int nbPeers = (int) (Math.ceil(addresses.size() * ratio));
- LOG.debug("Getting " + nbPeers +
- " rs from peer cluster # " + this.peerId);
- for (int i = 0; i < nbPeers; i++) {
- ServerName sn;
- // Make sure we get one address that we don't already have
- do {
- sn = addresses.get(this.random.nextInt(addresses.size()));
- } while (setOfAddr.contains(sn));
- LOG.info("Choosing peer " + sn);
- setOfAddr.add(sn);
- }
- this.currentPeers.addAll(setOfAddr);
- }
@Override
public void enqueueLog(Path log) {
@@ -457,9 +425,9 @@ public class ReplicationSource extends T
int sleepMultiplier = 1;
// Connect to peer cluster first, unless we have to stop
- while (this.isActive() && this.currentPeers.size() == 0) {
- chooseSinks();
- if (this.isActive() && this.currentPeers.size() == 0) {
+ while (this.isActive() && replicationSinkMgr.getSinks().size() == 0) {
+ replicationSinkMgr.chooseSinks();
+ if (this.isActive() && replicationSinkMgr.getSinks().size() == 0) {
if (sleepForRetries("Waiting for peers", sleepMultiplier)) {
sleepMultiplier++;
}
@@ -583,7 +551,7 @@ public class ReplicationSource extends T
return (this.repLogReader.getPosition() == 0 &&
!this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0);
}
-
+
/**
* Do the sleeping logic
* @param msg Why we sleep
@@ -637,7 +605,7 @@ public class ReplicationSource extends T
/**
* Do the shipping logic
- * @param currentWALisBeingWrittenTo was the current WAL being (seemingly)
+ * @param currentWALisBeingWrittenTo was the current WAL being (seemingly)
* written to when this method was called
*/
protected void shipEdits(boolean currentWALisBeingWrittenTo) {
@@ -653,8 +621,10 @@ public class ReplicationSource extends T
}
continue;
}
+ SinkPeer sinkPeer = null;
try {
- AdminService.BlockingInterface rrs = getRS();
+ sinkPeer = replicationSinkMgr.getReplicationSink();
+ BlockingInterface rrs = sinkPeer.getRegionServer();
if (LOG.isTraceEnabled()) {
LOG.trace("Replicating " + this.currentNbEntries + " entries");
}
@@ -700,27 +670,17 @@ public class ReplicationSource extends T
this.socketTimeoutMultiplier);
} else if (ioe instanceof ConnectException) {
LOG.warn("Peer is unavailable, rechecking all sinks: ", ioe);
- chooseSinks();
+ replicationSinkMgr.chooseSinks();
} else {
LOG.warn("Can't replicate because of a local or network error: ", ioe);
}
}
- try {
- boolean down;
- // Spin while the slave is down and we're not asked to shutdown/close
- do {
- down = isSlaveDown();
- if (down) {
- if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) {
- sleepMultiplier++;
- } else {
- chooseSinks();
- }
- }
- } while (this.isActive() && down );
- } catch (InterruptedException e) {
- LOG.debug("Interrupted while trying to contact the peer cluster");
+ if (sinkPeer != null) {
+ replicationSinkMgr.reportBadSink(sinkPeer);
+ }
+ if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) {
+ sleepMultiplier++;
}
}
}
@@ -797,49 +757,6 @@ public class ReplicationSource extends T
Threads.shutdown(this, this.sleepForRetries);
}
- /**
- * Get a new region server at random from this peer
- * @return
- * @throws IOException
- */
- private AdminService.BlockingInterface getRS() throws IOException {
- if (this.currentPeers.size() == 0) {
- throw new IOException(this.peerClusterZnode + " has 0 region servers");
- }
- ServerName address =
- currentPeers.get(random.nextInt(this.currentPeers.size()));
- return this.conn.getAdmin(address);
- }
-
- /**
- * Check if the slave is down by trying to establish a connection
- * @return true if down, false if up
- * @throws InterruptedException
- */
- public boolean isSlaveDown() throws InterruptedException {
- final CountDownLatch latch = new CountDownLatch(1);
- Thread pingThread = new Thread() {
- public void run() {
- try {
- AdminService.BlockingInterface rrs = getRS();
- // Dummy call which should fail
- ProtobufUtil.getServerInfo(rrs);
- latch.countDown();
- } catch (IOException ex) {
- if (ex instanceof RemoteException) {
- ex = ((RemoteException) ex).unwrapRemoteException();
- }
- LOG.info("Slave cluster looks down: " + ex.getMessage(), ex);
- }
- }
- };
- pingThread.start();
- // awaits returns true if countDown happened
- boolean down = ! latch.await(this.sleepForRetries, TimeUnit.MILLISECONDS);
- pingThread.interrupt();
- return down;
- }
-
public String getPeerClusterZnode() {
return this.peerClusterZnode;
}
Added: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java?rev=1509331&view=auto
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java (added)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java Thu Aug 1 17:03:28 2013
@@ -0,0 +1,142 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test handling of changes to the number of a peer's regionservers.
+ */
+@Category(LargeTests.class)
+public class TestReplicationChangingPeerRegionservers extends TestReplicationBase {
+
+ private static final Log LOG = LogFactory.getLog(TestReplicationChangingPeerRegionservers.class);
+
+ /**
+ * @throws java.lang.Exception
+ */
+ @Before
+ public void setUp() throws Exception {
+ htable1.setAutoFlush(true);
+ // Starting and stopping replication can make us miss new logs,
+ // rolling like this makes sure the most recent one gets added to the queue
+ for (JVMClusterUtil.RegionServerThread r :
+ utility1.getHBaseCluster().getRegionServerThreads()) {
+ r.getRegionServer().getWAL().rollWriter();
+ }
+ utility1.truncateTable(tableName);
+ // truncating the table will send one Delete per row to the slave cluster
+ // in an async fashion, which is why we cannot just call truncateTable on
+ // utility2 since late writes could make it to the slave in some way.
+ // Instead, we truncate the first table and wait for all the Deletes to
+ // make it to the slave.
+ Scan scan = new Scan();
+ int lastCount = 0;
+ for (int i = 0; i < NB_RETRIES; i++) {
+ if (i == NB_RETRIES - 1) {
+ fail("Waited too much time for truncate");
+ }
+ ResultScanner scanner = htable2.getScanner(scan);
+ Result[] res = scanner.next(NB_ROWS_IN_BIG_BATCH);
+ scanner.close();
+ if (res.length != 0) {
+ if (res.length < lastCount) {
+ i--; // Don't increment timeout if we make progress
+ }
+ lastCount = res.length;
+ LOG.info("Still got " + res.length + " rows");
+ Thread.sleep(SLEEP_TIME);
+ } else {
+ break;
+ }
+ }
+ }
+
+ @Test(timeout = 300000)
+ public void testChangingNumberOfPeerRegionServers() throws IOException, InterruptedException {
+
+ LOG.info("testSimplePutDelete");
+ MiniHBaseCluster peerCluster = utility2.getMiniHBaseCluster();
+
+ doPutTest(Bytes.toBytes(1));
+
+ int rsToStop = peerCluster.getServerWithMeta() == 0 ? 1 : 0;
+ peerCluster.stopRegionServer(rsToStop);
+ peerCluster.waitOnRegionServer(rsToStop);
+
+ // Sanity check
+ assertEquals(1, peerCluster.getRegionServerThreads().size());
+
+ doPutTest(Bytes.toBytes(2));
+
+ peerCluster.startRegionServer();
+
+ // Sanity check
+ assertEquals(2, peerCluster.getRegionServerThreads().size());
+
+ doPutTest(Bytes.toBytes(3));
+
+ }
+
+ private void doPutTest(byte[] row) throws IOException, InterruptedException {
+ Put put = new Put(row);
+ put.add(famName, row, row);
+
+ htable1 = new HTable(conf1, tableName);
+ htable1.put(put);
+
+ Get get = new Get(row);
+ for (int i = 0; i < NB_RETRIES; i++) {
+ if (i == NB_RETRIES - 1) {
+ fail("Waited too much time for put replication");
+ }
+ Result res = htable2.get(get);
+ if (res.size() == 0) {
+ LOG.info("Row not available");
+ Thread.sleep(SLEEP_TIME);
+ } else {
+ assertArrayEquals(res.value(), row);
+ break;
+ }
+ }
+
+ }
+
+}
Added: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java?rev=1509331&view=auto
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java (added)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java Thu Aug 1 17:03:28 2013
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.replication.ReplicationPeers;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.collect.Lists;
+
+ /**
+  * Unit tests for {@link ReplicationSinkManager}, run against a mocked
+  * {@link ReplicationPeers} so that the list of peer region servers can be controlled.
+  */
+ @Category(SmallTests.class)
+ public class TestReplicationSinkManager {
+
+   private static final String PEER_CLUSTER_ID = "PEER_CLUSTER_ID";
+
+   private ReplicationPeers replicationPeers;
+   private ReplicationSinkManager sinkManager;
+
+   /** Builds a fresh sink manager on top of mocked collaborators before every test. */
+   @Before
+   public void setUp() {
+     replicationPeers = mock(ReplicationPeers.class);
+     HConnection connection = mock(HConnection.class);
+     Configuration conf = new Configuration();
+     sinkManager =
+         new ReplicationSinkManager(connection, PEER_CLUSTER_ID, replicationPeers, conf);
+   }
+
+   /** With 20 peer region servers available, two of them are expected to be picked as sinks. */
+   @Test
+   public void testChooseSinks() {
+     stubPeerRegionServers(mockServerNames(20));
+
+     sinkManager.chooseSinks();
+
+     assertEquals(2, sinkManager.getSinks().size());
+   }
+
+   /** With only two peer region servers available, one sink is still expected to be picked. */
+   @Test
+   public void testChooseSinks_LessThanRatioAvailable() {
+     stubPeerRegionServers(mockServerNames(2));
+
+     sinkManager.chooseSinks();
+
+     assertEquals(1, sinkManager.getSinks().size());
+   }
+
+   /** A single bad-sink report must leave the number of sinks untouched. */
+   @Test
+   public void testReportBadSink() {
+     ServerName serverNameA = mock(ServerName.class);
+     ServerName serverNameB = mock(ServerName.class);
+     stubPeerRegionServers(Lists.newArrayList(serverNameA, serverNameB));
+
+     sinkManager.chooseSinks();
+     // Sanity check
+     assertEquals(1, sinkManager.getSinks().size());
+
+     sinkManager.reportBadSink(newSinkPeer(serverNameA));
+
+     // Just reporting a bad sink once shouldn't have an effect
+     assertEquals(1, sinkManager.getSinks().size());
+   }
+
+   /**
+    * Once a SinkPeer has been reported as bad more than BAD_SINK_THRESHOLD times, it should not
+    * be replicated to anymore.
+    */
+   @Test
+   public void testReportBadSink_PastThreshold() {
+     stubPeerRegionServers(mockServerNames(20));
+
+     sinkManager.chooseSinks();
+     // Sanity check
+     assertEquals(2, sinkManager.getSinks().size());
+
+     ServerName firstSink = sinkManager.getSinks().get(0);
+     SinkPeer badPeer = newSinkPeer(firstSink);
+
+     // Report the same sink one time more than the threshold.
+     final int reports = ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD + 1;
+     for (int n = 0; n < reports; n++) {
+       sinkManager.reportBadSink(badPeer);
+     }
+
+     // Reporting a bad sink more than the threshold count should remove it
+     // from the list of potential sinks
+     assertEquals(1, sinkManager.getSinks().size());
+   }
+
+   /** When every chosen sink has been reported bad past the threshold, sinks are re-chosen. */
+   @Test
+   public void testReportBadSink_DownToZeroSinks() {
+     stubPeerRegionServers(mockServerNames(20));
+
+     sinkManager.chooseSinks();
+
+     // Sanity check
+     List<ServerName> chosen = sinkManager.getSinks();
+     assertEquals(2, chosen.size());
+
+     ServerName serverNameA = chosen.get(0);
+     ServerName serverNameB = chosen.get(1);
+
+     SinkPeer sinkPeerA = newSinkPeer(serverNameA);
+     SinkPeer sinkPeerB = newSinkPeer(serverNameB);
+
+     // Alternate the reports between both sinks, one time more than the threshold each.
+     final int reports = ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD + 1;
+     for (int n = 0; n < reports; n++) {
+       sinkManager.reportBadSink(sinkPeerA);
+       sinkManager.reportBadSink(sinkPeerB);
+     }
+
+     // We've gone down to 0 good sinks, so the replication sinks
+     // should have been refreshed now
+     assertEquals(2, sinkManager.getSinks().size());
+   }
+
+   /** Creates a mutable list holding {@code count} distinct mocked server names. */
+   private static List<ServerName> mockServerNames(int count) {
+     List<ServerName> servers = Lists.newArrayList();
+     for (int n = 0; n < count; n++) {
+       servers.add(mock(ServerName.class));
+     }
+     return servers;
+   }
+
+   /** Makes the mocked peers report {@code servers} as region servers of the peer cluster. */
+   private void stubPeerRegionServers(List<ServerName> servers) {
+     when(replicationPeers.getRegionServersOfConnectedPeer(PEER_CLUSTER_ID))
+         .thenReturn(servers);
+   }
+
+   /** Wraps {@code serverName} in a SinkPeer backed by a mocked admin interface. */
+   private static SinkPeer newSinkPeer(ServerName serverName) {
+     return new SinkPeer(serverName, mock(AdminService.BlockingInterface.class));
+   }
+
+ }