You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2013/08/22 07:42:14 UTC

svn commit: r1516367 - in /hbase/trunk: hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/ hba...

Author: stack
Date: Thu Aug 22 05:42:14 2013
New Revision: 1516367

URL: http://svn.apache.org/r1516367
Log:
HBASE-8441 [replication] Refactor KeeperExceptions thrown from replication state interfaces into replication specific exceptions

Added:
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationException.java
Modified:
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java?rev=1516367&r1=1516366&r2=1516367&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java Thu Aug 22 05:42:14 2013
@@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.Abortable
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
@@ -83,7 +84,7 @@ public class ReplicationAdmin implements
   /**
    * Constructor that creates a connection to the local ZooKeeper ensemble.
    * @param conf Configuration to use
-   * @throws IOException if the connection to ZK cannot be made
+   * @throws IOException if an internal replication error occurs
    * @throws RuntimeException if replication isn't enabled.
    */
   public ReplicationAdmin(Configuration conf) throws IOException {
@@ -100,8 +101,8 @@ public class ReplicationAdmin implements
           ReplicationFactory.getReplicationQueuesClient(zkw, conf, this.connection);
       this.replicationQueuesClient.init();
 
-    } catch (KeeperException e) {
-      throw new IOException("Unable setup the ZooKeeper connection", e);
+    } catch (ReplicationException e) {
+      throw new IOException("Error initializing the replication admin client.", e);
     }
   }
 
@@ -131,7 +132,7 @@ public class ReplicationAdmin implements
    * @throws IllegalStateException if there's already one slave since
    * multi-slave isn't supported yet.
    */
