You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jd...@apache.org on 2013/08/01 19:03:28 UTC

svn commit: r1509331 - in /hbase/branches/0.95: hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ hbase-server/src/test/java/org/apache/hadoop/hbase/replication/...

Author: jdcryans
Date: Thu Aug  1 17:03:28 2013
New Revision: 1509331

URL: http://svn.apache.org/r1509331
Log:
HBASE-7634 Replication handling of changes to peer clusters is inefficient (Gabriel Reid via JD)

Added:
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java
    hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java
    hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java
Modified:
    hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
    hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
    hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
    hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java

Modified: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java?rev=1509331&r1=1509330&r2=1509331&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java (original)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java Thu Aug  1 17:03:28 2013
@@ -18,6 +18,13 @@
  */
 package org.apache.hadoop.hbase.replication;
 
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -35,12 +42,6 @@ import org.apache.zookeeper.KeeperExcept
 
 import com.google.protobuf.InvalidProtocolBufferException;
 
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicBoolean;
-
 /**
  * This class acts as a wrapper for all the objects used to identify and
  * communicate with remote peers and is responsible for answering to expired
@@ -57,9 +58,11 @@ public class ReplicationPeer implements 
   // Cannot be final since a new object needs to be recreated when session fails
   private ZooKeeperWatcher zkw;
   private final Configuration conf;
+  private long lastRegionserverUpdate;
 
   private PeerStateTracker peerStateTracker;
 
+
   /**
    * Constructor that takes all the objects required to communicate with the
    * specified peer, except for the region server addresses.
@@ -130,6 +133,7 @@ public class ReplicationPeer implements 
    */
   public void setRegionServers(List<ServerName> regionServers) {
     this.regionServers = regionServers;
+    lastRegionserverUpdate = System.currentTimeMillis();
   }
 
   /**
@@ -141,6 +145,15 @@ public class ReplicationPeer implements 
   }
 
   /**
+   * Get the timestamp at which the last change occurred to the list of region servers to replicate
+   * to.
+   * @return The System.currentTimeMillis at the last time the list of peer region servers changed.
+   */
+  public long getLastRegionserverUpdate() {
+    return lastRegionserverUpdate;
+  }
+
+  /**
    * Get the identifier of this peer
    * @return string representation of the id (short)
    */

Modified: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java?rev=1509331&r1=1509330&r2=1509331&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java (original)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java Thu Aug  1 17:03:28 2013
@@ -123,6 +123,14 @@ public interface ReplicationPeers {
   List<ServerName> getRegionServersOfConnectedPeer(String peerId);
 
   /**
+   * Get the timestamp of the last change in composition of a given peer cluster.
+   * @param peerId identifier of the peer cluster for which the timestamp is requested
+   * @return the timestamp (in milliseconds) of the last change to the composition of
+   *         the peer cluster
+   */
+  long getTimestampOfLastChangeToPeer(String peerId);
+
+  /**
    * Returns the UUID of the provided peer id.
    * @param peerId the peer's ID that will be converted into a UUID
    * @return a UUID or null if the peer cluster does not exist or is not connected.

Modified: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java?rev=1509331&r1=1509330&r2=1509331&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java (original)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java Thu Aug  1 17:03:28 2013
@@ -39,6 +39,7 @@ import org.apache.hadoop.hbase.protobuf.
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
 import org.apache.zookeeper.KeeperException.ConnectionLossException;
@@ -266,6 +267,14 @@ public class ReplicationPeersZKImpl exte
     return ids;
   }
 
+  @Override
+  public long getTimestampOfLastChangeToPeer(String peerId) {
+    if (!peerClusters.containsKey(peerId)) {
+      throw new IllegalArgumentException("Unknown peer id: " + peerId);
+    }
+    return peerClusters.get(peerId).getLastRegionserverUpdate();
+  }
+
   /**
    * A private method used during initialization. This method attempts to connect to all registered
    * peer clusters. This method does not set a watch on the peer cluster znodes.
@@ -291,6 +300,7 @@ public class ReplicationPeersZKImpl exte
       LOG.warn("Lost the ZooKeeper connection for peer " + peer.getClusterKey(), ke);
       try {
         peer.reloadZkWatcher();
+        peer.getZkw().registerListener(new PeerRegionServerListener(peer));
       } catch (IOException io) {
         LOG.warn("Creation of ZookeeperWatcher failed for peer " + peer.getClusterKey(), io);
       }
@@ -304,7 +314,7 @@ public class ReplicationPeersZKImpl exte
    */
   private static List<ServerName> fetchSlavesAddresses(ZooKeeperWatcher zkw)
       throws KeeperException {
-    List<String> children = ZKUtil.listChildrenNoWatch(zkw, zkw.rsZNode);
+    List<String> children = ZKUtil.listChildrenAndWatchForNewChildren(zkw, zkw.rsZNode);
     if (children == null) {
       return Collections.emptyList();
     }
@@ -315,6 +325,7 @@ public class ReplicationPeersZKImpl exte
     return addresses;
   }
 
+
   private String getPeerStateNode(String id) {
     return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.peerStateNodeName));
   }
