You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jd...@apache.org on 2013/05/16 02:56:36 UTC

svn commit: r1483136 - in /hbase/trunk: hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ hbase-s...

Author: jdcryans
Date: Thu May 16 00:56:35 2013
New Revision: 1483136

URL: http://svn.apache.org/r1483136
Log:
HBASE-7380  [replication] When transferring queues, check if the peer still
            exists before copying the znodes

Added:
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java
Modified:
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java

Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java?rev=1483136&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java Thu May 16 00:56:35 2013
@@ -0,0 +1,127 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.ServerName;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * This class is responsible for the parsing logic for a znode representing a queue.
+ * It will extract the peerId if it's recovered as well as the dead region servers
+ * that were part of the queue's history.
+ */
+public class ReplicationQueueInfo {
+  private static final Log LOG = LogFactory.getLog(ReplicationQueueInfo.class);
+
+  private final String peerId;
+  private final String peerClusterZnode;
+  private boolean queueRecovered;
+  // List of all the dead region servers that had this queue (if recovered)
+  private List<String> deadRegionServers = new ArrayList<String>();
+
+  /**
+   * The passed znode will be either the id of the peer cluster or
+   * the handling story of that queue in the form of id-servername-*
+   */
+  public ReplicationQueueInfo(String znode) {
+    this.peerClusterZnode = znode;
+    String[] parts = znode.split("-", 2);
+    this.queueRecovered = parts.length != 1;
+    this.peerId = this.queueRecovered ?
+        parts[0] : peerClusterZnode;
+    if (parts.length >= 2) {
+      // extract dead servers
+      extractDeadServersFromZNodeString(parts[1], this.deadRegionServers);
+    }
+  }
+
+  /**
+   * Parse dead server names from znode string servername can contain "-" such as
+   * "ip-10-46-221-101.ec2.internal", so we need skip some "-" during parsing for the following
+   * cases: 2-ip-10-46-221-101.ec2.internal,52170,1364333181125-<server name>-...
+   */
+  private static void
+      extractDeadServersFromZNodeString(String deadServerListStr, List<String> result) {
+
+    if(deadServerListStr == null || result == null || deadServerListStr.isEmpty()) return;
+
+    // valid server name delimiter "-" has to be after "," in a server name
+    int seenCommaCnt = 0;
+    int startIndex = 0;
+    int len = deadServerListStr.length();
+
+    for (int i = 0; i < len; i++) {
+      switch (deadServerListStr.charAt(i)) {
+      case ',':
+        seenCommaCnt += 1;
+        break;
+      case '-':
+        if(seenCommaCnt>=2) {
+          if (i > startIndex) {
+            String serverName = deadServerListStr.substring(startIndex, i);
+            if(ServerName.isFullServerName(serverName)){
+              result.add(serverName);
+            } else {
+              LOG.error("Found invalid server name:" + serverName);
+            }
+            startIndex = i + 1;
+          }
+          seenCommaCnt = 0;
+        }
+        break;
+      default:
+        break;
+      }
+    }
+
+    // add tail
+    if(startIndex < len - 1){
+      String serverName = deadServerListStr.substring(startIndex, len);
+      if(ServerName.isFullServerName(serverName)){
+        result.add(serverName);
+      } else {
+        LOG.error("Found invalid server name at the end:" + serverName);
+      }
+    }
+
+    LOG.debug("Found dead servers:" + result);
+  }
+
+  public List<String> getDeadRegionServers() {
+    return Collections.unmodifiableList(this.deadRegionServers);
+  }
+
+  public String getPeerId() {
+    return this.peerId;
+  }
+
+  public String getPeerClusterZnode() {
+    return this.peerClusterZnode;
+  }
+
+  public boolean isQueueRecovered() {
+    return queueRecovered;
+  }
+}

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java?rev=1483136&r1=1483135&r2=1483136&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java Thu May 16 00:56:35 2013
@@ -258,6 +258,12 @@ public class ReplicationQueuesZKImpl ext
       peerIdsToProcess = ZKUtil.listChildrenNoWatch(this.zookeeper, deadRSZnodePath);
       if (peerIdsToProcess == null) return queues; // node already processed
       for (String peerId : peerIdsToProcess) {
+        ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(peerId);
+        if (!peerExists(replicationQueueInfo.getPeerId())) {
+          LOG.warn("Peer " + peerId + " didn't exist, skipping the replay");
+          // Protection against moving orphaned queues
+          continue;
+        }
         String newPeerId = peerId + "-" + znode;
         String newPeerZnode = ZKUtil.joinZNode(this.myQueuesZnode, newPeerId);
         // check the logs queue for the old peer cluster
@@ -319,6 +325,12 @@ public class ReplicationQueuesZKImpl ext
       // The lock isn't a peer cluster, remove it
       clusters.remove(RS_LOCK_ZNODE);
       for (String cluster : clusters) {
+        ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(cluster);
+        if (!peerExists(replicationQueueInfo.getPeerId())) {
+          LOG.warn("Peer " + cluster + " didn't exist, skipping the replay");
+          // Protection against moving orphaned queues
+          continue;
+        }
         // We add the name of the recovered RS to the new znode, we can even
         // do that for queues that were recovered 10 times giving a znode like
         // number-startcode-number-otherstartcode-number-anotherstartcode-etc

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java?rev=1483136&r1=1483135&r2=1483136&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java Thu May 16 00:56:35 2013
@@ -78,4 +78,9 @@ public abstract class ReplicationStateZK
     }
     return result;
   }
