You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jd...@apache.org on 2010/10/20 02:21:09 UTC
svn commit: r1024470 - in /hbase/trunk: ./ bin/replication/
src/main/java/org/apache/hadoop/hbase/client/replication/
src/main/java/org/apache/hadoop/hbase/replication/
src/main/java/org/apache/hadoop/hbase/replication/regionserver/
src/main/java/org/a...
Author: jdcryans
Date: Wed Oct 20 00:21:08 2010
New Revision: 1024470
URL: http://svn.apache.org/viewvc?rev=1024470&view=rev
Log:
HBASE-2201 JRuby shell for replication
Added:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/replication/
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
hbase/trunk/src/main/ruby/hbase/replication_admin.rb
hbase/trunk/src/main/ruby/shell/commands/add_peer.rb
hbase/trunk/src/main/ruby/shell/commands/disable_peer.rb
hbase/trunk/src/main/ruby/shell/commands/enable_peer.rb
hbase/trunk/src/main/ruby/shell/commands/remove_peer.rb
hbase/trunk/src/main/ruby/shell/commands/start_replication.rb
hbase/trunk/src/main/ruby/shell/commands/stop_replication.rb
hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/replication/
hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
Removed:
hbase/trunk/bin/replication/add_peer.rb
Modified:
hbase/trunk/CHANGES.txt
hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
hbase/trunk/src/main/javadoc/org/apache/hadoop/hbase/replication/package.html
hbase/trunk/src/main/ruby/hbase.rb
hbase/trunk/src/main/ruby/hbase/hbase.rb
hbase/trunk/src/main/ruby/shell.rb
hbase/trunk/src/main/ruby/shell/commands.rb
hbase/trunk/src/site/xdoc/replication.xml
hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
Modified: hbase/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/trunk/CHANGES.txt?rev=1024470&r1=1024469&r2=1024470&view=diff
==============================================================================
--- hbase/trunk/CHANGES.txt (original)
+++ hbase/trunk/CHANGES.txt Wed Oct 20 00:21:08 2010
@@ -1072,6 +1072,7 @@ Release 0.21.0 - Unreleased
HBASE-3073 New APIs for Result, faster implementation for some calls
HBASE-3053 Add ability to have multiple Masters LocalHBaseCluster for
test writing
+ HBASE-2201 JRuby shell for replication
OPTIMIZATIONS
HBASE-410 [testing] Speed up the test suite
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java?rev=1024470&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java Wed Oct 20 00:21:08 2010
@@ -0,0 +1,165 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.replication;
+
+import java.io.IOException;
+
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * <p>
+ * This class provides the administrative interface to HBase cluster
+ * replication. In order to use it, the cluster and the client using
+ * ReplicationAdmin must be configured with <code>hbase.replication</code>
+ * set to true.
+ * </p>
+ * <p>
+ * Adding a new peer results in creating new outbound connections from every
+ * region server to a subset of region servers on the slave cluster. Each
+ * new stream of replication will start replicating from the beginning of the
+ * current HLog, meaning that edits from the past will be replicated.
+ * </p>
+ * <p>
+ * Removing a peer is a destructive and irreversible operation that stops
+ * all the replication streams for the given cluster and deletes the metadata
+ * used to keep track of the replication state.
+ * </p>
+ * <p>
+ * Enabling and disabling peers is currently not supported.
+ * </p>
+ * <p>
+ * As cluster replication is still experimental, a kill switch is provided
+ * in order to stop all replication-related operations, see
+ * {@link #setReplicating(boolean)}. When setting it back to true, the new
+ * state of all the replication streams will be unknown and may have holes.
+ * Use at your own risk.
+ * </p>
+ * <p>
+ * To see which commands are available in the shell, type
+ * <code>replication</code>.
+ * </p>
+ */
+public class ReplicationAdmin {
+
+ // ZooKeeper-backed helper that every operation of this class delegates to
+ private final ReplicationZookeeper replicationZk;
+
+ /**
+ * Constructor that creates a connection to the local ZooKeeper ensemble.
+ * @param conf Configuration to use
+ * @throws IOException if the connection to ZK cannot be made
+ * @throws RuntimeException if replication isn't enabled.
+ */
+ public ReplicationAdmin(Configuration conf) throws IOException {
+ if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false)) {
+ throw new RuntimeException("hbase.replication isn't true, please " +
+ "enable it in order to use replication");
+ }
+ ZooKeeperWatcher zkw = HConnectionManager.getConnection(conf).
+ getZooKeeperWatcher();
+ try {
+ this.replicationZk = new ReplicationZookeeper(conf, zkw);
+ } catch (KeeperException e) {
+ throw new IOException("Unable setup the ZooKeeper connection", e);
+ }
+ }
+
+ /**
+ * Add a new peer cluster to replicate to.
+ * @param id a short that identifies the cluster
+ * @param clusterKey the concatenation of the slave cluster's
+ * <code>hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent</code>
+ * @throws IllegalArgumentException if a peer with this id already exists
+ * @throws IllegalStateException if there's already one slave since
+ * multi-slave isn't supported yet.
+ * @throws IOException if the peer cannot be written to ZooKeeper
+ */
+ public void addPeer(String id, String clusterKey) throws IOException {
+ this.replicationZk.addPeer(id, clusterKey);
+ }
+
+ /**
+ * Removes a peer cluster and stops the replication to it.
+ * @param id a short that identifies the cluster
+ * @throws IllegalArgumentException if the peer doesn't exist
+ * @throws IOException if the peer cannot be removed from ZooKeeper
+ */
+ public void removePeer(String id) throws IOException {
+ this.replicationZk.removePeer(id);
+ }
+
+ /**
+ * Restart the replication stream to the specified peer.
+ * Not implemented yet, this method always throws.
+ * @param id a short that identifies the cluster
+ * @throws NotImplementedException always
+ */
+ public void enablePeer(String id) {
+ throw new NotImplementedException("Not implemented");
+ }
+
+ /**
+ * Stop the replication stream to the specified peer.
+ * Not implemented yet, this method always throws.
+ * @param id a short that identifies the cluster
+ * @throws NotImplementedException always
+ */
+ public void disablePeer(String id) {
+ throw new NotImplementedException("Not implemented");
+ }
+
+ /**
+ * Get the number of slave clusters the local cluster has.
+ * @return number of slave clusters, 0 when the list of peers could not be
+ * read
+ */
+ public int getPeersCount() {
+ // listPeersIdsAndWatch() hands back null when the peers can't be listed,
+ // don't dereference it blindly
+ java.util.List<String> ids = this.replicationZk.listPeersIdsAndWatch();
+ return ids == null ? 0 : ids.size();
+ }
+
+ /**
+ * Get the current status of the kill switch, if the cluster is replicating
+ * or not.
+ * @return true if the cluster is replicating, otherwise false
+ * @throws IOException if the state cannot be read from ZooKeeper, the
+ * original KeeperException is attached as the cause
+ */
+ public boolean getReplicating() throws IOException {
+ try {
+ return this.replicationZk.getReplication();
+ } catch (KeeperException e) {
+ // Keep the ZK failure as the cause so it isn't lost to the caller
+ throw new IOException("Couldn't get the replication status", e);
+ }
+ }
+
+ /**
+ * Kill switch for all replication-related features
+ * @param newState true to start replication, false to stop it
+ * completely
+ * @return the previous state
+ * @throws IOException if the state cannot be read from or written to
+ * ZooKeeper
+ */
+ public boolean setReplicating(boolean newState) throws IOException {
+ boolean prev = getReplicating();
+ this.replicationZk.setReplicating(newState);
+ return prev;
+ }
+
+ /**
+ * Get the ZK-support tool created and used by this object for replication.
+ * @return the ZK-support tool
+ */
+ ReplicationZookeeper getReplicationZk() {
+ return replicationZk;
+ }
+}
Added: hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java?rev=1024470&view=auto
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java (added)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java Wed Oct 20 00:21:08 2010
@@ -0,0 +1,120 @@
+/*
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HServerAddress;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+
+/**
+ * This class acts as a wrapper for all the objects used to identify and
+ * communicate with remote peers. Everything needs to be created for objects
+ * of this class as it doesn't encapsulate any specific functionality, i.e.
+ * it's a container class.
+ */
+public class ReplicationPeer {
+
+ private final String clusterKey;
+ private final String id;
+ // Starts out empty, replaced through setRegionServers()
+ private List<HServerAddress> regionServers =
+ new ArrayList<HServerAddress>(0);
+ private final AtomicBoolean peerEnabled = new AtomicBoolean();
+ // Cannot be final since a new object needs to be recreated when session fails
+ private ZooKeeperWatcher zkw;
+ private final Configuration conf;
+
+ /**
+ * Constructor that takes all the objects required to communicate with the
+ * specified peer, except for the region server addresses.
+ * @param conf configuration object to this peer
+ * @param key cluster key used to locate the peer
+ * @param id string representation of this peer's identifier
+ * @param zkw zookeeper connection to the peer
+ */
+ public ReplicationPeer(Configuration conf, String key,
+ String id, ZooKeeperWatcher zkw) {
+ this.conf = conf;
+ this.clusterKey = key;
+ this.id = id;
+ this.zkw = zkw;
+ }
+
+ /**
+ * Get the cluster key of that peer
+ * @return string consisting of zk ensemble addresses, client port
+ * and root znode
+ */
+ public String getClusterKey() {
+ return clusterKey;
+ }
+
+ /**
+ * Get the state of this peer
+ * @return atomic boolean that holds the status
+ */
+ public AtomicBoolean getPeerEnabled() {
+ return peerEnabled;
+ }
+
+ /**
+ * Get a list of all the addresses of all the region servers
+ * for this peer cluster
+ * @return list of addresses, never null
+ */
+ public List<HServerAddress> getRegionServers() {
+ return regionServers;
+ }
+
+ /**
+ * Set the list of region servers for that peer
+ * @param regionServers list of addresses for the region servers, a null
+ * list is stored as an empty one
+ */
+ public void setRegionServers(List<HServerAddress> regionServers) {
+ // A failed lookup may hand us null, keep getRegionServers() safe to
+ // dereference
+ this.regionServers = regionServers == null ?
+ new ArrayList<HServerAddress>(0) : regionServers;
+ }
+
+ /**
+ * Get the ZK connection to this peer
+ * @return zk connection
+ */
+ public ZooKeeperWatcher getZkw() {
+ return zkw;
+ }
+
+ /**
+ * Get the identifier of this peer
+ * @return string representation of the id (short)
+ */
+ public String getId() {
+ return id;
+ }
+
+ /**
+ * Get the configuration object required to communicate with this peer
+ * @return configuration object
+ */
+ public Configuration getConfiguration() {
+ return conf;
+ }
+}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java?rev=1024470&r1=1024469&r2=1024470&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java Wed Oct 20 00:21:08 2010
@@ -54,7 +54,6 @@ import org.apache.zookeeper.KeeperExcept
* <p/>
* <pre>
* replication/
- * master {contains a full cluster address}
* state {contains true or false}
* clusterId {contains a byte}
* peers/
@@ -80,8 +79,8 @@ public class ReplicationZookeeper {
private final static String RS_LOCK_ZNODE = "lock";
// Our handle on zookeeper
private final ZooKeeperWatcher zookeeper;
- // Map of addresses of peer clusters with their ZKW
- private Map<String, ZooKeeperWatcher> peerClusters;
+ // Map of peer clusters keyed by their id
+ private Map<String, ReplicationPeer> peerClusters;
// Path to the root replication znode
private String replicationZNode;
// Path to the peer clusters znode
@@ -92,16 +91,22 @@ public class ReplicationZookeeper {
private String rsServerNameZnode;
// Name node if the replicationState znode
private String replicationStateNodeName;
- // If this RS is part of a master cluster
- private boolean replicationMaster;
private final Configuration conf;
// Is this cluster replicating at the moment?
private AtomicBoolean replicating;
// Byte (stored as string here) that identifies this cluster
private String clusterId;
+ // The key to our own cluster
+ private String ourClusterKey;
// Abortable
private Abortable abortable;
+ /**
+ * Constructor used by clients of replication (like master and HBase clients)
+ * @param conf conf to use
+ * @param zk zk connection to use
+ * @throws IOException
+ */
public ReplicationZookeeper(final Configuration conf, final ZooKeeperWatcher zk)
throws KeeperException {
@@ -125,22 +130,18 @@ public class ReplicationZookeeper {
this.conf = server.getConfiguration();
setZNodes();
- this.peerClusters = new HashMap<String, ZooKeeperWatcher>();
+ this.peerClusters = new HashMap<String, ReplicationPeer>();
this.replicating = replicating;
- setReplicating();
+ ZKUtil.createWithParents(this.zookeeper,
+ ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName));
+ readReplicationStateZnode();
this.rsServerNameZnode = ZKUtil.joinZNode(rsZNode, server.getServerName());
ZKUtil.createWithParents(this.zookeeper, this.rsServerNameZnode);
// Set a tracker on replicationStateNodeNode
ReplicationStatusTracker tracker =
new ReplicationStatusTracker(this.zookeeper, server);
tracker.start();
-
- List<String> znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
- if (znodes != null) {
- for (String z : znodes) {
- connectToPeer(z);
- }
- }
+ connectExistingPeers();
}
private void setZNodes() throws KeeperException {
@@ -156,27 +157,44 @@ public class ReplicationZookeeper {
conf.get("zookeeper.znode.replication.clusterId", "clusterId");
String rsZNodeName =
conf.get("zookeeper.znode.replication.rs", "rs");
- String thisCluster = this.conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" +
+ this.ourClusterKey = this.conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" +
this.conf.get("hbase.zookeeper.property.clientPort") + ":" +
this.conf.get(HConstants.ZOOKEEPER_ZNODE_PARENT);
this.replicationZNode =
ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName);
this.peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName);
+ ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
this.rsZNode = ZKUtil.joinZNode(replicationZNode, rsZNodeName);
+ ZKUtil.createWithParents(this.zookeeper, this.rsZNode);
String znode = ZKUtil.joinZNode(this.replicationZNode, clusterIdZNodeName);
byte [] data = ZKUtil.getData(this.zookeeper, znode);
String idResult = Bytes.toString(data);
this.clusterId = idResult == null?
Byte.toString(HConstants.DEFAULT_CLUSTER_ID): idResult;
+ }
- znode = ZKUtil.joinZNode(this.replicationZNode, repMasterZNodeName);
- data = ZKUtil.getData(this.zookeeper, znode);
- String address = Bytes.toString(data);
- this.replicationMaster = thisCluster.equals(address);
- LOG.info("This cluster (" + thisCluster + ") is a " +
- (this.replicationMaster ? "master" : "slave") + " for replication" +
- ", compared with (" + address + ")");
+ /**
+ * Lists the children of the peers znode and calls
+ * {@link #connectToPeer(String)} for each of them. Does nothing when the
+ * listing comes back null.
+ * @throws IOException
+ * @throws KeeperException
+ */
+ private void connectExistingPeers() throws IOException, KeeperException {
+ List<String> peerIds =
+ ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
+ if (peerIds == null) {
+ return;
+ }
+ for (String peerId : peerIds) {
+ connectToPeer(peerId);
+ }
+ }
+
+ /**
+ * Fetch the ids of the peers registered under the peers znode through
+ * ZKUtil.listChildrenAndWatchThem
+ * @return list of all peers' identifiers, null when the listing failed with
+ * a KeeperException (in which case the abortable is asked to abort)
+ */
+ public List<String> listPeersIdsAndWatch() {
+ try {
+ return ZKUtil.listChildrenAndWatchThem(this.zookeeper, this.peersZNode);
+ } catch (KeeperException e) {
+ this.abortable.abort("Cannot get the list of peers ", e);
+ return null;
+ }
}
/**
@@ -185,16 +203,32 @@ public class ReplicationZookeeper {
* @param peerClusterId (byte) the cluster to interrogate
* @return addresses of all region servers
*/
- public List<HServerAddress> getPeersAddresses(String peerClusterId)
+ public List<HServerAddress> getSlavesAddresses(String peerClusterId)
throws KeeperException {
if (this.peerClusters.size() == 0) {
return new ArrayList<HServerAddress>(0);
}
- ZooKeeperWatcher zkw = this.peerClusters.get(peerClusterId);
-
- return zkw == null?
- new ArrayList<HServerAddress>(0):
- ZKUtil.listChildrenAndGetAsAddresses(zkw, zkw.rsZNode);
+ ReplicationPeer peer = this.peerClusters.get(peerClusterId);
+ if (peer == null) {
+ return new ArrayList<HServerAddress>(0);
+ }
+ peer.setRegionServers(fetchSlavesAddresses(peer.getZkw()));
+ return peer.getRegionServers();
+ }
+
+ /**
+ * Get the list of all the region servers from the specified peer
+ * @param zkw zk connection to use
+ * @return list of region server addresses, never null; the list is empty
+ * when the addresses could not be read
+ */
+ private List<HServerAddress> fetchSlavesAddresses(ZooKeeperWatcher zkw) {
+ List<HServerAddress> rss = null;
+ try {
+ rss = ZKUtil.listChildrenAndGetAsAddresses(zkw, zkw.rsZNode);
+ } catch (KeeperException e) {
+ LOG.warn("Cannot get peer's region server addresses", e);
+ }
+ // The result is stored in the peer and handed to callers that use it
+ // right away, so never let a failed lookup surface as null
+ return rss == null ? new ArrayList<HServerAddress>(0) : rss;
}
/**
@@ -203,44 +237,146 @@ public class ReplicationZookeeper {
* @param peerId id of the peer cluster
* @throws KeeperException
*/
- private void connectToPeer(String peerId) throws IOException, KeeperException {
+ public boolean connectToPeer(String peerId)
+ throws IOException, KeeperException {
+ // Nothing to do without a peer map or when this peer is already known
+ if (this.peerClusters == null || this.peerClusters.containsKey(peerId)) {
+ return false;
+ }
+ // TODO remove when we support it
+ if (!this.peerClusters.isEmpty()) {
+ LOG.warn("Multiple slaves feature not supported");
+ return false;
+ }
+ ReplicationPeer newPeer = getPeer(peerId);
+ if (newPeer == null) {
+ return false;
+ }
+ this.peerClusters.put(peerId, newPeer);
+ // Register the peer under this region server's own znode
+ String peerZnodeUnderRs =
+ ZKUtil.joinZNode(this.rsServerNameZnode, peerId);
+ ZKUtil.createWithParents(this.zookeeper, peerZnodeUnderRs);
+ LOG.info("Added new peer cluster " + newPeer.getClusterKey());
+ return true;
+ }
+
+ /**
+ * Helper method to connect to a peer
+ * @param peerId peer's identifier
+ * @return object representing the peer, or null when the peer has no
+ * cluster key, points to this very cluster or has a malformed cluster key
+ * @throws IOException
+ * @throws KeeperException
+ */
+ private ReplicationPeer getPeer(String peerId) throws IOException, KeeperException{
String znode = ZKUtil.joinZNode(this.peersZNode, peerId);
byte [] data = ZKUtil.getData(this.zookeeper, znode);
- String [] ensemble = Bytes.toString(data).split(":");
+ String otherClusterKey = Bytes.toString(data);
+ // No data means no key to split, don't fail with a NullPointerException
+ if (otherClusterKey == null) {
+ LOG.warn("No cluster key found for peer " + peerId);
+ return null;
+ }
+ if (this.ourClusterKey.equals(otherClusterKey)) {
+ LOG.debug("Not connecting to " + peerId + " because it's us");
+ return null;
+ }
+ String[] ensemble = otherClusterKey.split(":");
if (ensemble.length != 3) {
- throw new IllegalArgumentException("Wrong format of cluster address: " +
+ LOG.warn("Wrong format of cluster address: " +
Bytes.toStringBinary(data));
+ return null;
}
+ // Construct the connection to the new peer
Configuration otherConf = new Configuration(this.conf);
otherConf.set(HConstants.ZOOKEEPER_QUORUM, ensemble[0]);
otherConf.set("hbase.zookeeper.property.clientPort", ensemble[1]);
otherConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, ensemble[2]);
ZooKeeperWatcher zkw = new ZooKeeperWatcher(otherConf,
"connection to cluster: " + peerId, this.abortable);
- this.peerClusters.put(peerId, zkw);
- ZKUtil.createWithParents(this.zookeeper, ZKUtil.joinZNode(
- this.rsServerNameZnode, peerId));
- LOG.info("Added new peer cluster " + StringUtils.arrayToString(ensemble));
+ // ReplicationPeer's constructor expects (conf, key, id, zkw) in that order
+ return new ReplicationPeer(otherConf, otherClusterKey,
+ peerId, zkw);
}
/**
- * This reads the state znode for replication and sets the atomic boolean
+ * Set the new replication state for this cluster
+ * @param newState
+ */
+ public void setReplicating(boolean newState) throws IOException {
+ try {
+ ZKUtil.createWithParents(this.zookeeper,
+ ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName));
+ ZKUtil.setData(this.zookeeper,
+ ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName),
+ Bytes.toBytes(Boolean.toString(newState)));
+ } catch (KeeperException e) {
+ throw new IOException("Unable to set the replication state", e);
+ }
+ }
+
+ /**
+ * Remove the peer from zookeeper, which will trigger the watchers on every
+ * region server and close their sources
+ * @param id identifier of the peer to remove
+ * @throws IllegalArgumentException Thrown when the peer doesn't exist
+ * @throws IOException Thrown when the ZooKeeper operations fail with a
+ * KeeperException
*/
- private void setReplicating() {
+ public void removePeer(String id) throws IOException {
try {
- byte [] data = ZKUtil.getDataAndWatch(this.zookeeper, getRepStateNode());
- String value = Bytes.toString(data);
- if (value == null) LOG.info(getRepStateNode() + " data is null");
- else {
- this.replicating.set(Boolean.parseBoolean(value));
- LOG.info("Replication is now " + (this.replicating.get()?
- "started" : "stopped"));
+ if (!peerExists(id)) {
+ throw new IllegalArgumentException("Cannot remove inexisting peer");
}
+ ZKUtil.deleteNode(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id));
+ } catch (KeeperException e) {
+ throw new IOException("Unable to remove a peer", e);
+ }
+ }
+
+ /**
+ * Add a new peer to this cluster by creating its znode, which holds the
+ * cluster key, under the peers znode
+ * @param id peer's identifier
+ * @param clusterKey ZK ensemble's addresses, client port and root znode
+ * @throws IllegalArgumentException Thrown when a peer with this id already
+ * exists
+ * @throws IllegalStateException Thrown when another peer already exists,
+ * since multi-slave isn't supported yet.
+ * @throws IOException Thrown when the ZooKeeper operations fail with a
+ * KeeperException
+ */
+ public void addPeer(String id, String clusterKey) throws IOException {
+ try {
+ if (peerExists(id)) {
+ throw new IllegalArgumentException("Cannot add existing peer");
+ } else if (countPeers() > 0) {
+ throw new IllegalStateException("Multi-slave isn't supported yet");
+ }
+ ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
+ ZKUtil.createAndWatch(this.zookeeper,
+ ZKUtil.joinZNode(this.peersZNode, id), Bytes.toBytes(clusterKey));
+ } catch (KeeperException e) {
+ throw new IOException("Unable to add peer", e);
+ }
+ }
+
+ /**
+ * Tell if a znode exists for the given peer under the peers znode
+ * @param id peer's identifier
+ * @return true when ZKUtil.checkExists returns a non-negative value for
+ * the peer's znode
+ * @throws KeeperException
+ */
+ private boolean peerExists(String id) throws KeeperException {
+ String peerZnode = ZKUtil.joinZNode(this.peersZNode, id);
+ int existsResult = ZKUtil.checkExists(this.zookeeper, peerZnode);
+ return existsResult >= 0;
+ }
+
+ /**
+ * Count the children of the peers znode
+ * @return number of children, 0 when the listing comes back null
+ * @throws KeeperException
+ */
+ private int countPeers() throws KeeperException {
+ List<String> peerIds =
+ ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
+ if (peerIds == null) {
+ return 0;
+ }
+ return peerIds.size();
+ }
+
+ /**
+ * Refresh the atomic boolean from what {@link #getReplication()} reads in
+ * the state znode and log the resulting state. A KeeperException is handed
+ * to the abortable.
+ */
+ private void readReplicationStateZnode() {
+ try {
+ this.replicating.set(getReplication());
+ String status = this.replicating.get() ? "started" : "stopped";
+ LOG.info("Replication is now " + status);
} catch (KeeperException e) {
this.abortable.abort("Failed getting data on from " + getRepStateNode(), e);
}
}
+ public boolean getReplication() throws KeeperException {
+ byte [] data = ZKUtil.getDataAndWatch(this.zookeeper, getRepStateNode());
+ return Boolean.parseBoolean(Bytes.toString(data));
+ }
+
private String getRepStateNode() {
return ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName);
}
@@ -303,13 +439,8 @@ public class ReplicationZookeeper {
public List<String> getRegisteredRegionServers() {
List<String> result = null;
try {
- List<ZKUtil.NodeAndData> nads =
- ZKUtil.watchAndGetNewChildren(this.zookeeper, this.zookeeper.rsZNode);
- result = new ArrayList<String>(nads.size());
- for (ZKUtil.NodeAndData nad : nads) {
- String[] fullPath = nad.getNode().split("/");
- result.add(fullPath[fullPath.length - 1]);
- }
+ result = ZKUtil.listChildrenAndWatchThem(
+ this.zookeeper, this.zookeeper.rsZNode);
} catch (KeeperException e) {
this.abortable.abort("Get list of registered region servers", e);
}
@@ -442,10 +573,14 @@ public class ReplicationZookeeper {
* Delete a complete queue of hlogs
* @param peerZnode znode of the peer cluster queue of hlogs to delete
*/
- public void deleteSource(String peerZnode) {
+ public void deleteSource(String peerZnode, boolean closeConnection) {
try {
ZKUtil.deleteNodeRecursively(this.zookeeper,
ZKUtil.joinZNode(rsServerNameZnode, peerZnode));
+ if (closeConnection) {
+ this.peerClusters.get(peerZnode).getZkw().close();
+ this.peerClusters.remove(peerZnode);
+ }
} catch (KeeperException e) {
this.abortable.abort("Failed delete of " + peerZnode, e);
}
@@ -506,24 +641,39 @@ public class ReplicationZookeeper {
/**
* Get a map of all peer clusters
- * @return map of peer cluster, zk address to ZKW
+ * @return map of peer cluster keyed by id
*/
- public Map<String, ZooKeeperWatcher> getPeerClusters() {
+ public Map<String, ReplicationPeer> getPeerClusters() {
return this.peerClusters;
}
- public String getRSZNode() {
- return rsZNode;
+ /**
+ * Extracts the znode name of a peer cluster from a ZK path
+ * @param fullPath Path to extract the id from
+ * @return the last "/"-separated element of the path, or an empty string
+ * when splitting the path yields no element
+ */
+ public static String getZNodeName(String fullPath) {
+ String[] segments = fullPath.split("/");
+ if (segments.length == 0) {
+ return "";
+ }
+ return segments[segments.length - 1];
}
/**
- *
- * @return
+ * Get this cluster's zk connection
+ * @return zk connection
*/
public ZooKeeperWatcher getZookeeperWatcher() {
return this.zookeeper;
}
+
+ /**
+ * Accessor for the path of the znode that holds the peers
+ * @return full path to the peers' znode in zk
+ */
+ public String getPeersZNode() {
+ return this.peersZNode;
+ }
+
/**
* Tracker for status of the replication
*/
@@ -536,7 +686,7 @@ public class ReplicationZookeeper {
@Override
public synchronized void nodeDataChanged(String path) {
super.nodeDataChanged(path);
- setReplicating();
+ readReplicationStateZnode();
}
}
}
\ No newline at end of file
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=1024470&r1=1024469&r2=1024470&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Wed Oct 20 00:21:08 2010
@@ -126,6 +126,9 @@ public class ReplicationSource extends T
private volatile boolean running = true;
// Metrics for this source
private ReplicationSourceMetrics metrics;
+ // If source is enabled, replication happens. If disabled, nothing will be
+ // replicated but HLogs will still be queued
+ private AtomicBoolean sourceEnabled = new AtomicBoolean();
/**
* Instantiation method used by region servers
@@ -199,7 +202,7 @@ public class ReplicationSource extends T
private void chooseSinks() throws KeeperException {
this.currentPeers.clear();
List<HServerAddress> addresses =
- this.zkHelper.getPeersAddresses(peerClusterId);
+ this.zkHelper.getSlavesAddresses(peerClusterId);
Set<HServerAddress> setOfAddr = new HashSet<HServerAddress>();
int nbPeers = (int) (Math.ceil(addresses.size() * ratio));
LOG.info("Getting " + nbPeers +
@@ -236,14 +239,20 @@ public class ReplicationSource extends T
this.position = this.zkHelper.getHLogRepPosition(
this.peerClusterZnode, this.queue.peek().getName());
} catch (KeeperException e) {
- LOG.error("Couldn't get the position of this recovered queue " +
+ this.terminate("Couldn't get the position of this recovered queue " +
peerClusterZnode, e);
- this.abort();
}
}
int sleepMultiplier = 1;
// Loop until we close down
while (!stopper.isStopped() && this.running) {
+ // Sleep until replication is enabled again
+ if (!this.replicating.get() || !this.sourceEnabled.get()) {
+ if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
+ sleepMultiplier++;
+ }
+ continue;
+ }
// Get a new path
if (!getNextPath()) {
if (sleepForRetries("No log to process", sleepMultiplier)) {
@@ -419,7 +428,7 @@ public class ReplicationSource extends T
*/
protected boolean openReader(int sleepMultiplier) {
try {
- LOG.info("Opening log for replication " + this.currentPath.getName() +
+ LOG.debug("Opening log for replication " + this.currentPath.getName() +
" at " + this.position);
try {
this.reader = null;
@@ -445,6 +454,12 @@ public class ReplicationSource extends T
// TODO What happens if the log was missing from every single location?
// Although we need to check a couple of times as the log could have
// been moved by the master between the checks
+ // It can also happen if a recovered queue wasn't properly cleaned,
+ // such that the znode pointing to a log exists but the log was
+ // deleted a long time ago.
+ // For the moment, we'll throw the IO and processEndOfFile
+ throw new IOException("File from recovered queue is " +
+ "nowhere to be found", fnfe);
} else {
// If the log was archived, continue reading from there
Path archivedLogLocation =
@@ -590,7 +605,7 @@ public class ReplicationSource extends T
return true;
} else if (this.queueRecovered) {
this.manager.closeRecoveredQueue(this);
- this.abort();
+ this.terminate("Finished recovering the queue");
return true;
}
return false;
@@ -601,25 +616,26 @@ public class ReplicationSource extends T
Thread.UncaughtExceptionHandler handler =
new Thread.UncaughtExceptionHandler() {
public void uncaughtException(final Thread t, final Throwable e) {
- LOG.fatal("Set stop flag in " + t.getName(), e);
- abort();
+ terminate("Uncaught exception during runtime", new Exception(e));
}
};
Threads.setDaemonThreadRunning(
this, n + ".replicationSource," + peerClusterZnode, handler);
}
- /**
- * Hastily stop the replication, then wait for shutdown
- */
- private void abort() {
- LOG.info("abort");
- this.running = false;
- terminate();
+ public void terminate(String reason) {
+ terminate(reason, null);
}
- public void terminate() {
- LOG.info("terminate");
+ public void terminate(String reason, Exception cause) {
+ if (cause == null) {
+ LOG.error("Closing source " + this.peerClusterZnode
+ + " because an error occurred: " + reason, cause);
+ } else {
+ LOG.info("Closing source "
+ + this.peerClusterZnode + " because: " + reason);
+ }
+ this.running = false;
Threads.shutdown(this, this.sleepForRetries);
}
@@ -663,23 +679,22 @@ public class ReplicationSource extends T
return down;
}
- /**
- * Get the id that the source is replicating to
- *
- * @return peer cluster id
- */
public String getPeerClusterZnode() {
return this.peerClusterZnode;
}
- /**
- * Get the path of the current HLog
- * @return current hlog's path
- */
+ public String getPeerClusterId() {
+ return this.peerClusterId;
+ }
+
public Path getCurrentPath() {
return this.currentPath;
}
+ public void setSourceEnabled(boolean status) {
+ this.sourceEnabled.set(status);
+ }
+
/**
* Comparator used to compare logs together based on their start time
*/
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java?rev=1024470&r1=1024469&r2=1024470&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java Wed Oct 20 00:21:08 2010
@@ -68,8 +68,16 @@ public interface ReplicationSourceInterf
/**
* End the replication
+ * @param reason why it's terminating
*/
- public void terminate();
+ public void terminate(String reason);
+
+ /**
+ * End the replication
+ * @param reason why it's terminating
+ * @param cause the error that's causing it
+ */
+ public void terminate(String reason, Exception cause);
/**
* Get the id that the source is replicating to
@@ -77,4 +85,17 @@ public interface ReplicationSourceInterf
* @return peer cluster id
*/
public String getPeerClusterZnode();
+
+ /**
+ * Get the id that the source is replicating to.
+ *
+ * @return peer cluster id
+ */
+ public String getPeerClusterId();
+
+ /**
+ * Set if this source is enabled or disabled
+ * @param status the new status
+ */
+ public void setSourceEnabled(boolean status);
}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java?rev=1024470&r1=1024469&r2=1024470&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java Wed Oct 20 00:21:08 2010
@@ -38,6 +38,7 @@ import org.apache.hadoop.hbase.Stoppable
import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
/**
* This class is responsible to manage all the replication
@@ -108,6 +109,9 @@ public class ReplicationSourceManager {
new OtherRegionServerWatcher(this.zkHelper.getZookeeperWatcher()));
List<String> otherRSs =
this.zkHelper.getRegisteredRegionServers();
+ this.zkHelper.registerRegionServerListener(
+ new PeersWatcher(this.zkHelper.getZookeeperWatcher()));
+ this.zkHelper.listPeersIdsAndWatch();
this.otherRegionServers = otherRSs == null ? new ArrayList<String>() : otherRSs;
}
@@ -144,8 +148,7 @@ public class ReplicationSourceManager {
*/
public void init() throws IOException {
for (String id : this.zkHelper.getPeerClusters().keySet()) {
- ReplicationSourceInterface src = addSource(id);
- src.startup();
+ addSource(id);
}
List<String> currentReplicators = this.zkHelper.getRegisteredRegionServers();
if (currentReplicators == null || currentReplicators.size() == 0) {
@@ -168,20 +171,24 @@ public class ReplicationSourceManager {
/**
* Add a new normal source to this region server
* @param id the id of the peer cluster
- * @return the created source
+ * @return the source that was created
* @throws IOException
*/
public ReplicationSourceInterface addSource(String id) throws IOException {
ReplicationSourceInterface src =
getReplicationSource(this.conf, this.fs, this, stopper, replicating, id);
- this.sources.add(src);
+ // TODO set it to what's in ZK
+ src.setSourceEnabled(true);
synchronized (this.hlogs) {
+ this.sources.add(src);
if (this.hlogs.size() > 0) {
- this.zkHelper.addLogToList(this.hlogs.first(),
+ // Add the latest hlog to that source's queue
+ this.zkHelper.addLogToList(this.hlogs.last(),
this.sources.get(0).getPeerClusterZnode());
src.enqueueLog(this.latestPath);
}
}
+ src.startup();
return src;
}
@@ -193,7 +200,7 @@ public class ReplicationSourceManager {
this.zkHelper.deleteOwnRSZNode();
}
for (ReplicationSourceInterface source : this.sources) {
- source.terminate();
+ source.terminate("Region server is closing");
}
}
@@ -214,6 +221,11 @@ public class ReplicationSourceManager {
}
void logRolled(Path newLog) {
+ if (!this.replicating.get()) {
+ LOG.warn("Replication stopped, won't add new log");
+ return;
+ }
+
if (this.sources.size() > 0) {
this.zkHelper.addLogToList(newLog.getName(),
this.sources.get(0).getPeerClusterZnode());
@@ -300,10 +312,16 @@ public class ReplicationSourceManager {
try {
ReplicationSourceInterface src = getReplicationSource(this.conf,
this.fs, this, this.stopper, this.replicating, peerId);
+ if (!zkHelper.getPeerClusters().containsKey(src.getPeerClusterId())) {
+ src.terminate("Recovered queue doesn't belong to any current peer");
+ break;
+ }
this.oldsources.add(src);
for (String hlog : entry.getValue()) {
src.enqueueLog(new Path(this.oldLogDir, hlog));
}
+ // TODO set it to what's in ZK
+ src.setSourceEnabled(true);
src.startup();
} catch (IOException e) {
// TODO manage it
@@ -319,7 +337,46 @@ public class ReplicationSourceManager {
public void closeRecoveredQueue(ReplicationSourceInterface src) {
LOG.info("Done with the recovered queue " + src.getPeerClusterZnode());
this.oldsources.remove(src);
- this.zkHelper.deleteSource(src.getPeerClusterZnode());
+ this.zkHelper.deleteSource(src.getPeerClusterZnode(), false);
+ }
+
+ /**
+ * This method first deletes all the recovered sources for the specified
+ * id, then deletes the normal source (deleting all related data in ZK).
+ * @param id The id of the peer cluster
+ */
+ public void removePeer(String id) {
+ LOG.info("Closing the following queue " + id + ", currently have "
+ + sources.size() + " and another "
+ + oldsources.size() + " that were recovered");
+ ReplicationSourceInterface srcToRemove = null;
+ List<ReplicationSourceInterface> oldSourcesToDelete =
+ new ArrayList<ReplicationSourceInterface>();
+ // First close all the recovered sources for this peer
+ for (ReplicationSourceInterface src : oldsources) {
+ if (id.equals(src.getPeerClusterId())) {
+ oldSourcesToDelete.add(src);
+ }
+ }
+ for (ReplicationSourceInterface src : oldSourcesToDelete) {
+ closeRecoveredQueue((src));
+ }
+ LOG.info("Number of deleted recovered sources for " + id + ": "
+ + oldSourcesToDelete.size());
+ // Now look for the one on this cluster
+ for (ReplicationSourceInterface src : this.sources) {
+ if (id.equals(src.getPeerClusterId())) {
+ srcToRemove = src;
+ break;
+ }
+ }
+ if (srcToRemove == null) {
+ LOG.error("The queue we wanted to close is missing " + id);
+ return;
+ }
+ srcToRemove.terminate("Replication stream was removed by a user");
+ this.sources.remove(srcToRemove);
+ this.zkHelper.deleteSource(id, true);
}
/**
@@ -354,8 +411,7 @@ public class ReplicationSourceManager {
return;
}
LOG.info(path + " znode expired, trying to lock it");
- String[] rsZnodeParts = path.split("/");
- transferQueues(rsZnodeParts[rsZnodeParts.length-1]);
+ transferQueues(zkHelper.getZNodeName(path));
}
/**
@@ -384,6 +440,70 @@ public class ReplicationSourceManager {
}
/**
+ * Watcher used to follow the creation and deletion of peer clusters.
+ */
+ public class PeersWatcher extends ZooKeeperListener {
+
+ /**
+ * Construct a ZooKeeper event listener.
+ */
+ public PeersWatcher(ZooKeeperWatcher watcher) {
+ super(watcher);
+ }
+
+ /**
+ * Called when a node has been deleted
+ * @param path full path of the deleted node
+ */
+ public void nodeDeleted(String path) {
+ List<String> peers = refreshPeersList(path);
+ if (peers == null) {
+ return;
+ }
+ String id = zkHelper.getZNodeName(path);
+ removePeer(id);
+ }
+
+ /**
+ * Called when an existing node has a child node added or removed.
+ * @param path full path of the node whose children have changed
+ */
+ public void nodeChildrenChanged(String path) {
+ List<String> peers = refreshPeersList(path);
+ if (peers == null) {
+ return;
+ }
+ for (String id : peers) {
+ try {
+ boolean added = zkHelper.connectToPeer(id);
+ if (added) {
+ addSource(id);
+ }
+ } catch (IOException e) {
+ // TODO manage better than that ?
+ LOG.error("Error while adding a new peer", e);
+ } catch (KeeperException e) {
+ LOG.error("Error while adding a new peer", e);
+ }
+ }
+ }
+
+ /**
+ * Verify if this event is meant for us, and if so then get the latest
+ * peers' list from ZK. Also reset the watches.
+ * @param path path to check against
+ * @return A list of peers' identifiers if the event concerns this watcher,
+ * else null.
+ */
+ private List<String> refreshPeersList(String path) {
+ if (!path.startsWith(zkHelper.getPeersZNode())) {
+ return null;
+ }
+ return zkHelper.listPeersIdsAndWatch();
+ }
+ }
+
+ /**
* Get the directory where hlogs are archived
* @return the directory where hlogs are archived
*/
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java?rev=1024470&r1=1024469&r2=1024470&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java Wed Oct 20 00:21:08 2010
@@ -276,6 +276,27 @@ public class ZKUtil {
}
/**
+ * List all the children of the specified znode, setting a watch for children
+ * changes and also setting a watch on every individual child in order to get
+ * the NodeCreated and NodeDeleted events.
+ * @param zkw zookeeper reference
+ * @param znode node to get children of and watch
+ * @return list of znode names, null if the node doesn't exist
+ * @throws KeeperException
+ */
+ public static List<String> listChildrenAndWatchThem(ZooKeeperWatcher zkw,
+ String znode) throws KeeperException {
+ List<String> children = listChildrenAndWatchForNewChildren(zkw, znode);
+ if (children == null) {
+ return null;
+ }
+ for (String child : children) {
+ watchAndCheckExists(zkw, joinZNode(znode, child));
+ }
+ return children;
+ }
+
+ /**
* Lists the children of the specified znode, retrieving the data of each
* child as a server address.
*
Modified: hbase/trunk/src/main/javadoc/org/apache/hadoop/hbase/replication/package.html
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/javadoc/org/apache/hadoop/hbase/replication/package.html?rev=1024470&r1=1024469&r2=1024470&view=diff
==============================================================================
--- hbase/trunk/src/main/javadoc/org/apache/hadoop/hbase/replication/package.html (original)
+++ hbase/trunk/src/main/javadoc/org/apache/hadoop/hbase/replication/package.html Wed Oct 20 00:21:08 2010
@@ -47,7 +47,8 @@ features:
<li>Replication of scoped families in user tables.</li>
<li>Start/stop replication stream.</li>
<li>Supports clusters of different sizes.</li>
- <li>Handling of partitions longer than 10 minutes</li>
+ <li>Handling of partitions longer than 10 minutes.</li>
+ <li>Ability to add/remove slave clusters at runtime.</li>
</ol>
Please report bugs on the project's Jira when found.
<p>
@@ -80,7 +81,7 @@ Before trying out replication, make sure
<p>
The following steps describe how to enable replication from a cluster
-to another. This must be done with both clusters offlined.
+to another.
<ol>
<li>Edit ${HBASE_HOME}/conf/hbase-site.xml on both cluster to add
the following configurations:
@@ -90,15 +91,13 @@ to another. This must be done with both
<value>true</value>
</property></pre>
</li>
- <li>Run the following command on any cluster:
- <pre>
-$HBASE_HOME/bin/hbase org.jruby.Main $HBASE_HOME/bin/replication/add_peer.tb</pre>
+ <li>Run the following command in the master's shell while it's running
+ <pre>add_peer</pre>
This will show you the help to setup the replication stream between
both clusters. If both clusters use the same Zookeeper cluster, you have
to use a different <b>zookeeper.znode.parent</b> since they can't
write in the same folder.
</li>
- <li>You can now start and stop the clusters with your preferred method.</li>
</ol>
You can confirm that your setup works by looking at any region server's log
@@ -115,12 +114,11 @@ was chosen for replication.<br><br>
Should you want to stop the replication while the clusters are running, open
the shell on the master cluster and issue this command:
<pre>
-hbase(main):001:0> zk 'set /zookeeper.znode.parent/replication/state false'</pre>
+hbase(main):001:0> stop_replication</pre>
-Where you replace the znode parent with the one configured on your master
-cluster. Replication of already queued edits will still happen after you
+Replication of already queued edits will still happen after you
issued that command but new entries won't be. To start it back, simply replace
-"false" with "true" in the command.
+"stop_replication" with "start_replication" in the command.
<p>
Modified: hbase/trunk/src/main/ruby/hbase.rb
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/ruby/hbase.rb?rev=1024470&r1=1024469&r2=1024470&view=diff
==============================================================================
--- hbase/trunk/src/main/ruby/hbase.rb (original)
+++ hbase/trunk/src/main/ruby/hbase.rb Wed Oct 20 00:21:08 2010
@@ -72,3 +72,4 @@ end
require 'hbase/hbase'
require 'hbase/admin'
require 'hbase/table'
+require 'hbase/replication_admin'
Modified: hbase/trunk/src/main/ruby/hbase/hbase.rb
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/ruby/hbase/hbase.rb?rev=1024470&r1=1024469&r2=1024470&view=diff
==============================================================================
--- hbase/trunk/src/main/ruby/hbase/hbase.rb (original)
+++ hbase/trunk/src/main/ruby/hbase/hbase.rb Wed Oct 20 00:21:08 2010
@@ -49,5 +49,10 @@ module Hbase
def table(table, formatter)
::Hbase::Table.new(configuration, table, formatter)
end
+
+ def replication_admin(formatter)
+ ::Hbase::RepAdmin.new(configuration, formatter)
+ end
+
end
end
Added: hbase/trunk/src/main/ruby/hbase/replication_admin.rb
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/ruby/hbase/replication_admin.rb?rev=1024470&view=auto
==============================================================================
--- hbase/trunk/src/main/ruby/hbase/replication_admin.rb (added)
+++ hbase/trunk/src/main/ruby/hbase/replication_admin.rb Wed Oct 20 00:21:08 2010
@@ -0,0 +1,72 @@
+#
+# Copyright 2010 The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+include Java
+
+java_import org.apache.hadoop.hbase.client.replication.ReplicationAdmin
+
+# Wrapper for org.apache.hadoop.hbase.client.replication.ReplicationAdmin
+
+module Hbase
+ class RepAdmin
+ include HBaseConstants
+
+ def initialize(configuration, formatter)
+ @replication_admin = ReplicationAdmin.new(configuration)
+ @formatter = formatter
+ end
+
+ #----------------------------------------------------------------------------------------------
+ # Add a new peer cluster to replicate to
+ def add_peer(id, cluster_key)
+ @replication_admin.addPeer(id, cluster_key)
+ end
+
+ #----------------------------------------------------------------------------------------------
+ # Remove a peer cluster, stops the replication
+ def remove_peer(id)
+ @replication_admin.removePeer(id)
+ end
+
+ #----------------------------------------------------------------------------------------------
+ # Restart the replication stream to the specified peer
+ def enable_peer(id)
+ @replication_admin.enablePeer(id)
+ end
+
+ #----------------------------------------------------------------------------------------------
+ # Stop the replication stream to the specified peer
+ def disable_peer(id)
+ @replication_admin.disablePeer(id)
+ end
+
+ #----------------------------------------------------------------------------------------------
+ # Restart the replication, in an unknown state
+ def start_replication
+ @replication_admin.setReplicating(true)
+ end
+
+ #----------------------------------------------------------------------------------------------
+ # Kill switch for replication, stops all its features
+ def stop_replication
+ @replication_admin.setReplicating(false)
+ end
+ end
+end
Modified: hbase/trunk/src/main/ruby/shell.rb
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/ruby/shell.rb?rev=1024470&r1=1024469&r2=1024470&view=diff
==============================================================================
--- hbase/trunk/src/main/ruby/shell.rb (original)
+++ hbase/trunk/src/main/ruby/shell.rb Wed Oct 20 00:21:08 2010
@@ -83,6 +83,10 @@ module Shell
hbase.table(name, formatter)
end
+ def hbase_replication_admin
+ @hbase_replication_admin ||= hbase.replication_admin(formatter)
+ end
+
def export_commands(where)
::Shell.commands.keys.each do |cmd|
where.send :instance_eval, <<-EOF
@@ -254,3 +258,17 @@ Shell.load_command_group(
]
)
+Shell.load_command_group(
+ 'replication',
+ :full_name => 'CLUSTER REPLICATION TOOLS',
+ :comment => "In order to use these tools, hbase.replication must be true. enabling/disabling is currently unsupported",
+ :commands => %w[
+ add_peer
+ remove_peer
+ enable_peer
+ disable_peer
+ start_replication
+ stop_replication
+ ]
+)
+
Modified: hbase/trunk/src/main/ruby/shell/commands.rb
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/ruby/shell/commands.rb?rev=1024470&r1=1024469&r2=1024470&view=diff
==============================================================================
--- hbase/trunk/src/main/ruby/shell/commands.rb (original)
+++ hbase/trunk/src/main/ruby/shell/commands.rb Wed Oct 20 00:21:08 2010
@@ -49,6 +49,10 @@ module Shell
shell.hbase_table(name)
end
+ def replication_admin
+ shell.hbase_replication_admin
+ end
+
#----------------------------------------------------------------------
def formatter
Added: hbase/trunk/src/main/ruby/shell/commands/add_peer.rb
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/ruby/shell/commands/add_peer.rb?rev=1024470&view=auto
==============================================================================
--- hbase/trunk/src/main/ruby/shell/commands/add_peer.rb (added)
+++ hbase/trunk/src/main/ruby/shell/commands/add_peer.rb Wed Oct 20 00:21:08 2010
@@ -0,0 +1,44 @@
+#
+# Copyright 2010 The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+module Shell
+ module Commands
+ class AddPeer< Command
+ def help
+ return <<-EOF
+ Add a peer cluster to replicate to, the id must be a short and
+ the cluster key is composed like this:
+ hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent
+ This gives a full path for HBase to connect to another cluster.
+ Examples:
+
+ hbase> add_peer '1', "server1.cie.com:2181:/hbase"
+ hbase> add_peer '2', "zk1,zk2,zk3:2182:/hbase-prod"
+ EOF
+ end
+
+ def command(id, cluster_key)
+ format_simple_command do
+ replication_admin.add_peer(id, cluster_key)
+ end
+ end
+ end
+ end
+end
Added: hbase/trunk/src/main/ruby/shell/commands/disable_peer.rb
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/ruby/shell/commands/disable_peer.rb?rev=1024470&view=auto
==============================================================================
--- hbase/trunk/src/main/ruby/shell/commands/disable_peer.rb (added)
+++ hbase/trunk/src/main/ruby/shell/commands/disable_peer.rb Wed Oct 20 00:21:08 2010
@@ -0,0 +1,44 @@
+#
+# Copyright 2010 The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+module Shell
+ module Commands
+ class DisablePeer< Command
+ def help
+ return <<-EOF
+ Stops the replication stream to the specified cluster, but still
+ keeps track of new edits to replicate.
+
+ CURRENTLY UNSUPPORTED
+
+ Examples:
+
+ hbase> disable_peer '1'
+ EOF
+ end
+
+ def command(id)
+ format_simple_command do
+ replication_admin.disable_peer(id)
+ end
+ end
+ end
+ end
+end
Added: hbase/trunk/src/main/ruby/shell/commands/enable_peer.rb
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/ruby/shell/commands/enable_peer.rb?rev=1024470&view=auto
==============================================================================
--- hbase/trunk/src/main/ruby/shell/commands/enable_peer.rb (added)
+++ hbase/trunk/src/main/ruby/shell/commands/enable_peer.rb Wed Oct 20 00:21:08 2010
@@ -0,0 +1,44 @@
+#
+# Copyright 2010 The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+module Shell
+ module Commands
+ class EnablePeer< Command
+ def help
+ return <<-EOF
+ Restarts the replication to the specified peer cluster,
+ continuing from where it was disabled.
+
+ CURRENTLY UNSUPPORTED
+
+ Examples:
+
+ hbase> enable_peer '1'
+ EOF
+ end
+
+ def command(id)
+ format_simple_command do
+ replication_admin.enable_peer(id)
+ end
+ end
+ end
+ end
+end
Added: hbase/trunk/src/main/ruby/shell/commands/remove_peer.rb
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/ruby/shell/commands/remove_peer.rb?rev=1024470&view=auto
==============================================================================
--- hbase/trunk/src/main/ruby/shell/commands/remove_peer.rb (added)
+++ hbase/trunk/src/main/ruby/shell/commands/remove_peer.rb Wed Oct 20 00:21:08 2010
@@ -0,0 +1,41 @@
+#
+# Copyright 2010 The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+module Shell
+ module Commands
+ class RemovePeer< Command
+ def help
+ return <<-EOF
+ Stops the specified replication stream and deletes all the meta
+ information kept about it.
+ Examples:
+
+ hbase> remove_peer '1'
+ EOF
+ end
+
+ def command(id)
+ format_simple_command do
+ replication_admin.remove_peer(id)
+ end
+ end
+ end
+ end
+end
Added: hbase/trunk/src/main/ruby/shell/commands/start_replication.rb
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/ruby/shell/commands/start_replication.rb?rev=1024470&view=auto
==============================================================================
--- hbase/trunk/src/main/ruby/shell/commands/start_replication.rb (added)
+++ hbase/trunk/src/main/ruby/shell/commands/start_replication.rb Wed Oct 20 00:21:08 2010
@@ -0,0 +1,43 @@
+#
+# Copyright 2010 The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+module Shell
+ module Commands
+ class StartReplication < Command
+ def help
+ return <<-EOF
+ Restarts all the replication features. The state in which each
+ stream starts in is undetermined.
+ WARNING:
+ start/stop replication is only meant to be used in critical load situations.
+ Examples:
+
+ hbase> start_replication
+ EOF
+ end
+
+ def command
+ format_simple_command do
+ replication_admin.start_replication
+ end
+ end
+ end
+ end
+end
Added: hbase/trunk/src/main/ruby/shell/commands/stop_replication.rb
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/ruby/shell/commands/stop_replication.rb?rev=1024470&view=auto
==============================================================================
--- hbase/trunk/src/main/ruby/shell/commands/stop_replication.rb (added)
+++ hbase/trunk/src/main/ruby/shell/commands/stop_replication.rb Wed Oct 20 00:21:08 2010
@@ -0,0 +1,43 @@
+#
+# Copyright 2010 The Apache Software Foundation
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+module Shell
+ module Commands
+ class StopReplication < Command
+ def help
+ return <<-EOF
+ Stops all the replication features. The state in which each
+ stream stops in is undetermined.
+ WARNING:
+ start/stop replication is only meant to be used in critical load situations.
+ Examples:
+
+ hbase> stop_replication
+ EOF
+ end
+
+ def command
+ format_simple_command do
+ replication_admin.stop_replication
+ end
+ end
+ end
+ end
+end
Modified: hbase/trunk/src/site/xdoc/replication.xml
URL: http://svn.apache.org/viewvc/hbase/trunk/src/site/xdoc/replication.xml?rev=1024470&r1=1024469&r2=1024470&view=diff
==============================================================================
--- hbase/trunk/src/site/xdoc/replication.xml (original)
+++ hbase/trunk/src/site/xdoc/replication.xml Wed Oct 20 00:21:08 2010
@@ -31,7 +31,7 @@
<p>
HBase replication is a way to copy data between HBase deployments. It
can serve as a disaster recovery solution and can contribute to provide
- higher availability at HBase layer. It can also serve more practically;
+ higher availability at the HBase layer. It can also serve more practically;
for example, as a way to easily copy edits from a web-facing cluster to a "MapReduce"
cluster which will process old and new data and ship back the results
automatically.
@@ -97,10 +97,10 @@
</p>
<p>
Synchronously, the region server that receives the edits reads them
- sequentially and applies them on its own cluster using the HBase client
- (HTables managed by a HTablePool) automatically. If consecutive rows
- belong to the same table, they are inserted together in order to
- leverage parallel insertions.
+ sequentially and separates each of them into buffers, one per table.
+ Once all edits are read, each buffer is flushed using the normal HBase
+ client (HTables managed by a HTablePool). This is done in order to
+ leverage parallel insertion (MultiPut).
</p>
<p>
Back in the master cluster's region server, the offset for the current
@@ -221,23 +221,6 @@
10 times until trying to find a different sink.
</p>
</section>
- <section name="Applying edits">
- <p>
- The sink synchronously applies the edits to its local cluster using
- the native client API. This method is also synchronized between every
- incoming sources, which means that only one batch of log entries can be
- replicated at a time by each slave region server.
- </p>
- <p>
- The sink applies the edits one by one, unless it's able to batch
- sequential Puts that belong to the same table in order to use the
- parallel puts feature of HConnectionManager. The Put and Delete objects
- are recreated by inspecting the incoming WALEdit objects and are
- with the exact same row, family, qualifier, timestamp, and value (for
- Put). Note that if the master and slave cluster don't have the same
- time, time-related issues may occur.
- </p>
- </section>
<section name="Cleaning logs">
<p>
If replication isn't enabled, the master's logs cleaning thread will
@@ -402,10 +385,6 @@
</p>
<ol>
<li>
- HBASE-2688, master-master replication is disabled in the code, we need
- to enable and test it.
- </li>
- <li>
HBASE-2611, basically if a region server dies while recovering the
queues of another dead RS, we will miss the data from the queues
that weren't copied.
@@ -419,9 +398,8 @@
carry that data and check it.
</li>
<li>
- HBASE-2200, currently all the replication operations (adding or removing
- streams for example) are done only when the clusters are offline. This
- should be possible at runtime.
+ HBASE-3130, the master cluster needs to be restarted if its region
+ servers lose their session with a slave cluster.
</li>
</ol>
</section>
Added: hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java?rev=1024470&view=auto
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java (added)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java Wed Oct 20 00:21:08 2010
@@ -0,0 +1,96 @@
+package org.apache.hadoop.hbase.client.replication;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import static org.junit.Assert.fail;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Unit testing of ReplicationAdmin. Only a mini ZooKeeper cluster is started;
+ * peers are added and removed through the admin and the peer count is checked.
+ */
+public class TestReplicationAdmin {
+
+ private static final Log LOG =
+ LogFactory.getLog(TestReplicationAdmin.class);
+ private final static HBaseTestingUtility TEST_UTIL =
+ new HBaseTestingUtility();
+
+ private static final String ID_ONE = "1";
+ private static final String KEY_ONE = "127.0.0.1:2181:/hbase";
+ private static final String ID_SECOND = "2";
+ private static final String KEY_SECOND = "127.0.0.1:2181:/hbase2";
+
+ private static ReplicationSourceManager manager;
+ private static ReplicationAdmin admin;
+ private static final AtomicBoolean replicating = new AtomicBoolean(true);
+
+ /**
+ * Starts a mini ZK cluster, enables replication in the configuration and
+ * builds the ReplicationAdmin and ReplicationSourceManager used by the test.
+ * @throws java.lang.Exception if any part of the setup fails
+ */
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ TEST_UTIL.startMiniZKCluster();
+ Configuration conf = TEST_UTIL.getConfiguration();
+ conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
+ admin = new ReplicationAdmin(conf);
+ Path oldLogDir = new Path(TEST_UTIL.getTestDir(),
+ HConstants.HREGION_OLDLOGDIR_NAME);
+ Path logDir = new Path(TEST_UTIL.getTestDir(),
+ HConstants.HREGION_LOGDIR_NAME);
+ manager = new ReplicationSourceManager(admin.getReplicationZk(),
+ conf, null, FileSystem.get(conf), replicating, logDir, oldLogDir);
+ }
+
+ /**
+ * Simple testing of adding and removing peers, basically shows that
+ * all interactions with ZK work
+ * @throws Exception
+ */
+ @Test
+ public void testAddRemovePeer() throws Exception {
+ assertEquals(0, manager.getSources().size());
+ // Add a valid peer
+ admin.addPeer(ID_ONE, KEY_ONE);
+ // Try adding the same peer again, which must be rejected
+ try {
+ admin.addPeer(ID_ONE, KEY_ONE);
+ fail("Adding an already existing peer should have been rejected");
+ } catch (IllegalArgumentException iae) {
+ // OK!
+ }
+ assertEquals(1, admin.getPeersCount());
+ // Try to remove a nonexistent peer
+ try {
+ admin.removePeer(ID_SECOND);
+ fail();
+ } catch (IllegalArgumentException iae) {
+ // OK!
+ }
+ assertEquals(1, admin.getPeersCount());
+ // Add a second, returns illegal since multi-slave isn't supported
+ try {
+ admin.addPeer(ID_SECOND, KEY_SECOND);
+ fail();
+ } catch (IllegalStateException ise) {
+ // OK!
+ }
+ assertEquals(1, admin.getPeersCount());
+ // Remove the first peer we added
+ admin.removePeer(ID_ONE);
+ assertEquals(0, admin.getPeersCount());
+ }
+}
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java?rev=1024470&r1=1024469&r2=1024470&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java Wed Oct 20 00:21:08 2010
@@ -63,7 +63,12 @@ public class ReplicationSourceDummy impl
}
 @Override
- public void terminate() {
+ public void terminate(String reason) {
+ // No-op in this dummy implementation; the reason is ignored.
+ }
+
+ @Override
+ public void terminate(String reason, Exception e) { // No-op in this dummy implementation.
 }
@@ -71,4 +76,15 @@ public class ReplicationSourceDummy impl
public String getPeerClusterZnode() {
return peerClusterId;
}
+
+ @Override
+ public String getPeerClusterId() {
+ return peerClusterId; // Same field that getPeerClusterZnode() returns.
+ }
+
+ @Override
+ public void setSourceEnabled(boolean status) {
+ // No-op in this dummy implementation; the status flag is ignored.
+ }
+
}
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java?rev=1024470&r1=1024469&r2=1024470&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java Wed Oct 20 00:21:08 2010
@@ -40,6 +40,7 @@ import org.apache.hadoop.hbase.client.Pu
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
@@ -59,6 +60,9 @@ public class TestReplication {
private static ZooKeeperWatcher zkw1;
private static ZooKeeperWatcher zkw2;
+ private static ReplicationAdmin admin;
+ private static String slaveClusterKey;
+
private static HTable htable1;
private static HTable htable2;
@@ -92,16 +96,12 @@ public class TestReplication {
conf1.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
conf1.setBoolean("dfs.support.append", true);
conf1.setLong(HConstants.THREAD_WAKE_FREQUENCY, 100);
+
utility1 = new HBaseTestingUtility(conf1);
utility1.startMiniZKCluster();
MiniZooKeeperCluster miniZK = utility1.getZkCluster();
zkw1 = new ZooKeeperWatcher(conf1, "cluster1", null);
- ZKUtil.createWithParents(zkw1, "/1/replication/master");
- ZKUtil.createWithParents(zkw1, "/1/replication/state");
- ZKUtil.setData(zkw1, "/1/replication/master", Bytes.toBytes(
- conf1.get(HConstants.ZOOKEEPER_QUORUM)+":" +
- conf1.get("hbase.zookeeper.property.clientPort")+":/1"));
- setIsReplication(true);
+ admin = new ReplicationAdmin(conf1);
LOG.info("Setup first Zk");
conf2 = HBaseConfiguration.create();
@@ -114,12 +114,11 @@ public class TestReplication {
utility2 = new HBaseTestingUtility(conf2);
utility2.setZkCluster(miniZK);
zkw2 = new ZooKeeperWatcher(conf2, "cluster2", null);
- ZKUtil.createWithParents(zkw2, "/2/replication");
- ZKUtil.createWithParents(zkw1, "/1/replication/peers/2");
- ZKUtil.setData(zkw1, "/1/replication/peers/2", Bytes.toBytes(
- conf2.get(HConstants.ZOOKEEPER_QUORUM)+":" +
- conf2.get("hbase.zookeeper.property.clientPort")+":/2"));
+ slaveClusterKey = conf2.get(HConstants.ZOOKEEPER_QUORUM)+":" +
+ conf2.get("hbase.zookeeper.property.clientPort")+":/2";
+ admin.addPeer("2", slaveClusterKey);
+ setIsReplication(true);
LOG.info("Setup second Zk");
@@ -145,9 +144,7 @@ public class TestReplication {
 private static void setIsReplication(boolean rep) throws Exception {
 LOG.info("Set rep " + rep);
- ZKUtil.setData(zkw1,"/1/replication/state",
- Bytes.toBytes(Boolean.toString(rep)));
- // Takes some ms for ZK to fire the watcher
+ admin.setReplicating(rep); // NOTE(review): the sleep below presumably still gives ZK time to fire the watcher
 Thread.sleep(SLEEP_TIME);
 }
@@ -156,12 +153,31 @@ public class TestReplication {
*/
@Before
public void setUp() throws Exception {
- setIsReplication(false);
utility1.truncateTable(tableName);
- utility2.truncateTable(tableName);
- // If test is flaky, set that sleep higher
- Thread.sleep(SLEEP_TIME*8);
- setIsReplication(true);
+ // truncating the table will send on Delete per row to the slave cluster
+ // in an async fashion, which is why we cannot just call truncateTable on
+ // utility2 since late writes could make it to the slave in some way.
+ // Instead, we truncate the first table and wait for all the Deletes to
+ // make it to the slave.
+ Scan scan = new Scan();
+ int lastCount = 0;
+ for (int i = 0; i < NB_RETRIES; i++) {
+ if (i==NB_RETRIES-1) {
+ fail("Waited too much time for truncate");
+ }
+ ResultScanner scanner = htable2.getScanner(scan);
+ Result[] res = scanner.next(NB_ROWS_IN_BATCH);
+ scanner.close();
+ if (res.length != 0) {
+ if (lastCount < res.length) {
+ i--; // Don't increment timeout if we make progress
+ }
+ LOG.info("Still got " + res.length + " rows");
+ Thread.sleep(SLEEP_TIME);
+ } else {
+ break;
+ }
+ }
}
/**
@@ -186,13 +202,12 @@ public class TestReplication {
htable1 = new HTable(conf1, tableName);
htable1.put(put);
- HTable table2 = new HTable(conf2, tableName);
Get get = new Get(row);
for (int i = 0; i < NB_RETRIES; i++) {
if (i==NB_RETRIES-1) {
fail("Waited too much time for put replication");
}
- Result res = table2.get(get);
+ Result res = htable2.get(get);
if (res.size() == 0) {
LOG.info("Row not available");
Thread.sleep(SLEEP_TIME);
@@ -205,13 +220,12 @@ public class TestReplication {
Delete del = new Delete(row);
htable1.delete(del);
- table2 = new HTable(conf2, tableName);
get = new Get(row);
for (int i = 0; i < NB_RETRIES; i++) {
if (i==NB_RETRIES-1) {
fail("Waited too much time for del replication");
}
- Result res = table2.get(get);
+ Result res = htable2.get(get);
if (res.size() >= 1) {
LOG.info("Row not deleted");
Thread.sleep(SLEEP_TIME);
@@ -334,6 +348,59 @@ public class TestReplication {
}
/**
+ * Integration test for TestReplicationAdmin, removes and re-add a peer
+ * cluster
+ * @throws Exception
+ */
+ @Test
+ public void testAddAndRemoveClusters() throws Exception {
+ LOG.info("testAddAndRemoveClusters");
+ admin.removePeer("2");
+ Thread.sleep(SLEEP_TIME);
+ byte[] rowKey = Bytes.toBytes("Won't be replicated");
+ Put put = new Put(rowKey);
+ put.add(famName, row, row);
+ htable1.put(put);
+
+ Get get = new Get(rowKey);
+ for (int i = 0; i < NB_RETRIES; i++) {
+ if (i == NB_RETRIES-1) {
+ break;
+ }
+ Result res = htable2.get(get);
+ if (res.size() >= 1) {
+ fail("Not supposed to be replicated");
+ } else {
+ LOG.info("Row not replicated, let's wait a bit more...");
+ Thread.sleep(SLEEP_TIME);
+ }
+ }
+
+ admin.addPeer("2", slaveClusterKey);
+ Thread.sleep(SLEEP_TIME);
+ rowKey = Bytes.toBytes("do rep");
+ put = new Put(rowKey);
+ put.add(famName, row, row);
+ LOG.info("Adding new row");
+ htable1.put(put);
+
+ get = new Get(rowKey);
+ for (int i = 0; i < NB_RETRIES; i++) {
+ if (i==NB_RETRIES-1) {
+ fail("Waited too much time for put replication");
+ }
+ Result res = htable2.get(get);
+ if (res.size() == 0) {
+ LOG.info("Row not available");
+ Thread.sleep(SLEEP_TIME*i);
+ } else {
+ assertArrayEquals(res.value(), row);
+ break;
+ }
+ }
+ }
+
+ /**
* Do a more intense version testSmallBatch, one that will trigger
* hlog rolling and other non-trivial code paths
* @throws Exception
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java?rev=1024470&r1=1024469&r2=1024470&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java Wed Oct 20 00:21:08 2010
@@ -64,8 +64,6 @@ public class TestReplicationSourceManage
private static HBaseTestingUtility utility;
- private static final AtomicBoolean REPLICATING = new AtomicBoolean(false);
-
private static Replication replication;
private static ReplicationSourceManager manager;
@@ -105,14 +103,12 @@ public class TestReplicationSourceManage
zkw = new ZooKeeperWatcher(conf, "test", null);
ZKUtil.createWithParents(zkw, "/hbase/replication");
- ZKUtil.createWithParents(zkw, "/hbase/replication/master");
ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1");
- ZKUtil.setData(zkw, "/hbase/replication/master",
- Bytes.toBytes(conf.get(HConstants.ZOOKEEPER_QUORUM)+":" +
- conf.get("hbase.zookeeper.property.clientPort")+":/1"));
ZKUtil.setData(zkw, "/hbase/replication/peers/1",Bytes.toBytes(
conf.get(HConstants.ZOOKEEPER_QUORUM)+":" +
conf.get("hbase.zookeeper.property.clientPort")+":/1"));
+ ZKUtil.createWithParents(zkw, "/hbase/replication/state");
+ ZKUtil.setData(zkw, "/hbase/replication/state", Bytes.toBytes("true"));
replication = new Replication(new DummyServer(), fs, logDir, oldLogDir);
manager = replication.getReplicationManager();