@@ -366,6 +377,7 @@ public class ReplicationPeersZKImpl exte
     ReplicationPeer peer =
         new ReplicationPeer(peerConf, peerId, ZKUtil.getZooKeeperClusterKey(peerConf));
     peer.startStateTracker(this.zookeeper, this.getPeerStateNode(peerId));
+    peer.getZkw().registerListener(new PeerRegionServerListener(peer));
     return peer;
   }
 
@@ -406,4 +418,37 @@ public class ReplicationPeersZKImpl exte
             .toByteArray();
     return ProtobufUtil.prependPBMagic(bytes);
   }
+
+  /**
+   * Tracks changes to the list of region servers in a peer's cluster.
+   */
+  public static class PeerRegionServerListener extends ZooKeeperListener {
+
+    private ReplicationPeer peer;
+    private String regionServerListNode;
+
+    public PeerRegionServerListener(ReplicationPeer replicationPeer) {
+      super(replicationPeer.getZkw());
+      this.peer = replicationPeer;
+      this.regionServerListNode = peer.getZkw().rsZNode;
+    }
+
+    public PeerRegionServerListener(String regionServerListNode, ZooKeeperWatcher zkw) {
+      super(zkw);
+      this.regionServerListNode = regionServerListNode;
+    }
+
+    @Override
+    public synchronized void nodeChildrenChanged(String path) {
+      if (path.equals(regionServerListNode)) {
+        try {
+          LOG.info("Detected change to peer regionservers, fetching updated list");
+          peer.setRegionServers(fetchSlavesAddresses(peer.getZkw()));
+        } catch (KeeperException e) {
+          LOG.fatal("Error reading slave addresses", e);
+        }
+      }
+    }
+
+  }
 }
\ No newline at end of file

