You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by la...@apache.org on 2012/04/03 06:14:46 UTC
svn commit: r1308675 - in /hbase/trunk/src:
main/java/org/apache/hadoop/hbase/client/replication/
main/java/org/apache/hadoop/hbase/replication/
main/java/org/apache/hadoop/hbase/replication/regionserver/
main/ruby/hbase/ main/ruby/shell/commands/ test...
Author: larsh
Date: Tue Apr 3 04:14:46 2012
New Revision: 1308675
URL: http://svn.apache.org/viewvc?rev=1308675&view=rev
Log:
HBASE-3134 [replication] Add the ability to enable/disable streams (Teruyoshi Zenmyo)
Modified:
hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
hbase/trunk/src/main/ruby/hbase/replication_admin.rb
hbase/trunk/src/main/ruby/shell/commands/disable_peer.rb
hbase/trunk/src/main/ruby/shell/commands/enable_peer.rb
hbase/trunk/src/main/ruby/shell/commands/list_peers.rb
hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java
hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java?rev=1308675&r1=1308674&r2=1308675&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java Tue Apr 3 04:14:46 2012
@@ -23,7 +23,6 @@ import java.io.Closeable;
import java.io.IOException;
import java.util.Map;
-import org.apache.commons.lang.NotImplementedException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -136,16 +135,16 @@ public class ReplicationAdmin implements
* Restart the replication stream to the specified peer.
* @param id a short that identifies the cluster
*/
- public void enablePeer(String id) {
- throw new NotImplementedException("Not implemented");
+ public void enablePeer(String id) throws IOException {
+ this.replicationZk.enablePeer(id);
}
/**
* Stop the replication stream to the specified peer.
* @param id a short that identifies the cluster
*/
- public void disablePeer(String id) {
- throw new NotImplementedException("Not implemented");
+ public void disablePeer(String id) throws IOException {
+ this.replicationZk.disablePeer(id);
}
/**
@@ -165,6 +164,20 @@ public class ReplicationAdmin implements
}
/**
+ * Get state of the peer
+ *
+ * @param id peer's identifier
+ * @return current state of the peer
+ */
+ public String getPeerState(String id) throws IOException {
+ try {
+ return this.replicationZk.getPeerState(id).name();
+ } catch (KeeperException e) {
+ throw new IOException("Couldn't get the state of the peer " + id, e);
+ }
+ }
+
+ /**
* Get the current status of the kill switch, if the cluster is replicating
* or not.
* @return true if the cluster is replicated, otherwise false
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java?rev=1308675&r1=1308674&r2=1308675&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java Tue Apr 3 04:14:46 2012
@@ -31,7 +31,12 @@ import org.apache.hadoop.classification.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeper.PeerState;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
/**
* This class acts as a wrapper for all the objects used to identify and
@@ -50,6 +55,8 @@ public class ReplicationPeer implements
private ZooKeeperWatcher zkw;
private final Configuration conf;
+ private PeerStateTracker peerStateTracker;
+
/**
* Constructor that takes all the objects required to communicate with the
* specified peer, except for the region server addresses.
@@ -66,6 +73,31 @@ public class ReplicationPeer implements
}
/**
+ * Start a state tracker to check whether this peer is enabled or not
+ *
+ * @param zookeeper zk watcher for the local cluster
+ * @param peerStateNode path to zk node which stores peer state
+ * @throws KeeperException
+ */
+ public void startStateTracker(ZooKeeperWatcher zookeeper, String peerStateNode)
+ throws KeeperException {
+ if (ZKUtil.checkExists(zookeeper, peerStateNode) == -1) {
+ ZKUtil.createAndWatch(zookeeper, peerStateNode,
+ Bytes.toBytes(PeerState.ENABLED.name())); // enabled by default
+ }
+ this.peerStateTracker = new PeerStateTracker(peerStateNode, zookeeper,
+ this);
+ this.peerStateTracker.start();
+ this.readPeerStateZnode();
+ }
+
+ private void readPeerStateZnode() {
+ String currentState = Bytes.toString(peerStateTracker.getData(false));
+ this.peerEnabled.set(PeerState.ENABLED.equals(PeerState
+ .valueOf(currentState)));
+ }
+
+ /**
* Get the cluster key of that peer
* @return string consisting of zk ensemble addresses, client port
* and root znode
@@ -152,4 +184,23 @@ public class ReplicationPeer implements
zkw.close();
}
}
+
+ /**
+ * Tracker for state of this peer
+ */
+ public class PeerStateTracker extends ZooKeeperNodeTracker {
+
+ public PeerStateTracker(String peerStateZNode, ZooKeeperWatcher watcher,
+ Abortable abortable) {
+ super(watcher, peerStateZNode, abortable);
+ }
+
+ @Override
+ public synchronized void nodeDataChanged(String path) {
+ if (path.equals(node)) {
+ super.nodeDataChanged(path);
+ readPeerStateZnode();
+ }
+ }
+ }
}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java?rev=1308675&r1=1308674&r2=1308675&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java Tue Apr 3 04:14:46 2012
@@ -50,18 +50,20 @@ import org.apache.zookeeper.KeeperExcept
import org.apache.zookeeper.KeeperException.SessionExpiredException;
/**
- * This class serves as a helper for all things related to zookeeper
- * in replication.
+ * This class serves as a helper for all things related to zookeeper in
+ * replication.
* <p/>
- * The layout looks something like this under zookeeper.znode.parent
- * for the master cluster:
+ * The layout looks something like this under zookeeper.znode.parent for the
+ * master cluster:
* <p/>
+ *
* <pre>
* replication/
* state {contains true or false}
* clusterId {contains a byte}
* peers/
* 1/ {contains a full cluster address}
+ * peer-state {contains ENABLED or DISABLED}
* 2/
* ...
* rs/ {lists all RS that replicate}
@@ -82,6 +84,12 @@ public class ReplicationZookeeper implem
LogFactory.getLog(ReplicationZookeeper.class);
// Name of znode we use to lock when failover
private final static String RS_LOCK_ZNODE = "lock";
+
+ // Values of znode which stores state of a peer
+ public static enum PeerState {
+ ENABLED, DISABLED
+ };
+
// Our handle on zookeeper
private final ZooKeeperWatcher zookeeper;
// Map of peer clusters keyed by their id
@@ -96,6 +104,8 @@ public class ReplicationZookeeper implem
private String rsServerNameZnode;
// Name node if the replicationState znode
private String replicationStateNodeName;
+ // Name of zk node which stores peer state
+ private String peerStateNodeName;
private final Configuration conf;
// Is this cluster replicating at the moment?
private AtomicBoolean replicating;
@@ -150,6 +160,8 @@ public class ReplicationZookeeper implem
conf.get("zookeeper.znode.replication", "replication");
String peersZNodeName =
conf.get("zookeeper.znode.replication.peers", "peers");
+ this.peerStateNodeName = conf.get(
+ "zookeeper.znode.replication.peers.state", "peer-state");
this.replicationStateNodeName =
conf.get("zookeeper.znode.replication.state", "state");
String rsZNodeName =
@@ -339,8 +351,10 @@ public class ReplicationZookeeper implem
return null;
}
- return new ReplicationPeer(otherConf, peerId,
+ ReplicationPeer peer = new ReplicationPeer(otherConf, peerId,
otherClusterKey);
+ peer.startStateTracker(this.zookeeper, this.getPeerStateNode(peerId));
+ return peer;
}
/**
@@ -366,7 +380,8 @@ public class ReplicationZookeeper implem
if (!peerExists(id)) {
throw new IllegalArgumentException("Cannot remove inexisting peer");
}
- ZKUtil.deleteNode(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id));
+ ZKUtil.deleteNodeRecursively(this.zookeeper,
+ ZKUtil.joinZNode(this.peersZNode, id));
} catch (KeeperException e) {
throw new IOException("Unable to remove a peer", e);
}
@@ -388,6 +403,8 @@ public class ReplicationZookeeper implem
ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
ZKUtil.createAndWatch(this.zookeeper,
ZKUtil.joinZNode(this.peersZNode, id), Bytes.toBytes(clusterKey));
+ ZKUtil.createAndWatch(this.zookeeper, getPeerStateNode(id),
+ Bytes.toBytes(PeerState.ENABLED.name())); // enabled by default
} catch (KeeperException e) {
throw new IOException("Unable to add peer", e);
}
@@ -399,6 +416,82 @@ public class ReplicationZookeeper implem
}
/**
+ * Enable replication to the peer
+ *
+ * @param id peer's identifier
+ * @throws IllegalArgumentException
+ * Thrown when the peer doesn't exist
+ */
+ public void enablePeer(String id) throws IOException {
+ changePeerState(id, PeerState.ENABLED);
+ LOG.info("peer " + id + " is enabled");
+ }
+
+ /**
+ * Disable replication to the peer
+ *
+ * @param id peer's identifier
+ * @throws IllegalArgumentException
+ * Thrown when the peer doesn't exist
+ */
+ public void disablePeer(String id) throws IOException {
+ changePeerState(id, PeerState.DISABLED);
+ LOG.info("peer " + id + " is disabled");
+ }
+
+ private void changePeerState(String id, PeerState state) throws IOException {
+ try {
+ if (!peerExists(id)) {
+ throw new IllegalArgumentException("peer " + id + " is not registered");
+ }
+ String peerStateZNode = getPeerStateNode(id);
+ if (ZKUtil.checkExists(this.zookeeper, peerStateZNode) != -1) {
+ ZKUtil.setData(this.zookeeper, peerStateZNode,
+ Bytes.toBytes(state.name()));
+ } else {
+ ZKUtil.createAndWatch(zookeeper, peerStateZNode,
+ Bytes.toBytes(state.name()));
+ }
+ LOG.info("state of the peer " + id + " changed to " + state.name());
+ } catch (KeeperException e) {
+ throw new IOException("Unable to change state of the peer " + id, e);
+ }
+ }
+
+ /**
+ * Get state of the peer. This method checks the state by connecting to ZK.
+ *
+ * @param id peer's identifier
+ * @return current state of the peer
+ */
+ public PeerState getPeerState(String id) throws KeeperException {
+ byte[] peerStateBytes = ZKUtil
+ .getData(this.zookeeper, getPeerStateNode(id));
+ return PeerState.valueOf(Bytes.toString(peerStateBytes));
+ }
+
+ /**
+ * Check whether the peer is enabled or not. This method checks the atomic
+ * boolean of ReplicationPeer locally.
+ *
+ * @param id peer identifier
+ * @return true if the peer is enabled, otherwise false
+ * @throws IllegalArgumentException
+ * Thrown when the peer doesn't exist
+ */
+ public boolean getPeerEnabled(String id) {
+ if (!this.peerClusters.containsKey(id)) {
+ throw new IllegalArgumentException("peer " + id + " is not registered");
+ }
+ return this.peerClusters.get(id).getPeerEnabled().get();
+ }
+
+ private String getPeerStateNode(String id) {
+ return ZKUtil.joinZNode(this.peersZNode,
+ ZKUtil.joinZNode(id, this.peerStateNodeName));
+ }
+
+ /**
* This reads the state znode for replication and sets the atomic boolean
*/
private void readReplicationStateZnode() {
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=1308675&r1=1308674&r2=1308675&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Tue Apr 3 04:14:46 2012
@@ -138,9 +138,6 @@ public class ReplicationSource extends T
private volatile boolean running = true;
// Metrics for this source
private ReplicationSourceMetrics metrics;
- // If source is enabled, replication happens. If disabled, nothing will be
- // replicated but HLogs will still be queued
- private AtomicBoolean sourceEnabled = new AtomicBoolean();
/**
* Instantiation method used by region servers
@@ -274,7 +271,7 @@ public class ReplicationSource extends T
// Loop until we close down
while (isActive()) {
// Sleep until replication is enabled again
- if (!this.replicating.get() || !this.sourceEnabled.get()) {
+ if (!isPeerEnabled()) {
if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
sleepMultiplier++;
}
@@ -601,6 +598,12 @@ public class ReplicationSource extends T
return;
}
while (this.isActive()) {
+ if (!isPeerEnabled()) {
+ if (sleepForRetries("Replication is disabled", sleepMultiplier)) {
+ sleepMultiplier++;
+ }
+ continue;
+ }
try {
HRegionInterface rrs = getRS();
LOG.debug("Replicating " + currentNbEntries);
@@ -660,6 +663,15 @@ public class ReplicationSource extends T
}
/**
+ * Check whether the peer is enabled or not
+ *
+ * @return true if the peer is enabled, otherwise false
+ */
+ protected boolean isPeerEnabled() {
+ return this.replicating.get() && this.zkHelper.getPeerEnabled(peerId);
+ }
+
+ /**
* If the queue isn't empty, switch to the next one
* Else if this is a recovered queue, it means we're done!
* Else we'll just continue to try reading the log file
@@ -765,10 +777,6 @@ public class ReplicationSource extends T
return this.currentPath;
}
- public void setSourceEnabled(boolean status) {
- this.sourceEnabled.set(status);
- }
-
private boolean isActive() {
return !this.stopper.isStopped() && this.running;
}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java?rev=1308675&r1=1308674&r2=1308675&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java Tue Apr 3 04:14:46 2012
@@ -95,9 +95,4 @@ public interface ReplicationSourceInterf
*/
public String getPeerClusterId();
- /**
- * Set if this source is enabled or disabled
- * @param status the new status
- */
- public void setSourceEnabled(boolean status);
}
Modified: hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java?rev=1308675&r1=1308674&r2=1308675&view=diff
==============================================================================
--- hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (original)
+++ hbase/trunk/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java Tue Apr 3 04:14:46 2012
@@ -35,7 +35,6 @@ import java.util.concurrent.ThreadPoolEx
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -48,6 +47,8 @@ import org.apache.hadoop.hbase.zookeeper
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
/**
* This class is responsible to manage all the replication
* sources. There are two classes of sources:
@@ -203,8 +204,6 @@ public class ReplicationSourceManager {
public ReplicationSourceInterface addSource(String id) throws IOException {
ReplicationSourceInterface src =
getReplicationSource(this.conf, this.fs, this, stopper, replicating, id);
- // TODO set it to what's in ZK
- src.setSourceEnabled(true);
synchronized (this.hlogsById) {
this.sources.add(src);
this.hlogsById.put(id, new TreeSet<String>());
@@ -585,8 +584,6 @@ public class ReplicationSourceManager {
for (String hlog : entry.getValue()) {
src.enqueueLog(new Path(oldLogDir, hlog));
}
- // TODO set it to what's in ZK
- src.setSourceEnabled(true);
src.startup();
} catch (IOException e) {
// TODO manage it
Modified: hbase/trunk/src/main/ruby/hbase/replication_admin.rb
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/ruby/hbase/replication_admin.rb?rev=1308675&r1=1308674&r2=1308675&view=diff
==============================================================================
--- hbase/trunk/src/main/ruby/hbase/replication_admin.rb (original)
+++ hbase/trunk/src/main/ruby/hbase/replication_admin.rb Tue Apr 3 04:14:46 2012
@@ -50,6 +50,12 @@ module Hbase
end
#----------------------------------------------------------------------------------------------
+ # Get peer cluster state
+ def get_peer_state(id)
+ @replication_admin.getPeerState(id)
+ end
+
+ #----------------------------------------------------------------------------------------------
# Restart the replication stream to the specified peer
def enable_peer(id)
@replication_admin.enablePeer(id)
Modified: hbase/trunk/src/main/ruby/shell/commands/disable_peer.rb
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/ruby/shell/commands/disable_peer.rb?rev=1308675&r1=1308674&r2=1308675&view=diff
==============================================================================
--- hbase/trunk/src/main/ruby/shell/commands/disable_peer.rb (original)
+++ hbase/trunk/src/main/ruby/shell/commands/disable_peer.rb Tue Apr 3 04:14:46 2012
@@ -26,8 +26,6 @@ module Shell
Stops the replication stream to the specified cluster, but still
keeps track of new edits to replicate.
-CURRENTLY UNSUPPORTED
-
Examples:
hbase> disable_peer '1'
Modified: hbase/trunk/src/main/ruby/shell/commands/enable_peer.rb
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/ruby/shell/commands/enable_peer.rb?rev=1308675&r1=1308674&r2=1308675&view=diff
==============================================================================
--- hbase/trunk/src/main/ruby/shell/commands/enable_peer.rb (original)
+++ hbase/trunk/src/main/ruby/shell/commands/enable_peer.rb Tue Apr 3 04:14:46 2012
@@ -26,8 +26,6 @@ module Shell
Restarts the replication to the specified peer cluster,
continuing from where it was disabled.
-CURRENTLY UNSUPPORTED
-
Examples:
hbase> enable_peer '1'
Modified: hbase/trunk/src/main/ruby/shell/commands/list_peers.rb
URL: http://svn.apache.org/viewvc/hbase/trunk/src/main/ruby/shell/commands/list_peers.rb?rev=1308675&r1=1308674&r2=1308675&view=diff
==============================================================================
--- hbase/trunk/src/main/ruby/shell/commands/list_peers.rb (original)
+++ hbase/trunk/src/main/ruby/shell/commands/list_peers.rb Tue Apr 3 04:14:46 2012
@@ -33,10 +33,11 @@ EOF
now = Time.now
peers = replication_admin.list_peers
- formatter.header(["PEER ID", "CLUSTER KEY"])
+ formatter.header(["PEER_ID", "CLUSTER_KEY", "STATE"])
peers.entrySet().each do |e|
- formatter.row([ e.key, e.value ])
+ state = replication_admin.get_peer_state(e.key)
+ formatter.row([ e.key, e.value, state ])
end
formatter.footer(now)
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java?rev=1308675&r1=1308674&r2=1308675&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java Tue Apr 3 04:14:46 2012
@@ -19,6 +19,9 @@
*/
package org.apache.hadoop.hbase.replication;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -26,9 +29,6 @@ import org.apache.hadoop.hbase.Stoppable
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceInterface;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager;
-import java.io.IOException;
-import java.util.concurrent.atomic.AtomicBoolean;
-
/**
* Source that does nothing at all, helpful to test ReplicationSourceManager
*/
@@ -81,10 +81,4 @@ public class ReplicationSourceDummy impl
public String getPeerClusterId() {
return peerClusterId;
}
-
- @Override
- public void setSourceEnabled(boolean status) {
-
- }
-
}
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java?rev=1308675&r1=1308674&r2=1308675&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/TestReplication.java Tue Apr 3 04:14:46 2012
@@ -26,7 +26,14 @@ import static org.junit.Assert.fail;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.LargeTests;
+import org.apache.hadoop.hbase.UnknownScannerException;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HBaseAdmin;
@@ -445,8 +452,107 @@ public class TestReplication {
}
/**
+ * Test disabling and enabling a peer: disable it, insert a row and make sure
+ * nothing is replicated, then enable it and check the insert is replicated.
+ *
+ * @throws Exception
+ */
+ @Test(timeout = 300000)
+ public void testDisableEnable() throws Exception {
+
+ // Test disabling replication
+ admin.disablePeer("2");
+
+ byte[] rowkey = Bytes.toBytes("disable enable");
+ Put put = new Put(rowkey);
+ put.add(famName, row, row);
+ htable1.put(put);
+
+ Get get = new Get(rowkey);
+ for (int i = 0; i < NB_RETRIES; i++) {
+ Result res = htable2.get(get);
+ if (res.size() >= 1) {
+ fail("Replication wasn't disabled");
+ } else {
+ LOG.info("Row not replicated, let's wait a bit more...");
+ Thread.sleep(SLEEP_TIME);
+ }
+ }
+
+ // Test enable replication
+ admin.enablePeer("2");
+
+ for (int i = 0; i < NB_RETRIES; i++) {
+ Result res = htable2.get(get);
+ if (res.size() == 0) {
+ LOG.info("Row not available");
+ Thread.sleep(SLEEP_TIME);
+ } else {
+ assertArrayEquals(res.value(), row);
+ return;
+ }
+ }
+ fail("Waited too much time for put replication");
+ }
+
+ /**
+ * Test disabling an inactive peer. Shut down the peer cluster, insert a row,
+ * disable the peer, then restart the peer cluster and make sure nothing is
+ * replicated. In addition, enable the peer and check that the updates are
+ * replicated.
+ *
+ * @throws Exception
+ */
+ @Test(timeout = 600000)
+ public void testDisableInactivePeer() throws Exception {
+
+ // enable the peer, then shut down the peer cluster
+ admin.enablePeer("2");
+ utility2.shutdownMiniHBaseCluster();
+
+ byte[] rowkey = Bytes.toBytes("disable inactive peer");
+ Put put = new Put(rowkey);
+ put.add(famName, row, row);
+ htable1.put(put);
+
+ // wait for the sleep interval of the master cluster to become long
+ Thread.sleep(SLEEP_TIME * NB_RETRIES);
+
+ // disable and start the peer
+ admin.disablePeer("2");
+ utility2.startMiniHBaseCluster(1, 1);
+ Get get = new Get(rowkey);
+ for (int i = 0; i < NB_RETRIES; i++) {
+ Result res = htable2.get(get);
+ if (res.size() >= 1) {
+ fail("Replication wasn't disabled");
+ } else {
+ LOG.info("Row not replicated, let's wait a bit more...");
+ Thread.sleep(SLEEP_TIME);
+ }
+ }
+
+ // Test enable replication
+ admin.enablePeer("2");
+ // wait since the sleep interval would be long
+ Thread.sleep(SLEEP_TIME * NB_RETRIES);
+ for (int i = 0; i < NB_RETRIES; i++) {
+ Result res = htable2.get(get);
+ if (res.size() == 0) {
+ LOG.info("Row not available");
+ Thread.sleep(SLEEP_TIME * NB_RETRIES);
+ } else {
+ assertArrayEquals(res.value(), row);
+ return;
+ }
+ }
+ fail("Waited too much time for put replication");
+ }
+
+ /**
* Integration test for TestReplicationAdmin, removes and re-add a peer
* cluster
+ *
* @throws Exception
*/
@Test(timeout=300000)
Modified: hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java?rev=1308675&r1=1308674&r2=1308675&view=diff
==============================================================================
--- hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java (original)
+++ hbase/trunk/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java Tue Apr 3 04:14:46 2012
@@ -30,13 +30,23 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.*;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MediumTests;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.catalog.CatalogTracker;
import org.apache.hadoop.hbase.regionserver.wal.HLog;
import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
-import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
+import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -100,6 +110,9 @@ public class TestReplicationSourceManage
ZKUtil.setData(zkw, "/hbase/replication/peers/1",
Bytes.toBytes(conf.get(HConstants.ZOOKEEPER_QUORUM) + ":"
+ conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":/1"));
+ ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state");
+ ZKUtil.setData(zkw, "/hbase/replication/peers/1/peer-state",
+ Bytes.toBytes(ReplicationZookeeper.PeerState.ENABLED.name()));
ZKUtil.createWithParents(zkw, "/hbase/replication/state");
ZKUtil.setData(zkw, "/hbase/replication/state", Bytes.toBytes("true"));