You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2013/08/22 07:42:14 UTC
svn commit: r1516367 - in /hbase/trunk:
hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/
hbase-client/src/main/java/org/apache/hadoop/hbase/replication/
hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/
hba...
Author: stack
Date: Thu Aug 22 05:42:14 2013
New Revision: 1516367
URL: http://svn.apache.org/r1516367
Log:
HBASE-8441 [replication] Refactor KeeperExceptions thrown from replication state interfaces into replication specific exceptions
Added:
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationException.java
Modified:
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java?rev=1516367&r1=1516366&r2=1516367&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java Thu Aug 22 05:42:14 2013
@@ -25,6 +25,7 @@ import org.apache.hadoop.hbase.Abortable
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
@@ -83,7 +84,7 @@ public class ReplicationAdmin implements
/**
* Constructor that creates a connection to the local ZooKeeper ensemble.
* @param conf Configuration to use
- * @throws IOException if the connection to ZK cannot be made
+ * @throws IOException if an internal replication error occurs
* @throws RuntimeException if replication isn't enabled.
*/
public ReplicationAdmin(Configuration conf) throws IOException {
@@ -100,8 +101,8 @@ public class ReplicationAdmin implements
ReplicationFactory.getReplicationQueuesClient(zkw, conf, this.connection);
this.replicationQueuesClient.init();
- } catch (KeeperException e) {
- throw new IOException("Unable setup the ZooKeeper connection", e);
+ } catch (ReplicationException e) {
+ throw new IOException("Error initializing the replication admin client.", e);
}
}
@@ -131,7 +132,7 @@ public class ReplicationAdmin implements
* @throws IllegalStateException if there's already one slave since
* multi-slave isn't supported yet.
*/
- public void addPeer(String id, String clusterKey) throws IOException {
+ public void addPeer(String id, String clusterKey) throws ReplicationException {
this.replicationPeers.addPeer(id, clusterKey);
}
@@ -139,7 +140,7 @@ public class ReplicationAdmin implements
* Removes a peer cluster and stops the replication to it.
* @param id a short that identifies the cluster
*/
- public void removePeer(String id) throws IOException {
+ public void removePeer(String id) throws ReplicationException {
this.replicationPeers.removePeer(id);
}
@@ -147,7 +148,7 @@ public class ReplicationAdmin implements
* Restart the replication stream to the specified peer.
* @param id a short that identifies the cluster
*/
- public void enablePeer(String id) throws IOException {
+ public void enablePeer(String id) throws ReplicationException {
this.replicationPeers.enablePeer(id);
}
@@ -155,7 +156,7 @@ public class ReplicationAdmin implements
* Stop the replication stream to the specified peer.
* @param id a short that identifies the cluster
*/
- public void disablePeer(String id) throws IOException {
+ public void disablePeer(String id) throws ReplicationException {
this.replicationPeers.disablePeer(id);
}
@@ -181,7 +182,7 @@ public class ReplicationAdmin implements
* is thrown if it doesn't exist
* @return true if replication is enabled to that peer, false if it isn't
*/
- public boolean getPeerState(String id) throws IOException {
+ public boolean getPeerState(String id) throws ReplicationException {
return this.replicationPeers.getStatusOfPeerFromBackingStore(id);
}
Added: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationException.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationException.java?rev=1516367&view=auto
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationException.java (added)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationException.java Thu Aug 22 05:42:14 2013
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.exceptions.HBaseException;
+
+/**
+ * An HBase Replication exception. This exception is client facing and is thrown from all
+ * replication interfaces that deal with the manipulation of replication state. This exception could
+ * be thrown for a number of different reasons including loss of connection to the underlying data
+ * store, loss of connection to a peer cluster or errors during deserialization of replication data.
+ */
+@InterfaceAudience.Public
+public class ReplicationException extends HBaseException {
+
+ private static final long serialVersionUID = -8885598603988198062L;
+
+ public ReplicationException() {
+ super();
+ }
+
+ public ReplicationException(final String message) {
+ super(message);
+ }
+
+ public ReplicationException(final String message, final Throwable t) {
+ super(message, t);
+ }
+
+ public ReplicationException(final Throwable t) {
+ super(t);
+ }
+}
Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java?rev=1516367&r1=1516366&r2=1516367&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java Thu Aug 22 05:42:14 2013
@@ -21,7 +21,6 @@ package org.apache.hadoop.hbase.replicat
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
-import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -70,12 +69,15 @@ public class ReplicationPeer implements
* @param key cluster key used to locate the peer
* @param id string representation of this peer's identifier
*/
- public ReplicationPeer(Configuration conf, String key,
- String id) throws IOException {
+ public ReplicationPeer(Configuration conf, String key, String id) throws ReplicationException {
this.conf = conf;
this.clusterKey = key;
this.id = id;
- this.reloadZkWatcher();
+ try {
+ this.reloadZkWatcher();
+ } catch (IOException e) {
+ throw new ReplicationException("Error connecting to peer cluster with peerId=" + id, e);
+ }
}
/**
Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java?rev=1516367&r1=1516366&r2=1516367&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java Thu Aug 22 05:42:14 2013
@@ -18,7 +18,6 @@
*/
package org.apache.hadoop.hbase.replication;
-import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -27,7 +26,6 @@ import java.util.UUID;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ServerName;
-import org.apache.zookeeper.KeeperException;
/**
* This provides an interface for maintaining a set of peer clusters. These peers are remote slave
@@ -44,9 +42,8 @@ public interface ReplicationPeers {
/**
* Initialize the ReplicationPeers interface.
- * @throws KeeperException
*/
- void init() throws IOException, KeeperException;
+ void init() throws ReplicationException;
/**
* Add a new remote slave cluster for replication.
@@ -54,25 +51,25 @@ public interface ReplicationPeers {
* @param clusterKey the concatenation of the slave cluster's:
* hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent
*/
- void addPeer(String peerId, String clusterKey) throws IOException;
+ void addPeer(String peerId, String clusterKey) throws ReplicationException;
/**
* Removes a remote slave cluster and stops the replication to it.
* @param peerId a short that identifies the cluster
*/
- void removePeer(String peerId) throws IOException;
+ void removePeer(String peerId) throws ReplicationException;
/**
* Restart the replication to the specified remote slave cluster.
* @param peerId a short that identifies the cluster
*/
- void enablePeer(String peerId) throws IOException;
+ void enablePeer(String peerId) throws ReplicationException;
/**
* Stop the replication to the specified remote slave cluster.
* @param peerId a short that identifies the cluster
*/
- void disablePeer(String peerId) throws IOException;
+ void disablePeer(String peerId) throws ReplicationException;
/**
* Get the replication status for the specified connected remote slave cluster.
@@ -91,7 +88,7 @@ public interface ReplicationPeers {
* @return true if replication is enabled, false otherwise.
* @throws IOException Throws if there's an error contacting the store
*/
- boolean getStatusOfPeerFromBackingStore(String peerId) throws IOException;
+ boolean getStatusOfPeerFromBackingStore(String peerId) throws ReplicationException;
/**
* Get a set of all connected remote slave clusters.
@@ -118,7 +115,7 @@ public interface ReplicationPeers {
* @param peerId a short that identifies the cluster
* @return true if a new connection was made, false if no new connection was made.
*/
- boolean connectToPeer(String peerId) throws IOException, KeeperException;
+ boolean connectToPeer(String peerId) throws ReplicationException;
/**
* Disconnect from a remote slave cluster.
@@ -154,5 +151,5 @@ public interface ReplicationPeers {
* @param peerId a short that identifies the cluster
* @return the configuration for the peer cluster, null if it was unable to get the configuration
*/
- Configuration getPeerConf(String peerId) throws KeeperException;
+ Configuration getPeerConf(String peerId) throws ReplicationException;
}
Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java?rev=1516367&r1=1516366&r2=1516367&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java Thu Aug 22 05:42:14 2013
@@ -80,16 +80,21 @@ public class ReplicationPeersZKImpl exte
}
@Override
- public void init() throws IOException, KeeperException {
- ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
+ public void init() throws ReplicationException {
+ try {
+ ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
+ } catch (KeeperException e) {
+ throw new ReplicationException("Could not initialize replication peers", e);
+ }
connectExistingPeers();
}
@Override
- public void addPeer(String id, String clusterKey) throws IOException {
+ public void addPeer(String id, String clusterKey) throws ReplicationException {
try {
if (peerExists(id)) {
- throw new IllegalArgumentException("Cannot add existing peer");
+ throw new IllegalArgumentException("Cannot add a peer with id=" + id
+ + " because that id already exists.");
}
ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
ZKUtil.createAndWatch(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id),
@@ -101,30 +106,32 @@ public class ReplicationPeersZKImpl exte
ENABLED_ZNODE_BYTES);
// A peer is enabled by default
} catch (KeeperException e) {
- throw new IOException("Unable to add peer", e);
+ throw new ReplicationException("Could not add peer with id=" + id
+ + ", clusterKey=" + clusterKey, e);
}
}
@Override
- public void removePeer(String id) throws IOException {
+ public void removePeer(String id) throws ReplicationException {
try {
if (!peerExists(id)) {
- throw new IllegalArgumentException("Cannot remove inexisting peer");
+ throw new IllegalArgumentException("Cannot remove peer with id=" + id
+ + " because that id does not exist.");
}
ZKUtil.deleteNodeRecursively(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id));
} catch (KeeperException e) {
- throw new IOException("Unable to remove a peer", e);
+ throw new ReplicationException("Could not remove peer with id=" + id, e);
}
}
@Override
- public void enablePeer(String id) throws IOException {
+ public void enablePeer(String id) throws ReplicationException {
changePeerState(id, ZooKeeperProtos.ReplicationState.State.ENABLED);
LOG.info("peer " + id + " is enabled");
}
@Override
- public void disablePeer(String id) throws IOException {
+ public void disablePeer(String id) throws ReplicationException {
changePeerState(id, ZooKeeperProtos.ReplicationState.State.DISABLED);
LOG.info("peer " + id + " is disabled");
}
@@ -132,13 +139,13 @@ public class ReplicationPeersZKImpl exte
@Override
public boolean getStatusOfConnectedPeer(String id) {
if (!this.peerClusters.containsKey(id)) {
- throw new IllegalArgumentException("peer " + id + " is not connected");
+ throw new IllegalArgumentException("Peer with id= " + id + " is not connected");
}
return this.peerClusters.get(id).getPeerEnabled().get();
}
@Override
- public boolean getStatusOfPeerFromBackingStore(String id) throws IOException {
+ public boolean getStatusOfPeerFromBackingStore(String id) throws ReplicationException {
if (!this.getAllPeerIds().contains(id)) {
throw new IllegalArgumentException("peer " + id + " doesn't exist");
}
@@ -146,21 +153,26 @@ public class ReplicationPeersZKImpl exte
try {
return ReplicationPeer.isStateEnabled(ZKUtil.getData(this.zookeeper, peerStateZNode));
} catch (KeeperException e) {
- throw new IOException(e);
+ throw new ReplicationException(e);
} catch (DeserializationException e) {
- throw new IOException(e);
+ throw new ReplicationException(e);
}
}
@Override
- public boolean connectToPeer(String peerId) throws IOException, KeeperException {
+ public boolean connectToPeer(String peerId) throws ReplicationException {
if (peerClusters == null) {
return false;
}
if (this.peerClusters.containsKey(peerId)) {
return false;
}
- ReplicationPeer peer = getPeer(peerId);
+ ReplicationPeer peer = null;
+ try {
+ peer = getPeer(peerId);
+ } catch (Exception e) {
+ throw new ReplicationException("Error connecting to peer with id=" + peerId, e);
+ }
if (peer == null) {
return false;
}
@@ -242,9 +254,15 @@ public class ReplicationPeersZKImpl exte
}
@Override
- public Configuration getPeerConf(String peerId) throws KeeperException {
+ public Configuration getPeerConf(String peerId) throws ReplicationException {
String znode = ZKUtil.joinZNode(this.peersZNode, peerId);
- byte[] data = ZKUtil.getData(this.zookeeper, znode);
+ byte[] data = null;
+ try {
+ data = ZKUtil.getData(this.zookeeper, znode);
+ } catch (KeeperException e) {
+ throw new ReplicationException("Error getting configuration for peer with id="
+ + peerId, e);
+ }
if (data == null) {
LOG.error("Could not get configuration for peer because it doesn't exist. peerId=" + peerId);
return null;
@@ -293,11 +311,14 @@ public class ReplicationPeersZKImpl exte
/**
* A private method used during initialization. This method attempts to connect to all registered
* peer clusters. This method does not set a watch on the peer cluster znodes.
- * @throws IOException
- * @throws KeeperException
*/
- private void connectExistingPeers() throws IOException, KeeperException {
- List<String> znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
+ private void connectExistingPeers() throws ReplicationException {
+ List<String> znodes = null;
+ try {
+ znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
+ } catch (KeeperException e) {
+ throw new ReplicationException("Error getting the list of peer clusters.", e);
+ }
if (znodes != null) {
for (String z : znodes) {
connectToPeer(z);
@@ -349,13 +370,13 @@ public class ReplicationPeersZKImpl exte
* Update the state znode of a peer cluster.
* @param id
* @param state
- * @throws IOException
*/
private void changePeerState(String id, ZooKeeperProtos.ReplicationState.State state)
- throws IOException {
+ throws ReplicationException {
try {
if (!peerExists(id)) {
- throw new IllegalArgumentException("peer " + id + " is not registered");
+ throw new IllegalArgumentException("Cannot enable/disable peer because id=" + id
+ + " does not exist.");
}
String peerStateZNode = getPeerStateNode(id);
byte[] stateBytes =
@@ -366,9 +387,9 @@ public class ReplicationPeersZKImpl exte
} else {
ZKUtil.createAndWatch(this.zookeeper, peerStateZNode, stateBytes);
}
- LOG.info("state of the peer " + id + " changed to " + state.name());
+ LOG.info("Peer with id= " + id + " is now " + state.name());
} catch (KeeperException e) {
- throw new IOException("Unable to change state of the peer " + id, e);
+ throw new ReplicationException("Unable to change state of the peer with id=" + id, e);
}
}
@@ -376,10 +397,9 @@ public class ReplicationPeersZKImpl exte
* Helper method to connect to a peer
* @param peerId peer's identifier
* @return object representing the peer
- * @throws IOException
- * @throws KeeperException
+ * @throws ReplicationException
*/
- private ReplicationPeer getPeer(String peerId) throws IOException, KeeperException {
+ private ReplicationPeer getPeer(String peerId) throws ReplicationException {
Configuration peerConf = getPeerConf(peerId);
if (peerConf == null) {
return null;
@@ -391,7 +411,12 @@ public class ReplicationPeersZKImpl exte
ReplicationPeer peer =
new ReplicationPeer(peerConf, peerId, ZKUtil.getZooKeeperClusterKey(peerConf));
- peer.startStateTracker(this.zookeeper, this.getPeerStateNode(peerId));
+ try {
+ peer.startStateTracker(this.zookeeper, this.getPeerStateNode(peerId));
+ } catch (KeeperException e) {
+ throw new ReplicationException("Error starting the peer state tracker for peerId=" + peerId,
+ e);
+ }
peer.getZkw().registerListener(new PeerRegionServerListener(peer));
return peer;
}
Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java?rev=1516367&r1=1516366&r2=1516367&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java Thu Aug 22 05:42:14 2013
@@ -23,7 +23,6 @@ import java.util.SortedMap;
import java.util.SortedSet;
import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.zookeeper.KeeperException;
/**
* This provides an interface for maintaining a region server's replication queues. These queues
@@ -37,7 +36,7 @@ public interface ReplicationQueues {
* @param serverName The server name of the region server that owns the replication queues this
* interface manages.
*/
- void init(String serverName) throws KeeperException;
+ void init(String serverName) throws ReplicationException;
/**
* Remove a replication queue.
@@ -49,9 +48,8 @@ public interface ReplicationQueues {
* Add a new HLog file to the given queue. If the queue does not exist it is created.
* @param queueId a String that identifies the queue.
* @param filename name of the HLog
- * @throws KeeperException
*/
- void addLog(String queueId, String filename) throws KeeperException;
+ void addLog(String queueId, String filename) throws ReplicationException;
/**
* Remove an HLog file from the given queue.
@@ -74,7 +72,7 @@ public interface ReplicationQueues {
* @param filename name of the HLog
* @return the current position in the file
*/
- long getLogPosition(String queueId, String filename) throws KeeperException;
+ long getLogPosition(String queueId, String filename) throws ReplicationException;
/**
* Remove all replication queues for this region server.
Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java?rev=1516367&r1=1516366&r2=1516367&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java Thu Aug 22 05:42:14 2013
@@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.replicat
import java.util.List;
-import org.apache.zookeeper.KeeperException;
/**
* This provides an interface for clients of replication to view replication queues. These queues
@@ -31,7 +30,7 @@ public interface ReplicationQueuesClient
/**
* Initialize the replication queue client interface.
*/
- public void init() throws KeeperException;
+ public void init() throws ReplicationException;
/**
* Get a list of all region servers that have outstanding replication queues. These servers could
Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java?rev=1516367&r1=1516366&r2=1516367&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientZKImpl.java Thu Aug 22 05:42:14 2013
@@ -35,8 +35,12 @@ public class ReplicationQueuesClientZKIm
}
@Override
- public void init() throws KeeperException {
- ZKUtil.createWithParents(this.zookeeper, this.queuesZNode);
+ public void init() throws ReplicationException {
+ try {
+ ZKUtil.createWithParents(this.zookeeper, this.queuesZNode);
+ } catch (KeeperException e) {
+ throw new ReplicationException("Internal error while initializing a queues client", e);
+ }
}
@Override
Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java?rev=1516367&r1=1516366&r2=1516367&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java Thu Aug 22 05:42:14 2013
@@ -44,20 +44,20 @@ import org.apache.zookeeper.KeeperExcept
* base znode that this class works at is the myQueuesZnode. The myQueuesZnode contains a list of
* all outstanding HLog files on this region server that need to be replicated. The myQueuesZnode is
* the regionserver name (a concatenation of the region server's hostname, client port and start
- * code). For example:
- *
- * /hbase/replication/rs/hostname.example.org,6020,1234
- *
+ * code). For example:
+ *
+ * /hbase/replication/rs/hostname.example.org,6020,1234
+ *
* Within this znode, the region server maintains a set of HLog replication queues. These queues are
* represented by child znodes named using there give queue id. For example:
- *
+ *
* /hbase/replication/rs/hostname.example.org,6020,1234/1
* /hbase/replication/rs/hostname.example.org,6020,1234/2
*
* Each queue has one child znode for every HLog that still needs to be replicated. The value of
* these HLog child znodes is the latest position that has been replicated. This position is updated
* every time a HLog entry is replicated. For example:
- *
+ *
* /hbase/replication/rs/hostname.example.org,6020,1234/1/23522342.23422 [VALUE: 254]
*/
public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements ReplicationQueues {
@@ -75,9 +75,13 @@ public class ReplicationQueuesZKImpl ext
}
@Override
- public void init(String serverName) throws KeeperException {
+ public void init(String serverName) throws ReplicationException {
this.myQueuesZnode = ZKUtil.joinZNode(this.queuesZNode, serverName);
- ZKUtil.createWithParents(this.zookeeper, this.myQueuesZnode);
+ try {
+ ZKUtil.createWithParents(this.zookeeper, this.myQueuesZnode);
+ } catch (KeeperException e) {
+ throw new ReplicationException("Could not initialize replication queues.", e);
+ }
}
@Override
@@ -90,10 +94,16 @@ public class ReplicationQueuesZKImpl ext
}
@Override
- public void addLog(String queueId, String filename) throws KeeperException {
+ public void addLog(String queueId, String filename) throws ReplicationException {
String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
znode = ZKUtil.joinZNode(znode, filename);
- ZKUtil.createWithParents(this.zookeeper, znode);
+ try {
+ ZKUtil.createWithParents(this.zookeeper, znode);
+ } catch (KeeperException e) {
+ throw new ReplicationException(
+ "Could not add log because znode could not be created. queueId=" + queueId
+ + ", filename=" + filename);
+ }
}
@Override
@@ -122,10 +132,16 @@ public class ReplicationQueuesZKImpl ext
}
@Override
- public long getLogPosition(String queueId, String filename) throws KeeperException {
+ public long getLogPosition(String queueId, String filename) throws ReplicationException {
String clusterZnode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
String znode = ZKUtil.joinZNode(clusterZnode, filename);
- byte[] bytes = ZKUtil.getData(this.zookeeper, znode);
+ byte[] bytes = null;
+ try {
+ bytes = ZKUtil.getData(this.zookeeper, znode);
+ } catch (KeeperException e) {
+ throw new ReplicationException("Internal Error: could not get position in log for queueId="
+ + queueId + ", filename=" + filename, e);
+ }
try {
return ZKUtil.parseHLogPositionFrom(bytes);
} catch (DeserializationException de) {
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java?rev=1516367&r1=1516366&r2=1516367&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java Thu Aug 22 05:42:14 2013
@@ -36,15 +36,14 @@ import org.apache.hadoop.hbase.io.Immuta
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
+import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
-import org.apache.hadoop.hbase.replication.ReplicationPeersZKImpl;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;
-import org.apache.zookeeper.KeeperException;
/**
* This map-only job compares the data from a local table with a remote one.
@@ -128,8 +127,9 @@ public class VerifyReplication {
HTable replicatedTable = new HTable(peerConf, conf.get(NAME + ".tableName"));
scan.setStartRow(value.getRow());
replicatedScanner = replicatedTable.getScanner(scan);
- } catch (KeeperException e) {
- throw new IOException("Got a ZK exception", e);
+ } catch (ReplicationException e) {
+ throw new IOException(
+ "An error occured while trying to connect to the remove peer cluster", e);
} finally {
if (peer != null) {
peer.close();
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java?rev=1516367&r1=1516366&r2=1516367&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java Thu Aug 22 05:42:14 2013
@@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.Abortable
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
+import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.replication.ReplicationQueuesClientZKImpl;
@@ -125,7 +126,7 @@ public class ReplicationLogCleaner exten
this.zkw = new ZooKeeperWatcher(conf, "replicationLogCleaner", null);
this.replicationQueues = ReplicationFactory.getReplicationQueuesClient(zkw, conf, this);
this.replicationQueues.init();
- } catch (KeeperException e) {
+ } catch (ReplicationException e) {
LOG.error("Error while configuring " + this.getClass().getName(), e);
} catch (IOException e) {
LOG.error("Error while configuring " + this.getClass().getName(), e);
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java?rev=1516367&r1=1516366&r2=1516367&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java Thu Aug 22 05:42:14 2013
@@ -45,6 +45,7 @@ import org.apache.hadoop.hbase.regionser
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
@@ -119,8 +120,8 @@ public class Replication implements WALA
this.replicationTracker =
ReplicationFactory.getReplicationTracker(server.getZooKeeper(), this.replicationPeers,
this.conf, this.server, this.server);
- } catch (KeeperException ke) {
- throw new IOException("Failed replication handler create", ke);
+ } catch (ReplicationException e) {
+ throw new IOException("Failed replication handler create", e);
}
UUID clusterId = null;
try {
@@ -197,7 +198,11 @@ public class Replication implements WALA
*/
public void startReplicationService() throws IOException {
if (this.replication) {
- this.replicationManager.init();
+ try {
+ this.replicationManager.init();
+ } catch (ReplicationException e) {
+ throw new IOException(e);
+ }
this.replicationSink = new ReplicationSink(this.conf, this.server);
this.scheduleThreadPool.scheduleAtFixedRate(
new ReplicationStatisticsThread(this.replicationSink, this.replicationManager),
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=1516367&r1=1516366&r2=1516367&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Thu Aug 22 05:42:14 2013
@@ -50,6 +50,7 @@ import org.apache.hadoop.hbase.protobuf.
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
@@ -234,7 +235,7 @@ public class ReplicationSource extends T
LOG.trace("Recovered queue started with log " + this.queue.peek() +
" at position " + this.repLogReader.getPosition());
}
- } catch (KeeperException e) {
+ } catch (ReplicationException e) {
this.terminate("Couldn't get the position of this recovered queue " +
this.peerClusterZnode, e);
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java?rev=1516367&r1=1516366&r2=1516367&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java Thu Aug 22 05:42:14 2013
@@ -42,6 +42,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Stoppable;
+import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationListener;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
@@ -190,7 +191,7 @@ public class ReplicationSourceManager im
* Adds a normal source per registered peer cluster and tries to process all
* old region server hlog queues
*/
- public void init() throws IOException {
+ protected void init() throws IOException, ReplicationException {
for (String id : this.replicationPeers.getConnectedPeers()) {
addSource(id);
}
@@ -216,7 +217,8 @@ public class ReplicationSourceManager im
* @return the source that was created
* @throws IOException
*/
- public ReplicationSourceInterface addSource(String id) throws IOException {
+ protected ReplicationSourceInterface addSource(String id) throws IOException,
+ ReplicationException {
ReplicationSourceInterface src =
getReplicationSource(this.conf, this.fs, this, this.replicationQueues,
this.replicationPeers, stopper, id, this.clusterId);
@@ -229,11 +231,12 @@ public class ReplicationSourceManager im
this.hlogsById.get(id).add(name);
try {
this.replicationQueues.addLog(src.getPeerClusterZnode(), name);
- } catch (KeeperException ke) {
- String message = "Cannot add log to zk for" +
- " replication when creating a new source";
+ } catch (ReplicationException e) {
+ String message =
+ "Cannot add log to queue when creating a new source, queueId="
+ + src.getPeerClusterZnode() + ", filename=" + name;
stopper.stop(message);
- throw new IOException(message, ke);
+ throw e;
}
src.enqueueLog(this.latestPath);
}
@@ -289,8 +292,9 @@ public class ReplicationSourceManager im
for (ReplicationSourceInterface source : this.sources) {
try {
this.replicationQueues.addLog(source.getPeerClusterZnode(), name);
- } catch (KeeperException ke) {
- throw new IOException("Cannot add log to zk for replication", ke);
+ } catch (ReplicationException e) {
+ throw new IOException("Cannot add log to replication queue with id="
+ + source.getPeerClusterZnode() + ", filename=" + name, e);
}
}
for (SortedSet<String> hlogs : this.hlogsById.values()) {
@@ -323,7 +327,7 @@ public class ReplicationSourceManager im
* @return the created source
* @throws IOException
*/
- public ReplicationSourceInterface getReplicationSource(final Configuration conf,
+ protected ReplicationSourceInterface getReplicationSource(final Configuration conf,
final FileSystem fs, final ReplicationSourceManager manager,
final ReplicationQueues replicationQueues, final ReplicationPeers replicationPeers,
final Stoppable stopper, final String peerId, final UUID clusterId) throws IOException {
@@ -431,10 +435,7 @@ public class ReplicationSourceManager im
if (added) {
addSource(id);
}
- } catch (IOException e) {
- // TODO manage better than that ?
- LOG.error("Error while adding a new peer", e);
- } catch (KeeperException e) {
+ } catch (Exception e) {
LOG.error("Error while adding a new peer", e);
}
}
Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java?rev=1516367&r1=1516366&r2=1516367&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java Thu Aug 22 05:42:14 2013
@@ -68,7 +68,7 @@ public abstract class TestReplicationSta
}
@Test
- public void testReplicationQueuesClient() throws KeeperException {
+ public void testReplicationQueuesClient() throws ReplicationException {
rqc.init();
// Test methods with empty state
assertEquals(0, rqc.getListOfReplicators().size());
@@ -109,7 +109,7 @@ public abstract class TestReplicationSta
}
@Test
- public void testReplicationQueues() throws KeeperException, IOException {
+ public void testReplicationQueues() throws ReplicationException {
rq1.init(server1);
rq2.init(server2);
rq3.init(server3);
@@ -259,7 +259,7 @@ public abstract class TestReplicationSta
* three replicators: rq1 has 0 queues, rq2 has 1 queue with no logs, rq3 has 5 queues with 1, 2,
* 3, 4, 5 log files respectively
*/
- protected void populateQueues() throws KeeperException, IOException {
+ protected void populateQueues() throws ReplicationException {
rq1.addLog("trash", "trash");
rq1.removeQueue("trash");