Added: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java?rev=1509331&view=auto
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java (added)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSinkManager.java Thu Aug  1 17:03:28 2013
@@ -0,0 +1,182 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.replication.ReplicationPeers;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+/**
+ * Maintains a collection of peers to replicate to, and randomly selects a
+ * single peer to replicate to per set of data to replicate. Also handles
+ * keeping track of peer availability.
+ */
+public class ReplicationSinkManager {
+
+  private static final Log LOG = LogFactory.getLog(ReplicationSinkManager.class);
+
+  /**
+   * Default maximum number of times a replication sink can be reported as bad before
+   * it will no longer be provided as a sink for replication without the pool of
+   * replication sinks being refreshed.
+   */
+  static final int DEFAULT_BAD_SINK_THRESHOLD = 3;
+
+  /**
+   * Default ratio of the total number of peer cluster region servers to consider
+   * replicating to.
+   */
+  static final float DEFAULT_REPLICATION_SOURCE_RATIO = 0.1f;
+
+
+  private final HConnection conn;
+
+  private final String peerClusterId;
+
+  private final ReplicationPeers replicationPeers;
+
+  // Count of "bad replication sink" reports per peer sink
+  private final Map<ServerName, Integer> badReportCounts;
+
+  // Ratio of total number of potential peer region servers to be used
+  private final float ratio;
+
+  // Maximum number of times a sink can be reported as bad before the pool of
+  // replication sinks is refreshed
+  private final int badSinkThreshold;
+
+  private final Random random;
+
+  // A timestamp of the last time the list of replication peers changed
+  private long lastUpdateToPeers;
+
+  // The current pool of sinks to which replication can be performed
+  private List<ServerName> sinks = Lists.newArrayList();
+
+  /**
+   * Instantiate for a single replication peer cluster.
+   * @param conn connection to the peer cluster
+   * @param peerClusterId identifier of the peer cluster
+   * @param replicationPeers manages peer clusters being replicated to
+   * @param conf HBase configuration, used for determining replication source ratio and bad peer
+   *          threshold
+   */
+  public ReplicationSinkManager(HConnection conn, String peerClusterId,
+      ReplicationPeers replicationPeers, Configuration conf) {
+    this.conn = conn;
+    this.peerClusterId = peerClusterId;
+    this.replicationPeers = replicationPeers;
+    this.badReportCounts = Maps.newHashMap();
+    this.ratio = conf.getFloat("replication.source.ratio", DEFAULT_REPLICATION_SOURCE_RATIO);
+    this.badSinkThreshold = conf.getInt("replication.bad.sink.threshold",
+                                        DEFAULT_BAD_SINK_THRESHOLD);
+    this.random = new Random();
+  }
+
+  /**
+   * Get a randomly-chosen replication sink to replicate to.
+   *
+   * @return a replication sink to replicate to
+   */
+  public SinkPeer getReplicationSink() throws IOException {
+    if (replicationPeers.getTimestampOfLastChangeToPeer(peerClusterId)
+                                                        > this.lastUpdateToPeers) {
+      LOG.info("Current list of sinks is out of date, updating");
+      chooseSinks();
+    }
+
+    if (sinks.isEmpty()) {
+      throw new IOException("No replication sinks are available");
+    }
+    ServerName serverName = sinks.get(random.nextInt(sinks.size()));
+    return new SinkPeer(serverName, conn.getAdmin(serverName));
+  }
+
+  /**
+   * Report a {@code SinkPeer} as being bad (i.e. an attempt to replicate to it
+   * failed). If a single SinkPeer is reported as bad more than
+   * replication.bad.sink.threshold times, it will be removed
+   * from the pool of potential replication targets.
+   *
+   * @param sinkPeer
+   *          The SinkPeer that had a failed replication attempt on it
+   */
+  public void reportBadSink(SinkPeer sinkPeer) {
+    ServerName serverName = sinkPeer.getServerName();
+    int badReportCount = (badReportCounts.containsKey(serverName)
+                    ? badReportCounts.get(serverName) : 0) + 1;
+    badReportCounts.put(serverName, badReportCount);
+    if (badReportCount > badSinkThreshold) {
+      this.sinks.remove(serverName);
+      if (sinks.isEmpty()) {
+        chooseSinks();
+      }
+    }
+  }
+
+  void chooseSinks() {
+    List<ServerName> slaveAddresses =
+                        replicationPeers.getRegionServersOfConnectedPeer(peerClusterId);
+    Collections.shuffle(slaveAddresses, random);
+    int numSinks = (int) Math.ceil(slaveAddresses.size() * ratio);
+    sinks = slaveAddresses.subList(0, numSinks);
+    lastUpdateToPeers = System.currentTimeMillis();
+    badReportCounts.clear();
+  }
+
+  List<ServerName> getSinks() {
+    return sinks;
+  }
+
+  /**
+   * Wraps a replication region server sink to provide the ability to identify
+   * it.
+   */
+  public static class SinkPeer {
+    private ServerName serverName;
+    private AdminService.BlockingInterface regionServer;
+
+    public SinkPeer(ServerName serverName, AdminService.BlockingInterface regionServer) {
+      this.serverName = serverName;
+      this.regionServer = regionServer;
+    }
+
+    ServerName getServerName() {
+      return serverName;
+    }
+
+    public AdminService.BlockingInterface getRegionServer() {
+      return regionServer;
+    }
+
+  }
+
+}

Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=1509331&r1=1509330&r2=1509331&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Thu Aug  1 17:03:28 2013
@@ -23,16 +23,11 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.ConnectException;
 import java.net.SocketTimeoutException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Comparator;