-  public void addPeer(String id, String clusterKey) throws IOException {
+  public void addPeer(String id, String clusterKey) throws ReplicationException {
     this.replicationPeers.addPeer(id, clusterKey);
   }
 
@@ -139,7 +140,7 @@ public class ReplicationAdmin implements
    * Removes a peer cluster and stops the replication to it.
    * @param id a short that identifies the cluster
    */
-  public void removePeer(String id) throws IOException {
+  public void removePeer(String id) throws ReplicationException {
     this.replicationPeers.removePeer(id);
   }
 
@@ -147,7 +148,7 @@ public class ReplicationAdmin implements
    * Restart the replication stream to the specified peer.
    * @param id a short that identifies the cluster
    */
-  public void enablePeer(String id) throws IOException {
+  public void enablePeer(String id) throws ReplicationException {
     this.replicationPeers.enablePeer(id);
   }
 
@@ -155,7 +156,7 @@ public class ReplicationAdmin implements
    * Stop the replication stream to the specified peer.
    * @param id a short that identifies the cluster
    */
-  public void disablePeer(String id) throws IOException {
+  public void disablePeer(String id) throws ReplicationException {
     this.replicationPeers.disablePeer(id);
   }
 
@@ -181,7 +182,7 @@ public class ReplicationAdmin implements
    *           is thrown if it doesn't exist
    * @return true if replication is enabled to that peer, false if it isn't
    */
-  public boolean getPeerState(String id) throws IOException {
+  public boolean getPeerState(String id) throws ReplicationException {
     return this.replicationPeers.getStatusOfPeerFromBackingStore(id);
   }
 

Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationException.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationException.java?rev=1516367&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationException.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationException.java Thu Aug 22 05:42:14 2013
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.exceptions.HBaseException;
+
+/**
+ * An HBase Replication exception. This exception is client facing and is thrown from all
+ * replication interfaces that deal with the manipulation of replication state. This exception could
+ * be thrown for a number of different reasons including loss of connection to the underlying data
+ * store, loss of connection to a peer cluster or errors during deserialization of replication data.
+ */
+@InterfaceAudience.Public
+public class ReplicationException extends HBaseException {
+
+  private static final long serialVersionUID = -8885598603988198062L;
+
+  public ReplicationException() {
+    super();
+  }
+
+  public ReplicationException(final String message) {
+    super(message);
+  }
+
+  public ReplicationException(final String message, final Throwable t) {
+    super(message, t);
+  }
+
+  public ReplicationException(final Throwable t) {
+    super(t);
+  }
+}

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java?rev=1516367&r1=1516366&r2=1516367&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java Thu Aug 22 05:42:14 2013
@@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.replicat
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -70,12 +69,15 @@ public class ReplicationPeer implements 
    * @param key cluster key used to locate the peer
    * @param id string representation of this peer's identifier
    */
-  public ReplicationPeer(Configuration conf, String key,
-      String id) throws IOException {
+  public ReplicationPeer(Configuration conf, String key, String id) throws ReplicationException {
     this.conf = conf;
     this.clusterKey = key;
     this.id = id;
-    this.reloadZkWatcher();
+    try {
+      this.reloadZkWatcher();
+    } catch (IOException e) {
+      throw new ReplicationException("Error connecting to peer cluster with peerId=" + id, e);
+    }
   }
 
   /**

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java?rev=1516367&r1=1516366&r2=1516367&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java Thu Aug 22 05:42:14 2013
@@ -18,7 +18,6 @@
  */
 package org.apache.hadoop.hbase.replication;
 
-import java.io.IOException;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -27,7 +26,6 @@ import java.util.UUID;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.ServerName;
-import org.apache.zookeeper.KeeperException;
 
 /**
  * This provides an interface for maintaining a set of peer clusters. These peers are remote slave
@@ -44,9 +42,8 @@ public interface ReplicationPeers {
 
   /**
    * Initialize the ReplicationPeers interface.
-   * @throws KeeperException
    */
-  void init() throws IOException, KeeperException;
+  void init() throws ReplicationException;
 
   /**
    * Add a new remote slave cluster for replication.
@@ -54,25 +51,25 @@ public interface ReplicationPeers {
    * @param clusterKey the concatenation of the slave cluster's:
    *          hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent
    */
-  void addPeer(String peerId, String clusterKey) throws IOException;
+  void addPeer(String peerId, String clusterKey) throws ReplicationException;
 
   /**
    * Removes a remote slave cluster and stops the replication to it.
    * @param peerId a short that identifies the cluster
    */
-  void removePeer(String peerId) throws IOException;
+  void removePeer(String peerId) throws ReplicationException;
 
   /**
    * Restart the replication to the specified remote slave cluster.
    * @param peerId a short that identifies the cluster
    */
-  void enablePeer(String peerId) throws IOException;
+  void enablePeer(String peerId) throws ReplicationException;
 
   /**
    * Stop the replication to the specified remote slave cluster.
    * @param peerId a short that identifies the cluster
    */
-  void disablePeer(String peerId) throws IOException;
+  void disablePeer(String peerId) throws ReplicationException;
 
   /**
    * Get the replication status for the specified connected remote slave cluster.
@@ -91,7 +88,7 @@ public interface ReplicationPeers {
    * @return true if replication is enabled, false otherwise.
    * @throws IOException Throws if there's an error contacting the store
    */
-  boolean getStatusOfPeerFromBackingStore(String peerId) throws IOException;
+  boolean getStatusOfPeerFromBackingStore(String peerId) throws ReplicationException;
 
   /**
    * Get a set of all connected remote slave clusters.
@@ -118,7 +115,7 @@ public interface ReplicationPeers {
    * @param peerId a short that identifies the cluster
    * @return true if a new connection was made, false if no new connection was made.
    */
-  boolean connectToPeer(String peerId) throws IOException, KeeperException;
+  boolean connectToPeer(String peerId) throws ReplicationException;
 
   /**
    * Disconnect from a remote slave cluster.
@@ -154,5 +151,5 @@ public interface ReplicationPeers {
    * @param peerId a short that identifies the cluster
    * @return the configuration for the peer cluster, null if it was unable to get the configuration
    */
-  Configuration getPeerConf(String peerId) throws KeeperException;
+  Configuration getPeerConf(String peerId) throws ReplicationException;
 }

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java?rev=1516367&r1=1516366&r2=1516367&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java Thu Aug 22 05:42:14 2013
@@ -80,16 +80,21 @@ public class ReplicationPeersZKImpl exte
   }
 
   @Override
-  public void init() throws IOException, KeeperException {
-    ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
+  public void init() throws ReplicationException {
+    try {
+      ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
+    } catch (KeeperException e) {
+      throw new ReplicationException("Could not initialize replication peers", e);
+    }
     connectExistingPeers();
   }
 
   @Override
-  public void addPeer(String id, String clusterKey) throws IOException {
+  public void addPeer(String id, String clusterKey) throws ReplicationException {
     try {
       if (peerExists(id)) {
-        throw new IllegalArgumentException("Cannot add existing peer");
+        throw new IllegalArgumentException("Cannot add a peer with id=" + id
+            + " because that id already exists.");
       }
       ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
       ZKUtil.createAndWatch(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id),
@@ -101,30 +106,32 @@ public class ReplicationPeersZKImpl exte
         ENABLED_ZNODE_BYTES);
       // A peer is enabled by default
     } catch (KeeperException e) {
-      throw new IOException("Unable to add peer", e);
+      throw new ReplicationException("Could not add peer with id=" + id
+          + ", clusterKey=" + clusterKey, e);
     }
   }
 
   @Override
-  public void removePeer(String id) throws IOException {
+  public void removePeer(String id) throws ReplicationException {
     try {
       if (!peerExists(id)) {
-        throw new IllegalArgumentException("Cannot remove inexisting peer");
+        throw new IllegalArgumentException("Cannot remove peer with id=" + id
+            + " because that id does not exist.");
       }
       ZKUtil.deleteNodeRecursively(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id));
     } catch (KeeperException e) {
-      throw new IOException("Unable to remove a peer", e);
+      throw new ReplicationException("Could not remove peer with id=" + id, e);
     }
   }
 
   @Override
-  public void enablePeer(String id) throws IOException {
+  public void enablePeer(String id) throws ReplicationException {
     changePeerState(id, ZooKeeperProtos.ReplicationState.State.ENABLED);
     LOG.info("peer " + id + " is enabled");
   }
 
   @Override
-  public void disablePeer(String id) throws IOException {
+  public void disablePeer(String id) throws ReplicationException {
     changePeerState(id, ZooKeeperProtos.ReplicationState.State.DISABLED);
     LOG.info("peer " + id + " is disabled");
   }
@@ -132,13 +139,13 @@ public class ReplicationPeersZKImpl exte
   @Override
   public boolean getStatusOfConnectedPeer(String id) {
     if (!this.peerClusters.containsKey(id)) {
-      throw new IllegalArgumentException("peer " + id + " is not connected");
+      throw new IllegalArgumentException("Peer with id= " + id + " is not connected");
     }
     return this.peerClusters.get(id).getPeerEnabled().get();
   }
 
   @Override
-  public boolean getStatusOfPeerFromBackingStore(String id) throws IOException {
+  public boolean getStatusOfPeerFromBackingStore(String id) throws ReplicationException {
     if (!this.getAllPeerIds().contains(id)) {
       throw new IllegalArgumentException("peer " + id + " doesn't exist");
     }
@@ -146,21 +153,26 @@ public class ReplicationPeersZKImpl exte
     try {
       return ReplicationPeer.isStateEnabled(ZKUtil.getData(this.zookeeper, peerStateZNode));
     } catch (KeeperException e) {
-      throw new IOException(e);
+      throw new ReplicationException(e);
     } catch (DeserializationException e) {
-      throw new IOException(e);
+      throw new ReplicationException(e);
     }
   }
 
   @Override
-  public boolean connectToPeer(String peerId) throws IOException, KeeperException {
+  public boolean connectToPeer(String peerId) throws ReplicationException {
     if (peerClusters == null) {
       return false;
     }
     if (this.peerClusters.containsKey(peerId)) {
       return false;
     }
-    ReplicationPeer peer = getPeer(peerId);
+    ReplicationPeer peer = null;
+    try {
+      peer = getPeer(peerId);
+    } catch (Exception e) {
+      throw new ReplicationException("Error connecting to peer with id=" + peerId, e);
+    }
     if (peer == null) {
       return false;
     }
@@ -242,9 +254,15 @@ public class ReplicationPeersZKImpl exte
   }
 
   @Override
-  public Configuration getPeerConf(String peerId) throws KeeperException {
+  public Configuration getPeerConf(String peerId) throws ReplicationException {
     String znode = ZKUtil.joinZNode(this.peersZNode, peerId);
-    byte[] data = ZKUtil.getData(this.zookeeper, znode);
+    byte[] data = null;
+    try {
+      data = ZKUtil.getData(this.zookeeper, znode);
+    } catch (KeeperException e) {
+      throw new ReplicationException("Error getting configuration for peer with id="
+          + peerId, e);
+    }
     if (data == null) {
       LOG.error("Could not get configuration for peer because it doesn't exist. peerId=" + peerId);
       return null;
@@ -293,11 +311,14 @@ public class ReplicationPeersZKImpl exte
   /**
    * A private method used during initialization. This method attempts to connect to all registered
    * peer clusters. This method does not set a watch on the peer cluster znodes.
-   * @throws IOException
-   * @throws KeeperException
    */
-  private void connectExistingPeers() throws IOException, KeeperException {
-    List<String> znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
+  private void connectExistingPeers() throws ReplicationException {
+    List<String> znodes = null;
+    try {
+      znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
+    } catch (KeeperException e) {
+      throw new ReplicationException("Error getting the list of peer clusters.", e);
+    }
     if (znodes != null) {
       for (String z : znodes) {
         connectToPeer(z);
@@ -349,13 +370,13 @@ public class ReplicationPeersZKImpl exte
    * Update the state znode of a peer cluster.
    * @param id
    * @param state
-   * @throws IOException
    */
   private void changePeerState(String id, ZooKeeperProtos.ReplicationState.State state)
-      throws IOException {
+      throws ReplicationException {
     try {
       if (!peerExists(id)) {
-        throw new IllegalArgumentException("peer " + id + " is not registered");
+        throw new IllegalArgumentException("Cannot enable/disable peer because id=" + id
+            + " does not exist.");
       }
       String peerStateZNode = getPeerStateNode(id);
       byte[] stateBytes =
@@ -366,9 +387,9 @@ public class ReplicationPeersZKImpl exte
       } else {
         ZKUtil.createAndWatch(this.zookeeper, peerStateZNode, stateBytes);
       }
-      LOG.info("state of the peer " + id + " changed to " + state.name());
+      LOG.info("Peer with id= " + id + " is now " + state.name());
     } catch (KeeperException e) {
-      throw new IOException("Unable to change state of the peer " + id, e);
+      throw new ReplicationException("Unable to change state of the peer with id=" + id, e);
     }
   }
 
@@ -376,10 +397,9 @@ public class ReplicationPeersZKImpl exte
    * Helper method to connect to a peer
    * @param peerId peer's identifier
    * @return object representing the peer
-   * @throws IOException
-   * @throws KeeperException
+   * @throws ReplicationException
    */
-  private ReplicationPeer getPeer(String peerId) throws IOException, KeeperException {
+  private ReplicationPeer getPeer(String peerId) throws ReplicationException {
     Configuration peerConf = getPeerConf(peerId);
     if (peerConf == null) {
       return null;
@@ -391,7 +411,12 @@ public class ReplicationPeersZKImpl exte
 
     ReplicationPeer peer =
         new ReplicationPeer(peerConf, peerId, ZKUtil.getZooKeeperClusterKey(peerConf));
-    peer.startStateTracker(this.zookeeper, this.getPeerStateNode(peerId));
+    try {
+      peer.startStateTracker(this.zookeeper, this.getPeerStateNode(peerId));
+    } catch (KeeperException e) {
+      throw new ReplicationException("Error starting the peer state tracker for peerId=" + peerId,
+          e);
+    }
     peer.getZkw().registerListener(new PeerRegionServerListener(peer));
     return peer;
   }

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java?rev=1516367&r1=1516366&r2=1516367&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java Thu Aug 22 05:42:14 2013
@@ -23,7 +23,6 @@ import java.util.SortedMap;
 import java.util.SortedSet;
 
 import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.zookeeper.KeeperException;
 
 /**
  * This provides an interface for maintaining a region server's replication queues. These queues
@@ -37,7 +36,7 @@ public interface ReplicationQueues {
    * @param serverName The server name of the region server that owns the replication queues this
    *          interface manages.
    */
-  void init(String serverName) throws KeeperException;
+  void init(String serverName) throws ReplicationException;
 
   /**
    * Remove a replication queue.
@@ -49,9 +48,8 @@ public interface ReplicationQueues {
    * Add a new HLog file to the given queue. If the queue does not exist it is created.
    * @param queueId a String that identifies the queue.
    * @param filename name of the HLog
-   * @throws KeeperException
    */
-  void addLog(String queueId, String filename) throws KeeperException;
+  void addLog(String queueId, String filename) throws ReplicationException;
 
   /**
    * Remove an HLog file from the given queue.
@@ -74,7 +72,7 @@ public interface ReplicationQueues {
    * @param filename name of the HLog
    * @return the current position in the file
    */
-  long getLogPosition(String queueId, String filename) throws KeeperException;
+  long getLogPosition(String queueId, String filename) throws ReplicationException;
 
   /**
    * Remove all replication queues for this region server.

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java?rev=1516367&r1=1516366&r2=1516367&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java Thu Aug 22 05:42:14 2013
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.replicat
 
 import java.util.List;
 
-import org.apache.zookeeper.KeeperException;
 
 /**
  * This provides an interface for clients of replication to view replication queues. These queues
@@ -31,7 +30,7 @@ public interface ReplicationQueuesClient
   /**
    * Initialize the replication queue client interface.
    */
-  public void init() throws KeeperException;
+  public void init() throws ReplicationException;
 
   /**
    * Get a list of all region servers that have outstanding replication queues. These servers could

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java?rev=1516367&r1=1516366&r2=1516367&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java Thu Aug 22 05:42:14 2013
@@ -35,8 +35,12 @@ public class ReplicationQueuesClientZKIm
   }
 
   @Override
-  public void init() throws KeeperException {
-    ZKUtil.createWithParents(this.zookeeper, this.queuesZNode);
+  public void init() throws ReplicationException {
+    try {
+      ZKUtil.createWithParents(this.zookeeper, this.queuesZNode);
+    } catch (KeeperException e) {
+      throw new ReplicationException("Internal error while initializing a queues client", e);
+    }
   }
 
   @Override

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java?rev=1516367&r1=1516366&r2=1516367&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java Thu Aug 22 05:42:14 2013
@@ -44,20 +44,20 @@ import org.apache.zookeeper.KeeperExcept
  * base znode that this class works at is the myQueuesZnode. The myQueuesZnode contains a list of
  * all outstanding HLog files on this region server that need to be replicated. The myQueuesZnode is
  * the regionserver name (a concatenation of the region server’s hostname, client port and start
- * code). For example: 
- * 
- * /hbase/replication/rs/hostname.example.org,6020,1234 
- * 
+ * code). For example:
+ *
+ * /hbase/replication/rs/hostname.example.org,6020,1234
+ *
  * Within this znode, the region server maintains a set of HLog replication queues. These queues are
  * represented by child znodes named using there give queue id. For example:
- * 
+ *
  * /hbase/replication/rs/hostname.example.org,6020,1234/1
  * /hbase/replication/rs/hostname.example.org,6020,1234/2
  *
  * Each queue has one child znode for every HLog that still needs to be replicated. The value of
  * these HLog child znodes is the latest position that has been replicated. This position is updated
  * every time a HLog entry is replicated. For example:
- * 
+ *
  * /hbase/replication/rs/hostname.example.org,6020,1234/1/23522342.23422 [VALUE: 254]
  */
 public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements ReplicationQueues {
@@ -75,9 +75,13 @@ public class ReplicationQueuesZKImpl ext
   }
 
   @Override
-  public void init(String serverName) throws KeeperException {
+  public void init(String serverName) throws ReplicationException {
     this.myQueuesZnode = ZKUtil.joinZNode(this.queuesZNode, serverName);
-    ZKUtil.createWithParents(this.zookeeper, this.myQueuesZnode);
+    try {
+      ZKUtil.createWithParents(this.zookeeper, this.myQueuesZnode);
+    } catch (KeeperException e) {
+      throw new ReplicationException("Could not initialize replication queues.", e);
+    }
   }
 
   @Override
@@ -90,10 +94,16 @@ public class ReplicationQueuesZKImpl ext
   }
 
   @Override
-  public void addLog(String queueId, String filename) throws KeeperException {
+  public void addLog(String queueId, String filename) throws ReplicationException {
     String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
     znode = ZKUtil.joinZNode(znode, filename);
-    ZKUtil.createWithParents(this.zookeeper, znode);
+    try {
+      ZKUtil.createWithParents(this.zookeeper, znode);
+    } catch (KeeperException e) {
+      throw new ReplicationException(
+          "Could not add log because znode could not be created. queueId=" + queueId
+              + ", filename=" + filename, e); // chain the ZooKeeper cause, as the other wrappers do
+    }
   }
 
   @Override
@@ -122,10 +132,16 @@ public class ReplicationQueuesZKImpl ext
   }
 
   @Override
-  public long getLogPosition(String queueId, String filename) throws KeeperException {
+  public long getLogPosition(String queueId, String filename) throws ReplicationException {
     String clusterZnode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
     String znode = ZKUtil.joinZNode(clusterZnode, filename);
-    byte[] bytes = ZKUtil.getData(this.zookeeper, znode);
+    byte[] bytes = null;
+    try {
+      bytes = ZKUtil.getData(this.zookeeper, znode);
+    } catch (KeeperException e) {
+      throw new ReplicationException("Internal Error: could not get position in log for queueId="
+          + queueId + ", filename=" + filename, e);
+    }
     try {
       return ZKUtil.parseHLogPositionFrom(bytes);
     } catch (DeserializationException de) {

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java?rev=1516367&r1=1516366&r2=1516367&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java Thu Aug 22 05:42:14 2013
@@ -36,15 +36,14 @@ import org.apache.hadoop.hbase.io.Immuta
 import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
 import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationPeer;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
-import org.apache.hadoop.hbase.replication.ReplicationPeersZKImpl;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
-import org.apache.zookeeper.KeeperException;
 
 /**
  * This map-only job compares the data from a local table with a remote one.
@@ -128,8 +127,9 @@ public class VerifyReplication {
               HTable replicatedTable = new HTable(peerConf, conf.get(NAME + ".tableName"));
               scan.setStartRow(value.getRow());
               replicatedScanner = replicatedTable.getScanner(scan);
-            } catch (KeeperException e) {
-              throw new IOException("Got a ZK exception", e);
+            } catch (ReplicationException e) {
+              throw new IOException(
+                  "An error occured while trying to connect to the remove peer cluster", e);
             } finally {
               if (peer != null) {
                 peer.close();

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java?rev=1516367&r1=1516366&r2=1516367&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java Thu Aug 22 05:42:14 2013
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.Abortable
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
+import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
 import org.apache.hadoop.hbase.replication.ReplicationQueuesClientZKImpl;
@@ -125,7 +126,7 @@ public class ReplicationLogCleaner exten
       this.zkw = new ZooKeeperWatcher(conf, "replicationLogCleaner", null);
       this.replicationQueues = ReplicationFactory.getReplicationQueuesClient(zkw, conf, this);
       this.replicationQueues.init();
-    } catch (KeeperException e) {
+    } catch (ReplicationException e) {
       LOG.error("Error while configuring " + this.getClass().getName(), e);
     } catch (IOException e) {
       LOG.error("Error while configuring " + this.getClass().getName(), e);

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java?rev=1516367&r1=1516366&r2=1516367&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java Thu Aug 22 05:42:14 2013
@@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.regionser
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationFactory;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
@@ -119,8 +120,8 @@ public class Replication implements WALA
         this.replicationTracker =
             ReplicationFactory.getReplicationTracker(server.getZooKeeper(), this.replicationPeers,
               this.conf, this.server, this.server);
-      } catch (KeeperException ke) {
-        throw new IOException("Failed replication handler create", ke);
+      } catch (ReplicationException e) {
+        throw new IOException("Failed replication handler create", e);
       }
       UUID clusterId = null;
       try {
@@ -197,7 +198,11 @@ public class Replication implements WALA
    */
   public void startReplicationService() throws IOException {
     if (this.replication) {
-      this.replicationManager.init();
+      try {
+        this.replicationManager.init();
+      } catch (ReplicationException e) {
+        throw new IOException(e);
+      }
       this.replicationSink = new ReplicationSink(this.conf, this.server);
       this.scheduleThreadPool.scheduleAtFixedRate(
         new ReplicationStatisticsThread(this.replicationSink, this.replicationManager),

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=1516367&r1=1516366&r2=1516367&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Thu Aug 22 05:42:14 2013
@@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.protobuf.
 import org.apache.hadoop.hbase.regionserver.wal.HLog;
 import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
@@ -234,7 +235,7 @@ public class ReplicationSource extends T
           LOG.trace("Recovered queue started with log " + this.queue.peek() +
               " at position " + this.repLogReader.getPosition());
         }
-      } catch (KeeperException e) {
+      } catch (ReplicationException e) {
         this.terminate("Couldn't get the position of this recovered queue " +
             this.peerClusterZnode, e);
       }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java?rev=1516367&r1=1516366&r2=1516367&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java Thu Aug 22 05:42:14 2013
@@ -42,6 +42,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.replication.ReplicationException;
 import org.apache.hadoop.hbase.replication.ReplicationListener;
 import org.apache.hadoop.hbase.replication.ReplicationPeers;
 import org.apache.hadoop.hbase.replication.ReplicationQueues;
@@ -190,7 +191,7 @@ public class ReplicationSourceManager im
    * Adds a normal source per registered peer cluster and tries to process all
    * old region server hlog queues
    */
-  public void init() throws IOException {
+  protected void init() throws IOException, ReplicationException {
     for (String id : this.replicationPeers.getConnectedPeers()) {
       addSource(id);
     }
@@ -216,7 +217,8 @@ public class ReplicationSourceManager im
    * @return the source that was created
    * @throws IOException
    */
-  public ReplicationSourceInterface addSource(String id) throws IOException {
+  protected ReplicationSourceInterface addSource(String id) throws IOException,
+      ReplicationException {
     ReplicationSourceInterface src =
         getReplicationSource(this.conf, this.fs, this, this.replicationQueues,
           this.replicationPeers, stopper, id, this.clusterId);
@@ -229,11 +231,12 @@ public class ReplicationSourceManager im
         this.hlogsById.get(id).add(name);
         try {
           this.replicationQueues.addLog(src.getPeerClusterZnode(), name);
-        } catch (KeeperException ke) {
-          String message = "Cannot add log to zk for" +
-            " replication when creating a new source";
+        } catch (ReplicationException e) {
+          String message =
+              "Cannot add log to queue when creating a new source, queueId="
+                  + src.getPeerClusterZnode() + ", filename=" + name;
           stopper.stop(message);
-          throw new IOException(message, ke);
+          throw e;
         }
         src.enqueueLog(this.latestPath);
       }
@@ -289,8 +292,9 @@ public class ReplicationSourceManager im
       for (ReplicationSourceInterface source : this.sources) {
         try {
           this.replicationQueues.addLog(source.getPeerClusterZnode(), name);
-        } catch (KeeperException ke) {
-          throw new IOException("Cannot add log to zk for replication", ke);
+        } catch (ReplicationException e) {
+          throw new IOException("Cannot add log to replication queue with id="
+              + source.getPeerClusterZnode() + ", filename=" + name, e);
         }
       }
       for (SortedSet<String> hlogs : this.hlogsById.values()) {
@@ -323,7 +327,7 @@ public class ReplicationSourceManager im
    * @return the created source
    * @throws IOException
    */
-  public ReplicationSourceInterface getReplicationSource(final Configuration conf,
+  protected ReplicationSourceInterface getReplicationSource(final Configuration conf,
       final FileSystem fs, final ReplicationSourceManager manager,
       final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers,
       final Stoppable stopper, final String peerId, final UUID clusterId) throws IOException {
@@ -431,10 +435,7 @@ public class ReplicationSourceManager im
         if (added) {
           addSource(id);
         }
-      } catch (IOException e) {
-        // TODO manage better than that ?
-        LOG.error("Error while adding a new peer", e);
-      } catch (KeeperException e) {
+      } catch (Exception e) {
         LOG.error("Error while adding a new peer", e);
       }
     }

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java?rev=1516367&r1=1516366&r2=1516367&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java Thu Aug 22 05:42:14 2013
@@ -68,7 +68,7 @@ public abstract class TestReplicationSta
   }
 
   @Test
-  public void testReplicationQueuesClient() throws KeeperException {
+  public void testReplicationQueuesClient() throws ReplicationException {
     rqc.init();
     // Test methods with empty state
     assertEquals(0, rqc.getListOfReplicators().size());
@@ -109,7 +109,7 @@ public abstract class TestReplicationSta
   }
 
   @Test
-  public void testReplicationQueues() throws KeeperException, IOException {
+  public void testReplicationQueues() throws ReplicationException {
     rq1.init(server1);
     rq2.init(server2);
     rq3.init(server3);
@@ -259,7 +259,7 @@ public abstract class TestReplicationSta
    * three replicators: rq1 has 0 queues, rq2 has 1 queue with no logs, rq3 has 5 queues with 1, 2,
    * 3, 4, 5 log files respectively
    */
-  protected void populateQueues() throws KeeperException, IOException {
+  protected void populateQueues() throws ReplicationException {
     rq1.addLog("trash", "trash");
     rq1.removeQueue("trash");