+
+  public boolean peerExists(String id) throws KeeperException {
+    return ZKUtil.checkExists(this.zookeeper,
+        ZKUtil.joinZNode(this.peersZNode, id)) >= 0;
+  }
 }
\ No newline at end of file

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java?rev=1483136&r1=1483135&r2=1483136&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java Thu May 16 00:56:35 2013
@@ -82,7 +82,7 @@ import java.util.concurrent.atomic.Atomi
  * </pre>
  */
 @InterfaceAudience.Private
-public class ReplicationZookeeper implements Closeable {
+public class ReplicationZookeeper extends ReplicationStateZKBase implements Closeable {
   private static final Log LOG =
     LogFactory.getLog(ReplicationZookeeper.class);
 
@@ -132,6 +132,7 @@ public class ReplicationZookeeper implem
    */
   public ReplicationZookeeper(final Abortable abortable, final Configuration conf,
       final ZooKeeperWatcher zk) throws KeeperException {
+    super(zk, conf, abortable);
     this.conf = conf;
     this.zookeeper = zk;
     setZNodes(abortable);
@@ -151,6 +152,7 @@ public class ReplicationZookeeper implem
    */
   public ReplicationZookeeper(final Server server, final AtomicBoolean replicating)
   throws IOException, KeeperException {
+    super(server.getZooKeeper(), server.getConfiguration(), server);
     this.abortable = server;
     this.zookeeper = server.getZooKeeper();
     this.conf = server.getConfiguration();
@@ -536,11 +538,6 @@ public class ReplicationZookeeper implem
     }
   }
 
-  private boolean peerExists(String id) throws KeeperException {
-    return ZKUtil.checkExists(this.zookeeper,
-          ZKUtil.joinZNode(this.peersZNode, id)) >= 0;
-  }
-
   /**
    * Enable replication to the peer
    *

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=1483136&r1=1483135&r2=1483136&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Thu May 16 00:56:35 2013
@@ -25,7 +25,6 @@ import java.net.ConnectException;
 import java.net.SocketTimeoutException;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashSet;
 import java.util.List;
@@ -58,6 +57,7 @@ import org.apache.hadoop.hbase.protobuf.
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Threads;
@@ -94,6 +94,7 @@ public class ReplicationSource extends T
   private Random random;
   // should we replicate or not?
   private AtomicBoolean replicating;
+  private ReplicationQueueInfo replicationQueueInfo;
   // id of the peer cluster this source replicates to
   private String peerId;
   // The manager of all sources to which we ping back our progress
@@ -123,10 +124,6 @@ public class ReplicationSource extends T
   private long totalReplicatedEdits = 0;
   // The znode we currently play with
   private String peerClusterZnode;
-  // Indicates if this queue is recovered (and will be deleted when depleted)
-  private boolean queueRecovered;
-  // List of all the dead region servers that had this queue (if recovered)
-  private List<String> deadRegionServers = new ArrayList<String>();
   // Maximum number of retries before taking bold actions
   private int maxRetriesMultiplier;
   // Socket timeouts require even bolder actions since we don't want to DDOS
@@ -196,100 +193,22 @@ public class ReplicationSource extends T
     } catch (KeeperException ke) {
       throw new IOException("Could not read cluster id", ke);
     }
-
-    // Finally look if this is a recovered queue
-    this.checkIfQueueRecovered(peerClusterZnode);
-  }
-
-  // The passed znode will be either the id of the peer cluster or
-  // the handling story of that queue in the form of id-servername-*
-  //
-  // package access for testing
-  void checkIfQueueRecovered(String peerClusterZnode) {
-    String[] parts = peerClusterZnode.split("-", 2);
-    this.queueRecovered = parts.length != 1;
-    this.peerId = this.queueRecovered ?
-        parts[0] : peerClusterZnode;
     this.peerClusterZnode = peerClusterZnode;
-
-    if (parts.length < 2) {
-      // not queue recovered situation
-      return;
-    }
-
-    // extract dead servers
-    extractDeadServersFromZNodeString(parts[1], this.deadRegionServers);
+    this.replicationQueueInfo = new ReplicationQueueInfo(peerClusterZnode);
+    // ReplicationQueueInfo parses the peerId out of the znode for us
+    this.peerId = this.replicationQueueInfo.getPeerId();
   }
-  
-  /**
-   * for tests only
-   */
-  List<String> getDeadRegionServers() {
-    return Collections.unmodifiableList(this.deadRegionServers);
-  }
-
-  /**
-   * Parse dead server names from znode string servername can contain "-" such as
-   * "ip-10-46-221-101.ec2.internal", so we need skip some "-" during parsing for the following
-   * cases: 2-ip-10-46-221-101.ec2.internal,52170,1364333181125-<server name>-...
-   */
-  private static void
-      extractDeadServersFromZNodeString(String deadServerListStr, List<String> result) {
-    
-    if(deadServerListStr == null || result == null || deadServerListStr.isEmpty()) return;
-
-    // valid server name delimiter "-" has to be after "," in a server name
-    int seenCommaCnt = 0;
-    int startIndex = 0;
-    int len = deadServerListStr.length();
 
-    for (int i = 0; i < len; i++) {
-      switch (deadServerListStr.charAt(i)) {
-      case ',':
-        seenCommaCnt += 1;
-        break;
-      case '-':
-        if(seenCommaCnt>=2) {
-          if (i > startIndex) {
-            String serverName = deadServerListStr.substring(startIndex, i);
-            if(ServerName.isFullServerName(serverName)){
-              result.add(serverName);
-            } else {
-              LOG.error("Found invalid server name:" + serverName);
-            }
-            startIndex = i + 1;
-          }
-          seenCommaCnt = 0;
-        }
-        break;
-      default:
-        break;
-      }
-    }
-
-    // add tail
-    if(startIndex < len - 1){
-      String serverName = deadServerListStr.substring(startIndex, len);
-      if(ServerName.isFullServerName(serverName)){
-        result.add(serverName);
-      } else {
-        LOG.error("Found invalid server name at the end:" + serverName);
-      }
-    }
-
-    LOG.debug("Found dead servers:" + result);
-  }
-  
   /**
    * Select a number of peers at random using the ratio. Mininum 1.
    */
   private void chooseSinks() {
     this.currentPeers.clear();
-    List<ServerName> addresses = this.zkHelper.getSlavesAddresses(peerId);
+    List<ServerName> addresses = this.zkHelper.getSlavesAddresses(this.peerId);
     Set<ServerName> setOfAddr = new HashSet<ServerName>();
     int nbPeers = (int) (Math.ceil(addresses.size() * ratio));
     LOG.info("Getting " + nbPeers +
-        " rs from peer cluster # " + peerId);
+        " rs from peer cluster # " + this.peerId);
     for (int i = 0; i < nbPeers; i++) {
       ServerName sn;
       // Make sure we get one address that we don't already have
@@ -333,13 +252,13 @@ public class ReplicationSource extends T
 
     // If this is recovered, the queue is already full and the first log
     // normally has a position (unless the RS failed between 2 logs)
-    if (this.queueRecovered) {
+    if (this.replicationQueueInfo.isQueueRecovered()) {
       try {
         this.repLogReader.setPosition(this.zkHelper.getHLogRepPosition(
             this.peerClusterZnode, this.queue.peek().getName()));
       } catch (KeeperException e) {
         this.terminate("Couldn't get the position of this recovered queue " +
-            peerClusterZnode, e);
+            this.peerClusterZnode, e);
       }
     }
     // Loop until we close down
@@ -374,7 +293,7 @@ public class ReplicationSource extends T
       //We take the snapshot now so that we are protected against races
       //where a new file gets enqueued while the current file is being processed
       //(and where we just finished reading the current file).
-      if (!this.queueRecovered && queue.size() == 0) {
+      if (!this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0) {
         currentWALisBeingWrittenTo = true;
       }
       // Open a reader on it
@@ -401,24 +320,24 @@ public class ReplicationSource extends T
           continue;
         }
       } catch (IOException ioe) {
-        LOG.warn(peerClusterZnode + " Got: ", ioe);
+        LOG.warn(this.peerClusterZnode + " Got: ", ioe);
         gotIOE = true;
         if (ioe.getCause() instanceof EOFException) {
 
           boolean considerDumping = false;
-          if (this.queueRecovered) {
+          if (this.replicationQueueInfo.isQueueRecovered()) {
             try {
               FileStatus stat = this.fs.getFileStatus(this.currentPath);
               if (stat.getLen() == 0) {
-                LOG.warn(peerClusterZnode + " Got EOF and the file was empty");
+                LOG.warn(this.peerClusterZnode + " Got EOF and the file was empty");
               }
               considerDumping = true;
             } catch (IOException e) {
-              LOG.warn(peerClusterZnode + " Got while getting file size: ", e);
+              LOG.warn(this.peerClusterZnode + " Got while getting file size: ", e);
             }
           } else if (currentNbEntries != 0) {
-            LOG.warn(peerClusterZnode + " Got EOF while reading, " +
-                "looks like this file is broken? " + currentPath);
+            LOG.warn(this.peerClusterZnode +
+                " Got EOF while reading, " + "looks like this file is broken? " + currentPath);
             considerDumping = true;
             currentNbEntries = 0;
           }
@@ -446,7 +365,7 @@ public class ReplicationSource extends T
         if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
           this.manager.logPositionAndCleanOldLogs(this.currentPath,
               this.peerClusterZnode, this.repLogReader.getPosition(),
-              queueRecovered, currentWALisBeingWrittenTo);
+              this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
           this.lastLoggedPosition = this.repLogReader.getPosition();
         }
         if (sleepForRetries("Nothing to replicate", sleepMultiplier)) {
@@ -465,7 +384,7 @@ public class ReplicationSource extends T
         LOG.debug("Attempt to close connection failed", e);
       }
     }
-    LOG.debug("Source exiting " + peerId);
+    LOG.debug("Source exiting " + this.peerId);
     metrics.clear();
   }
 
@@ -571,10 +490,11 @@ public class ReplicationSource extends T
       try {
         this.reader = repLogReader.openReader(this.currentPath);
       } catch (FileNotFoundException fnfe) {
-        if (this.queueRecovered) {
+        if (this.replicationQueueInfo.isQueueRecovered()) {
           // We didn't find the log in the archive directory, look if it still
           // exists in the dead RS folder (there could be a chain of failures
           // to look at)
+          List<String> deadRegionServers = this.replicationQueueInfo.getDeadRegionServers();
           LOG.info("NB dead servers : " + deadRegionServers.size());
           for (String curDeadServerName : deadRegionServers) {
             Path deadRsDirectory =
@@ -621,7 +541,7 @@ public class ReplicationSource extends T
       }
     } catch (IOException ioe) {
       if (ioe instanceof EOFException && isCurrentLogEmpty()) return true;
-      LOG.warn(peerClusterZnode + " Got: ", ioe);
+      LOG.warn(this.peerClusterZnode + " Got: ", ioe);
       this.reader = null;
       if (ioe.getCause() instanceof NullPointerException) {
         // Workaround for race condition in HDFS-4380
@@ -645,7 +565,8 @@ public class ReplicationSource extends T
    * may be empty, and we don't want to retry that.
    */
   private boolean isCurrentLogEmpty() {
-    return (this.repLogReader.getPosition() == 0 && !queueRecovered && queue.size() == 0);
+    return (this.repLogReader.getPosition() == 0 &&
+        !this.replicationQueueInfo.isQueueRecovered() && queue.size() == 0);
   }
   
   /**
@@ -724,7 +645,7 @@ public class ReplicationSource extends T
         if (this.lastLoggedPosition != this.repLogReader.getPosition()) {
           this.manager.logPositionAndCleanOldLogs(this.currentPath,
               this.peerClusterZnode, this.repLogReader.getPosition(),
-              queueRecovered, currentWALisBeingWrittenTo);
+              this.replicationQueueInfo.isQueueRecovered(), currentWALisBeingWrittenTo);
           this.lastLoggedPosition = this.repLogReader.getPosition();
         }
         this.totalReplicatedEdits += currentNbEntries;
@@ -788,7 +709,8 @@ public class ReplicationSource extends T
    * @return true if the peer is enabled, otherwise false
    */
   protected boolean isPeerEnabled() {
-    return this.replicating.get() && this.zkHelper.getPeerEnabled(peerId);
+    return this.replicating.get() &&
+        this.zkHelper.getPeerEnabled(this.peerId);
   }
 
   /**
@@ -804,7 +726,7 @@ public class ReplicationSource extends T
       this.repLogReader.finishCurrentFile();
       this.reader = null;
       return true;
-    } else if (this.queueRecovered) {
+    } else if (this.replicationQueueInfo.isQueueRecovered()) {
       this.manager.closeRecoveredQueue(this);
       LOG.info("Finished recovering the queue");
       this.running = false;
@@ -823,7 +745,8 @@ public class ReplicationSource extends T
           }
         };
     Threads.setDaemonThreadRunning(
-        this, n + ".replicationSource," + peerClusterZnode, handler);
+        this, n + ".replicationSource," +
+        this.peerClusterZnode, handler);
   }
 
   public void terminate(String reason) {

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java?rev=1483136&r1=1483135&r2=1483136&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java Thu May 16 00:56:35 2013
@@ -45,6 +45,7 @@ public class TestReplicationStateZKImpl 
   private static Configuration conf;
   private static HBaseTestingUtility utility;
   private static ZooKeeperWatcher zkw;
+  private static String replicationZNode;
 
   @BeforeClass
   public static void setUpBeforeClass() throws Exception {
@@ -52,6 +53,8 @@ public class TestReplicationStateZKImpl 
     utility.startMiniZKCluster();
     conf = utility.getConfiguration();
     zkw = HBaseTestingUtility.getZooKeeperWatcher(utility);
+    String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
+    replicationZNode = ZKUtil.joinZNode(zkw.baseZNode, replicationZNodeName);
   }
 
   @Before
@@ -63,12 +66,14 @@ public class TestReplicationStateZKImpl 
     rq2 = new ReplicationQueuesZKImpl(zkw, conf, ds2);
     rq3 = new ReplicationQueuesZKImpl(zkw, conf, ds3);
     rqc = new ReplicationQueuesClientZKImpl(zkw, conf, ds1);
+    String peersZnode = ZKUtil.joinZNode(replicationZNode, "peers");
+    for (int i = 1; i < 6; i++) {
+      ZKUtil.createWithParents(zkw, ZKUtil.joinZNode(peersZnode, "qId"+i));
+    }
   }
 
   @After
   public void tearDown() throws KeeperException, IOException {
-    String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
-    String replicationZNode = ZKUtil.joinZNode(zkw.baseZNode, replicationZNodeName);
     ZKUtil.deleteNodeRecursively(zkw, replicationZNode);
   }
 

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java?rev=1483136&r1=1483135&r2=1483136&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java Thu May 16 00:56:35 2013
@@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
 import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -293,9 +294,8 @@ public class TestReplicationSourceManage
     testMap = rz3.claimQueues(s2.getServerName().getServerName());
     rz3.close();
 
-    ReplicationSource s = new ReplicationSource();
-    s.checkIfQueueRecovered(testMap.firstKey());
-    List<String> result = s.getDeadRegionServers();
+    ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(testMap.firstKey());
+    List<String> result = replicationQueueInfo.getDeadRegionServers();
 
     // verify
     assertTrue(result.contains(server.getServerName().getServerName()));