-import java.util.HashSet;
 import java.util.List;
 import java.util.NavigableMap;
-import java.util.Random;
-import java.util.Set;
 import java.util.UUID;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
@@ -45,20 +40,19 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.Stoppable;
 import org.apache.hadoop.hbase.TableNotFoundException;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HConnectionManager;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
 import org.apache.hadoop.hbase.protobuf.ReplicationProtbufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService.BlockingInterface;
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.ipc.RemoteException;
@@ -89,9 +83,6 @@ public class ReplicationSource extends T
   private ReplicationQueues replicationQueues;
   private ReplicationPeers replicationPeers;
   private Configuration conf;
-  // ratio of region servers to chose from a slave cluster
-  private float ratio;
-  private Random random;
   private ReplicationQueueInfo replicationQueueInfo;
   // id of the peer cluster this source replicates to
   private String peerId;
@@ -99,8 +90,6 @@ public class ReplicationSource extends T
   private ReplicationSourceManager manager;
   // Should we stop everything?
   private Stoppable stopper;
-  // List of chosen sinks (region servers)
-  private List<ServerName> currentPeers;
   // How long should we sleep for each retry
   private long sleepForRetries;
   // Max size in bytes of entriesArray
@@ -140,6 +129,8 @@ public class ReplicationSource extends T
   private MetricsSource metrics;
   // Handle on the log reader helper
   private ReplicationHLogReaderManager repLogReader;
+  // Handles connecting to peer region servers
+  private ReplicationSinkManager replicationSinkMgr;
 
   /**
    * Instantiation method used by region servers
@@ -178,9 +169,6 @@ public class ReplicationSource extends T
     this.conn = HConnectionManager.getConnection(conf);
     this.replicationQueues = replicationQueues;
     this.replicationPeers = replicationPeers;
-    this.ratio = this.conf.getFloat("replication.source.ratio", 0.1f);
-    this.currentPeers = new ArrayList<ServerName>();
-    this.random = new Random();
     this.manager = manager;
     this.sleepForRetries =
         this.conf.getLong("replication.source.sleepforretries", 1000);
@@ -193,29 +181,9 @@ public class ReplicationSource extends T
     this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
     // ReplicationQueueInfo parses the peerId out of the znode for us
     this.peerId = this.replicationQueueInfo.getPeerId();
+    this.replicationSinkMgr = new ReplicationSinkManager(conn, peerId, replicationPeers, conf);
   }
 
-  /**
-   * Select a number of peers at random using the ratio. Mininum 1.
-   */
-  private void chooseSinks() {
-    this.currentPeers.clear();
-    List<ServerName> addresses = this.replicationPeers.getRegionServersOfConnectedPeer(this.peerId);
-    Set<ServerName> setOfAddr = new HashSet<ServerName>();
-    int nbPeers = (int) (Math.ceil(addresses.size() * ratio));
-    LOG.debug("Getting " + nbPeers +
-        " rs from peer cluster # " + this.peerId);
-    for (int i = 0; i < nbPeers; i++) {
-      ServerName sn;
-      // Make sure we get one address that we don't already have
-      do {
-        sn = addresses.get(this.random.nextInt(addresses.size()));
-      } while (setOfAddr.contains(sn));
-      LOG.info("Choosing peer " + sn);
-      setOfAddr.add(sn);
-    }
-    this.currentPeers.addAll(setOfAddr);
-  }
 
   @Override
   public void enqueueLog(Path log) {
@@ -457,9 +425,9 @@ public class ReplicationSource extends T
     int sleepMultiplier = 1;
 
     // Connect to peer cluster first, unless we have to stop
-    while (this.isActive() && this.currentPeers.size() == 0) {
-      chooseSinks();
-      if (this.isActive() && this.currentPeers.size() == 0) {
+    while (this.isActive() && replicationSinkMgr.getSinks().size() == 0) {
+      replicationSinkMgr.chooseSinks();
+      if (this.isActive() && replicationSinkMgr.getSinks().size() == 0) {
         if (sleepForRetries("Waiting for peers", sleepMultiplier)) {
           sleepMultiplier++;
         }
@@ -583,7 +551,7 @@ public class ReplicationSource extends T
     return (this.repLogReader.getPosition() == 0 &&
         !this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0);
   }
-  
+
   /**
    * Do the sleeping logic
    * @param msg Why we sleep
@@ -637,7 +605,7 @@ public class ReplicationSource extends T
 
   /**
    * Do the shipping logic
-   * @param currentWALisBeingWrittenTo was the current WAL being (seemingly) 
+   * @param currentWALisBeingWrittenTo was the current WAL being (seemingly)
    * written to when this method was called
    */
   protected void shipEdits(boolean currentWALisBeingWrittenTo) {
@@ -653,8 +621,10 @@ public class ReplicationSource extends T
         }
         continue;
       }
+      SinkPeer sinkPeer = null;
       try {
-        AdminService.BlockingInterface rrs = getRS();
+        sinkPeer = replicationSinkMgr.getReplicationSink();
+        BlockingInterface rrs = sinkPeer.getRegionServer();
         if (LOG.isTraceEnabled()) {
           LOG.trace("Replicating " + this.currentNbEntries + " entries");
         }
@@ -700,27 +670,17 @@ public class ReplicationSource extends T
               this.socketTimeoutMultiplier);
           } else if (ioe instanceof ConnectException) {
             LOG.warn("Peer is unavailable, rechecking all sinks: ", ioe);
-            chooseSinks();
+            replicationSinkMgr.chooseSinks();
           } else {
             LOG.warn("Can't replicate because of a local or network error: ", ioe);
           }
         }
 
-        try {
-          boolean down;
-          // Spin while the slave is down and we're not asked to shutdown/close
-          do {
-            down = isSlaveDown();
-            if (down) {
-              if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) {
-                sleepMultiplier++;
-              } else {
-                chooseSinks();
-              }
-            }
-          } while (this.isActive() && down );
-        } catch (InterruptedException e) {
-          LOG.debug("Interrupted while trying to contact the peer cluster");
+        if (sinkPeer != null) {
+          replicationSinkMgr.reportBadSink(sinkPeer);
+        }
+        if (sleepForRetries("Since we are unable to replicate", sleepMultiplier)) {
+          sleepMultiplier++;
         }
       }
     }
@@ -797,49 +757,6 @@ public class ReplicationSource extends T
     Threads.shutdown(this, this.sleepForRetries);
   }
 
-  /**
-   * Get a new region server at random from this peer
-   * @return
-   * @throws IOException
-   */
-  private AdminService.BlockingInterface getRS() throws IOException {
-    if (this.currentPeers.size() == 0) {
-      throw new IOException(this.peerClusterZnode + " has 0 region servers");
-    }
-    ServerName address =
-        currentPeers.get(random.nextInt(this.currentPeers.size()));
-    return this.conn.getAdmin(address);
-  }
-
-  /**
-   * Check if the slave is down by trying to establish a connection
-   * @return true if down, false if up
-   * @throws InterruptedException
-   */
-  public boolean isSlaveDown() throws InterruptedException {
-    final CountDownLatch latch = new CountDownLatch(1);
-    Thread pingThread = new Thread() {
-      public void run() {
-        try {
-          AdminService.BlockingInterface rrs = getRS();
-          // Dummy call which should fail
-          ProtobufUtil.getServerInfo(rrs);
-          latch.countDown();
-        } catch (IOException ex) {
-          if (ex instanceof RemoteException) {
-            ex = ((RemoteException) ex).unwrapRemoteException();
-          }
-          LOG.info("Slave cluster looks down: " + ex.getMessage(), ex);
-        }
-      }
-    };
-    pingThread.start();
-    // awaits returns true if countDown happened
-    boolean down = ! latch.await(this.sleepForRetries, TimeUnit.MILLISECONDS);
-    pingThread.interrupt();
-    return down;
-  }
-
   public String getPeerClusterZnode() {
     return this.peerClusterZnode;
   }

Added: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java?rev=1509331&view=auto
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java (added)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationChangingPeerRegionservers.java Thu Aug  1 17:03:28 2013
@@ -0,0 +1,142 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.apache.hadoop.hbase.util.JVMClusterUtil.RegionServerThread;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Test handling of changes to the number of a peer's regionservers.
+ */
+@Category(LargeTests.class)
+public class TestReplicationChangingPeerRegionservers extends TestReplicationBase {
+
+  private static final Log LOG = LogFactory.getLog(TestReplicationChangingPeerRegionservers.class);
+
+  /**
+   * @throws java.lang.Exception
+   */
+  @Before
+  public void setUp() throws Exception {
+    htable1.setAutoFlush(true);
+    // Starting and stopping replication can make us miss new logs,
+    // rolling like this makes sure the most recent one gets added to the queue
+    for (JVMClusterUtil.RegionServerThread r :
+                          utility1.getHBaseCluster().getRegionServerThreads()) {
+      r.getRegionServer().getWAL().rollWriter();
+    }
+    utility1.truncateTable(tableName);
+    // truncating the table will send one Delete per row to the slave cluster
+    // in an async fashion, which is why we cannot just call truncateTable on
+    // utility2 since late writes could make it to the slave in some way.
+    // Instead, we truncate the first table and wait for all the Deletes to
+    // make it to the slave.
+    Scan scan = new Scan();
+    int lastCount = 0;
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i == NB_RETRIES - 1) {
+        fail("Waited too much time for truncate");
+      }
+      ResultScanner scanner = htable2.getScanner(scan);
+      Result[] res = scanner.next(NB_ROWS_IN_BIG_BATCH);
+      scanner.close();
+      if (res.length != 0) {
+        if (res.length < lastCount) {
+          i--; // Don't increment timeout if we make progress
+        }
+        lastCount = res.length;
+        LOG.info("Still got " + res.length + " rows");
+        Thread.sleep(SLEEP_TIME);
+      } else {
+        break;
+      }
+    }
+  }
+
+  @Test(timeout = 300000)
+  public void testChangingNumberOfPeerRegionServers() throws IOException, InterruptedException {
+
+    LOG.info("testSimplePutDelete");
+    MiniHBaseCluster peerCluster = utility2.getMiniHBaseCluster();
+
+    doPutTest(Bytes.toBytes(1));
+
+    int rsToStop = peerCluster.getServerWithMeta() == 0 ? 1 : 0;
+    peerCluster.stopRegionServer(rsToStop);
+    peerCluster.waitOnRegionServer(rsToStop);
+
+    // Sanity check
+    assertEquals(1, peerCluster.getRegionServerThreads().size());
+
+    doPutTest(Bytes.toBytes(2));
+
+    peerCluster.startRegionServer();
+
+    // Sanity check
+    assertEquals(2, peerCluster.getRegionServerThreads().size());
+
+    doPutTest(Bytes.toBytes(3));
+
+  }
+
+  private void doPutTest(byte[] row) throws IOException, InterruptedException {
+    Put put = new Put(row);
+    put.add(famName, row, row);
+
+    htable1 = new HTable(conf1, tableName);
+    htable1.put(put);
+
+    Get get = new Get(row);
+    for (int i = 0; i < NB_RETRIES; i++) {
+      if (i == NB_RETRIES - 1) {
+        fail("Waited too much time for put replication");
+      }
+      Result res = htable2.get(get);
+      if (res.size() == 0) {
+        LOG.info("Row not available");
+        Thread.sleep(SLEEP_TIME);
+      } else {
+        assertArrayEquals(res.value(), row);
+        break;
+      }
+    }
+
+  }
+
+}

Added: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java?rev=1509331&view=auto
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java (added)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSinkManager.java Thu Aug  1 17:03:28 2013
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.SmallTests;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.protobuf.generated.AdminProtos.AdminService;
+import org.apache.hadoop.hbase.replication.ReplicationPeers;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSinkManager.SinkPeer;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import com.google.common.collect.Lists;
+
+@Category(SmallTests.class)
+public class TestReplicationSinkManager {
+
+  private static final String PEER_CLUSTER_ID = "PEER_CLUSTER_ID";
+
+  private ReplicationPeers replicationPeers;
+  private ReplicationSinkManager sinkManager;
+
+  /**
+   * Builds a fresh sink manager wired against a mocked peer list before each test.
+   */
+  @Before
+  public void setUp() {
+    replicationPeers = mock(ReplicationPeers.class);
+    sinkManager = new ReplicationSinkManager(mock(HConnection.class),
+                      PEER_CLUSTER_ID, replicationPeers, new Configuration());
+  }
+
+  /** Returns a list of {@code count} mocked region server names. */
+  private List<ServerName> mockServers(int count) {
+    List<ServerName> servers = Lists.newArrayList();
+    for (int i = 0; i < count; i++) {
+      servers.add(mock(ServerName.class));
+    }
+    return servers;
+  }
+
+  /** Stubs the peer lookup so the given servers are the peer cluster's region servers. */
+  private void setPeerServers(List<ServerName> servers) {
+    when(replicationPeers.getRegionServersOfConnectedPeer(PEER_CLUSTER_ID))
+          .thenReturn(servers);
+  }
+
+  @Test
+  public void testChooseSinks() {
+    setPeerServers(mockServers(20));
+
+    sinkManager.chooseSinks();
+
+    // Only a fraction of the peer's region servers are selected as sinks.
+    assertEquals(2, sinkManager.getSinks().size());
+  }
+
+  @Test
+  public void testChooseSinks_LessThanRatioAvailable() {
+    setPeerServers(mockServers(2));
+
+    sinkManager.chooseSinks();
+
+    assertEquals(1, sinkManager.getSinks().size());
+  }
+
+  @Test
+  public void testReportBadSink() {
+    List<ServerName> servers = mockServers(2);
+    setPeerServers(servers);
+
+    sinkManager.chooseSinks();
+    // Sanity check
+    assertEquals(1, sinkManager.getSinks().size());
+
+    SinkPeer peer = new SinkPeer(servers.get(0), mock(AdminService.BlockingInterface.class));
+    sinkManager.reportBadSink(peer);
+
+    // A single bad report should not drop the sink from the list.
+    assertEquals(1, sinkManager.getSinks().size());
+  }
+
+  /**
+   * Once a SinkPeer has been reported as bad more than BAD_SINK_THRESHOLD times, it should not
+   * be replicated to anymore.
+   */
+  @Test
+  public void testReportBadSink_PastThreshold() {
+    setPeerServers(mockServers(20));
+
+    sinkManager.chooseSinks();
+    // Sanity check
+    assertEquals(2, sinkManager.getSinks().size());
+
+    SinkPeer badPeer =
+        new SinkPeer(sinkManager.getSinks().get(0), mock(AdminService.BlockingInterface.class));
+
+    for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD; i++) {
+      sinkManager.reportBadSink(badPeer);
+    }
+
+    // Crossing the threshold removes the sink from the candidate list.
+    assertEquals(1, sinkManager.getSinks().size());
+  }
+
+  @Test
+  public void testReportBadSink_DownToZeroSinks() {
+    setPeerServers(mockServers(20));
+
+    sinkManager.chooseSinks();
+
+    List<ServerName> sinks = sinkManager.getSinks();
+    // Sanity check
+    assertEquals(2, sinks.size());
+
+    SinkPeer peerA = new SinkPeer(sinks.get(0), mock(AdminService.BlockingInterface.class));
+    SinkPeer peerB = new SinkPeer(sinks.get(1), mock(AdminService.BlockingInterface.class));
+
+    for (int i = 0; i <= ReplicationSinkManager.DEFAULT_BAD_SINK_THRESHOLD; i++) {
+      sinkManager.reportBadSink(peerA);
+      sinkManager.reportBadSink(peerB);
+    }
+
+    // Every sink went bad, so the manager should have re-chosen a fresh set.
+    assertEquals(2, sinkManager.getSinks().size());
+  }
+
+}