You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jd...@apache.org on 2013/07/09 21:59:39 UTC

svn commit: r1501503 - in /hbase/trunk: hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/ hba...

Author: jdcryans
Date: Tue Jul  9 19:59:39 2013
New Revision: 1501503

URL: http://svn.apache.org/r1501503
Log:
HBASE-8861  Remove ReplicationState completely (Chris Trezzo via JD)

Removed:
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateImpl.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateInterface.java
    hbase/trunk/hbase-server/src/main/ruby/shell/commands/start_replication.rb
    hbase/trunk/hbase-server/src/main/ruby/shell/commands/stop_replication.rb
Modified:
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
    hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
    hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
    hbase/trunk/hbase-server/src/main/javadoc/org/apache/hadoop/hbase/replication/package.html
    hbase/trunk/hbase-server/src/main/ruby/hbase/replication_admin.rb
    hbase/trunk/hbase-server/src/main/ruby/shell.rb
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationZookeeper.java
    hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java?rev=1501503&r1=1501502&r2=1501503&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java Tue Jul  9 19:59:39 2013
@@ -52,16 +52,6 @@ import java.util.Map;
  * used to keep track of the replication state.
  * </p>
  * <p>
- * Enabling and disabling peers is currently not supported.
- * </p>
- * <p>
- * As cluster replication is still experimental, a kill switch is provided
- * in order to stop all replication-related operations, see
- * {@link #setReplicating(boolean)}. When setting it back to true, the new
- * state of all the replication streams will be unknown and may have holes.
- * Use at your own risk.
- * </p>
- * <p>
  * To see which commands are available in the shell, type
  * <code>replication</code>.
  * </p>
@@ -163,36 +153,6 @@ public class ReplicationAdmin implements
   }
 
   /**
-   * Get the current status of the kill switch, if the cluster is replicating
-   * or not.
-   * @return true if the cluster is replicated, otherwise false
-   */
-  public boolean getReplicating() throws IOException {
-    try {
-      return this.replicationZk.getReplication();
-    } catch (KeeperException e) {
-      throw new IOException("Couldn't get the replication status");
-    }
-  }
-
-  /**
-   * Kill switch for all replication-related features
-   * @param newState true to start replication, false to stop it.
-   * completely
-   * @return the previous state
-   */
-  public boolean setReplicating(boolean newState) throws IOException {
-    boolean prev = true;
-    try {
-      prev = getReplicating();
-      this.replicationZk.setReplication(newState);
-    } catch (KeeperException e) {
-      throw new IOException("Unable to set the replication state", e);
-    }
-    return prev;
-  }
-
-  /**
    * Get the ZK-support tool created and used by this object for replication.
    * @return the ZK-support tool
    */

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java?rev=1501503&r1=1501502&r2=1501503&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java Tue Jul  9 19:59:39 2013
@@ -39,8 +39,6 @@ public abstract class ReplicationStateZK
    * cluster.
    */
   protected final String peerStateNodeName;
-  /** The name of the znode that contains the replication status of the local cluster. */
-  protected final String stateZNode;
   /** The name of the base znode that contains all replication state. */
   protected final String replicationZNode;
   /** The name of the znode that contains a list of all remote slave (i.e. peer) clusters. */
@@ -68,11 +66,9 @@ public abstract class ReplicationStateZK
     String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
     String peersZNodeName = conf.get("zookeeper.znode.replication.peers", "peers");
     String queuesZNodeName = conf.get("zookeeper.znode.replication.rs", "rs");
-    String stateZNodeName = conf.get("zookeeper.znode.replication.state", "state");
     this.peerStateNodeName = conf.get("zookeeper.znode.replication.peers.state", "peer-state");
     this.ourClusterKey = ZKUtil.getZooKeeperClusterKey(this.conf);
     this.replicationZNode = ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName);
-    this.stateZNode = ZKUtil.joinZNode(replicationZNode, stateZNodeName);
     this.peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName);
     this.queuesZNode = ZKUtil.joinZNode(replicationZNode, queuesZNodeName);
   }
@@ -90,9 +86,8 @@ public abstract class ReplicationStateZK
   /**
    * @param state
    * @return Serialized protobuf of <code>state</code> with pb magic prefix prepended suitable for
-   *         use as content of either the cluster state znode -- whether or not we should be
-   *         replicating kept in /hbase/replication/state -- or as content of a peer-state znode
-   *         under a peer cluster id as in /hbase/replication/peers/PEER_ID/peer-state.
+   *         use as content of a peer-state znode under a peer cluster id as in
+   *         /hbase/replication/peers/PEER_ID/peer-state.
    */
   protected static byte[] toByteArray(final ZooKeeperProtos.ReplicationState.State state) {
     byte[] bytes =

Modified: hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java?rev=1501503&r1=1501502&r2=1501503&view=diff
==============================================================================
--- hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (original)
+++ hbase/trunk/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java Tue Jul  9 19:59:39 2013
@@ -70,7 +70,7 @@ import java.util.concurrent.atomic.Atomi
  * </pre>
  */
 @InterfaceAudience.Private
-public class ReplicationZookeeper extends ReplicationStateZKBase implements Closeable {
+public class ReplicationZookeeper extends ReplicationStateZKBase {
   private static final Log LOG = LogFactory.getLog(ReplicationZookeeper.class);
 
   // Our handle on zookeeper
@@ -79,7 +79,6 @@ public class ReplicationZookeeper extend
   private final Configuration conf;
   // Abortable
   private Abortable abortable;
-  private final ReplicationStateInterface replicationState;
   private final ReplicationPeers replicationPeers;
   private final ReplicationQueues replicationQueues;
 
@@ -95,8 +94,6 @@ public class ReplicationZookeeper extend
     this.conf = conf;
     this.zookeeper = zk;
     setZNodes(abortable);
-    this.replicationState = new ReplicationStateImpl(this.zookeeper, conf, abortable);
-    this.replicationState.init();
     // TODO This interface is no longer used by anyone using this constructor. When this class goes
     // away, we will no longer have this null initialization business
     this.replicationQueues = null;
@@ -108,19 +105,16 @@ public class ReplicationZookeeper extend
    * Constructor used by region servers, connects to the peer cluster right away.
    *
    * @param server
-   * @param replicating    atomic boolean to start/stop replication
    * @throws IOException
    * @throws KeeperException
    */
-  public ReplicationZookeeper(final Server server, final AtomicBoolean replicating)
+  public ReplicationZookeeper(final Server server)
   throws IOException, KeeperException {
     super(server.getZooKeeper(), server.getConfiguration(), server);
     this.abortable = server;
     this.zookeeper = server.getZooKeeper();
     this.conf = server.getConfiguration();
     setZNodes(server);
-    this.replicationState = new ReplicationStateImpl(this.zookeeper, conf, server, replicating);
-    this.replicationState.init();
     this.replicationQueues = new ReplicationQueuesZKImpl(this.zookeeper, this.conf, server);
     this.replicationQueues.init(server.getServerName().toString());
     this.replicationPeers = new ReplicationPeersZKImpl(this.zookeeper, this.conf, server);
@@ -228,25 +222,6 @@ public class ReplicationZookeeper extend
   }
 
   /**
-   * Get the replication status of this cluster. If the state znode doesn't exist it will also
-   * create it and set it true.
-   * @return returns true when it's enabled, else false
-   * @throws KeeperException
-   */
-  public boolean getReplication() throws KeeperException {
-    return this.replicationState.getState();
-  }
-
-  /**
-   * Set the new replication state for this cluster
-   * @param newState
-   * @throws KeeperException
-   */
-  public void setReplication(boolean newState) throws KeeperException {
-    this.replicationState.setState(newState);
-  }
-
-  /**
    * Add a new log to the list of hlogs in zookeeper
    * @param filename name of the hlog's znode
    * @param peerId name of the cluster's znode
@@ -388,9 +363,4 @@ public class ReplicationZookeeper extend
   public String getPeersZNode() {
     return peersZNode;
   }
-
-  @Override
-  public void close() throws IOException {
-    if (replicationState != null) replicationState.close();
-  }
 }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java?rev=1501503&r1=1501502&r2=1501503&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java Tue Jul  9 19:59:39 2013
@@ -111,7 +111,6 @@ public class VerifyReplication {
           @Override
           public Void connect(HConnection conn) throws IOException {
             ZooKeeperWatcher localZKW = null;
-            ReplicationZookeeper zk = null;
             ReplicationPeer peer = null;
             try {
               localZKW = new ZooKeeperWatcher(
@@ -134,9 +133,6 @@ public class VerifyReplication {
               if (peer != null) {
                 peer.close();
               }
-              if (zk != null) {
-                zk.close();
-              }
               if (localZKW != null) {
                 localZKW.close();
               }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java?rev=1501503&r1=1501502&r2=1501503&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java Tue Jul  9 19:59:39 2013
@@ -23,15 +23,12 @@ import org.apache.commons.logging.LogFac
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.master.cleaner.BaseLogCleanerDelegate;
 import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
 import org.apache.hadoop.hbase.replication.ReplicationQueuesClientZKImpl;
-import org.apache.hadoop.hbase.replication.ReplicationStateImpl;
-import org.apache.hadoop.hbase.replication.ReplicationStateInterface;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;
 
@@ -49,7 +46,6 @@ public class ReplicationLogCleaner exten
   private static final Log LOG = LogFactory.getLog(ReplicationLogCleaner.class);
   private ZooKeeperWatcher zkw;
   private ReplicationQueuesClient replicationQueues;
-  private ReplicationStateInterface replicationState;
   private final Set<String> hlogs = new HashSet<String>();
   private boolean stopped = false;
   private boolean aborted;
@@ -58,15 +54,6 @@ public class ReplicationLogCleaner exten
   @Override
   public boolean isLogDeletable(FileStatus fStat) {
 
-    try {
-      if (!replicationState.getState()) {
-        return false;
-      }
-    } catch (KeeperException e) {
-      abort("Cannot get the state of replication", e);
-      return false;
-    }
-
     // all members of this class are null if replication is disabled, and we
     // return true since false would render the LogsCleaner useless
     if (this.getConf() == null) {
@@ -136,8 +123,6 @@ public class ReplicationLogCleaner exten
     try {
       this.zkw = new ZooKeeperWatcher(conf, "replicationLogCleaner", null);
       this.replicationQueues = new ReplicationQueuesClientZKImpl(zkw, conf, this);
-      this.replicationState = new ReplicationStateImpl(zkw, conf, this);
-      this.replicationState.init();
     } catch (KeeperException e) {
       LOG.error("Error while configuring " + this.getClass().getName(), e);
     } catch (IOException e) {
@@ -155,14 +140,6 @@ public class ReplicationLogCleaner exten
       LOG.info("Stopping " + this.zkw);
       this.zkw.close();
     }
-    if (this.replicationState != null) {
-      LOG.info("Stopping " + this.replicationState);
-      try {
-        this.replicationState.close();
-      } catch (IOException e) {
-        LOG.error("Error while stopping " + this.replicationState, e);
-      }
-    }
     // Not sure why we're deleting a connection that we never acquired or used
     HConnectionManager.deleteConnection(this.getConf());
   }

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java?rev=1501503&r1=1501502&r2=1501503&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java Tue Jul  9 19:59:39 2013
@@ -66,7 +66,6 @@ public class Replication implements WALA
       LogFactory.getLog(Replication.class);
   private boolean replication;
   private ReplicationSourceManager replicationManager;
-  private final AtomicBoolean replicating = new AtomicBoolean(true);
   private ReplicationZookeeper zkHelper;
   private ReplicationQueues replicationQueues;
   private Configuration conf;
@@ -108,17 +107,16 @@ public class Replication implements WALA
         .build());
     if (replication) {
       try {
-        this.zkHelper = new ReplicationZookeeper(server, this.replicating);
+        this.zkHelper = new ReplicationZookeeper(server);
         this.replicationQueues =
             new ReplicationQueuesZKImpl(server.getZooKeeper(), this.conf, this.server);
         this.replicationQueues.init(this.server.getServerName().toString());
       } catch (KeeperException ke) {
-        throw new IOException("Failed replication handler create " +
-           "(replicating=" + this.replicating, ke);
+        throw new IOException("Failed replication handler create", ke);
       }
       this.replicationManager =
-          new ReplicationSourceManager(zkHelper, replicationQueues, conf, this.server, fs,
-              this.replicating, logDir, oldLogDir);
+          new ReplicationSourceManager(zkHelper, replicationQueues, conf, this.server, fs, logDir,
+              oldLogDir);
       this.statsThreadPeriod =
           this.conf.getInt("replication.stats.thread.period.seconds", 5 * 60);
       LOG.debug("ReplicationStatisticsThread " + this.statsThreadPeriod);

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java?rev=1501503&r1=1501502&r2=1501503&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSink.java Tue Jul  9 19:59:39 2013
@@ -112,9 +112,8 @@ public class ReplicationSink {
    }
 
   /**
-   * Replicate this array of entries directly into the local cluster using the native client.
-   * Only operates against raw protobuf type saving on a convertion from pb to pojo.
-   *
+   * Replicate this array of entries directly into the local cluster using the native client. Only
+   * operates against raw protobuf type saving on a conversion from pb to pojo.
    * @param entries
    * @param cells
    * @throws IOException

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=1501503&r1=1501502&r2=1501503&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Tue Jul  9 19:59:39 2013
@@ -93,8 +93,6 @@ public class ReplicationSource extends T
   // ratio of region servers to chose from a slave cluster
   private float ratio;
   private Random random;
-  // should we replicate or not?
-  private AtomicBoolean replicating;
   private ReplicationQueueInfo replicationQueueInfo;
   // id of the peer cluster this source replicates to
   private String peerId;
@@ -149,7 +147,6 @@ public class ReplicationSource extends T
    * @param fs file system to use
    * @param manager replication manager to ping to
    * @param stopper     the atomic boolean to use to stop the regionserver
-   * @param replicating the atomic boolean that starts/stops replication
    * @param peerClusterZnode the name of our znode
    * @throws IOException
    */
@@ -157,7 +154,6 @@ public class ReplicationSource extends T
                    final FileSystem fs,
                    final ReplicationSourceManager manager,
                    final Stoppable stopper,
-                   final AtomicBoolean replicating,
                    final String peerClusterZnode)
       throws IOException {
     this.stopper = stopper;
@@ -185,7 +181,6 @@ public class ReplicationSource extends T
     this.ratio = this.conf.getFloat("replication.source.ratio", 0.1f);
     this.currentPeers = new ArrayList<ServerName>();
     this.random = new Random();
-    this.replicating = replicating;
     this.manager = manager;
     this.sleepForRetries =
         this.conf.getLong("replication.source.sleepforretries", 1000);
@@ -417,9 +412,8 @@ public class ReplicationSource extends T
         removeNonReplicableEdits(entry);
         // Don't replicate catalog entries, if the WALEdit wasn't
         // containing anything to replicate and if we're currently not set to replicate
-        if (!(Bytes.equals(logKey.getTablename(), HConstants.ROOT_TABLE_NAME) ||
-            Bytes.equals(logKey.getTablename(), HConstants.META_TABLE_NAME)) &&
-            edit.size() != 0 && replicating.get()) {
+        if (!(Bytes.equals(logKey.getTablename(), HConstants.ROOT_TABLE_NAME) || Bytes.equals(
+          logKey.getTablename(), HConstants.META_TABLE_NAME)) && edit.size() != 0) {
           // Only set the clusterId if is a local key.
           // This ensures that the originator sets the cluster id
           // and all replicas retain the initial cluster id.
@@ -714,8 +708,7 @@ public class ReplicationSource extends T
    * @return true if the peer is enabled, otherwise false
    */
   protected boolean isPeerEnabled() {
-    return this.replicating.get() &&
-        this.zkHelper.getPeerEnabled(this.peerId);
+    return this.zkHelper.getPeerEnabled(this.peerId);
   }
 
   /**

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java?rev=1501503&r1=1501502&r2=1501503&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceInterface.java Tue Jul  9 19:59:39 2013
@@ -19,7 +19,6 @@
 package org.apache.hadoop.hbase.replication.regionserver;
 
 import java.io.IOException;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
@@ -39,7 +38,6 @@ public interface ReplicationSourceInterf
    * @param fs the file system to use
    * @param manager the manager to use
    * @param stopper the stopper object for this region server
-   * @param replicating the status of the replication on this cluster
    * @param peerClusterId the id of the peer cluster
    * @throws IOException
    */
@@ -47,7 +45,6 @@ public interface ReplicationSourceInterf
                    final FileSystem fs,
                    final ReplicationSourceManager manager,
                    final Stoppable stopper,
-                   final AtomicBoolean replicating,
                    final String peerClusterId) throws IOException;
 
   /**

Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java?rev=1501503&r1=1501502&r2=1501503&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java Tue Jul  9 19:59:39 2013
@@ -69,8 +69,6 @@ public class ReplicationSourceManager {
   private final List<ReplicationSourceInterface> sources;
   // List of all the sources we got from died RSs
   private final List<ReplicationSourceInterface> oldsources;
-  // Indicates if we are currently replicating
-  private final AtomicBoolean replicating;
   // Helper for zookeeper
   private final ReplicationZookeeper zkHelper;
   private final ReplicationQueues replicationQueues;
@@ -103,16 +101,13 @@ public class ReplicationSourceManager {
    * @param conf the configuration to use
    * @param stopper the stopper object for this region server
    * @param fs the file system to use
-   * @param replicating the status of the replication on this cluster
    * @param logDir the directory that contains all hlog directories of live RSs
    * @param oldLogDir the directory where old logs are archived
    */
   public ReplicationSourceManager(final ReplicationZookeeper zkHelper,
       final ReplicationQueues replicationQueues, final Configuration conf, final Stoppable stopper,
-      final FileSystem fs, final AtomicBoolean replicating, final Path logDir,
-      final Path oldLogDir) {
+      final FileSystem fs, final Path logDir, final Path oldLogDir) {
     this.sources = new ArrayList<ReplicationSourceInterface>();
-    this.replicating = replicating;
     this.zkHelper = zkHelper;
     this.replicationQueues = replicationQueues;
     this.stopper = stopper;
@@ -206,8 +201,7 @@ public class ReplicationSourceManager {
    * @throws IOException
    */
   public ReplicationSourceInterface addSource(String id) throws IOException {
-    ReplicationSourceInterface src =
-        getReplicationSource(this.conf, this.fs, this, stopper, replicating, id);
+    ReplicationSourceInterface src = getReplicationSource(this.conf, this.fs, this, stopper, id);
     synchronized (this.hlogsById) {
       this.sources.add(src);
       this.hlogsById.put(id, new TreeSet<String>());
@@ -260,10 +254,6 @@ public class ReplicationSourceManager {
   }
 
   void preLogRoll(Path newLog) throws IOException {
-    if (!this.replicating.get()) {
-      LOG.warn("Replication stopped, won't add new log");
-      return;
-    }
 
     synchronized (this.hlogsById) {
       String name = newLog.getName();
@@ -288,14 +278,9 @@ public class ReplicationSourceManager {
   }
 
   void postLogRoll(Path newLog) throws IOException {
-    if (!this.replicating.get()) {
-      LOG.warn("Replication stopped, won't add new log");
-      return;
-    }
-
     // This only updates the sources we own, not the recovered ones
     for (ReplicationSourceInterface source : this.sources) {
-      source.enqueueLog(newLog);    
+      source.enqueueLog(newLog);
     }
   }
 
@@ -313,7 +298,6 @@ public class ReplicationSourceManager {
    * @param fs the file system to use
    * @param manager the manager to use
    * @param stopper the stopper object for this region server
-   * @param replicating the status of the replication on this cluster
    * @param peerId the id of the peer cluster
    * @return the created source
    * @throws IOException
@@ -323,7 +307,6 @@ public class ReplicationSourceManager {
       final FileSystem fs,
       final ReplicationSourceManager manager,
       final Stoppable stopper,
-      final AtomicBoolean replicating,
       final String peerId) throws IOException {
     ReplicationSourceInterface src;
     try {
@@ -337,7 +320,7 @@ public class ReplicationSourceManager {
       src = new ReplicationSource();
 
     }
-    src.init(conf, fs, manager, stopper, replicating, peerId);
+    src.init(conf, fs, manager, stopper, peerId);
     return src;
   }
 
@@ -599,8 +582,8 @@ public class ReplicationSourceManager {
       for (Map.Entry<String, SortedSet<String>> entry : newQueues.entrySet()) {
         String peerId = entry.getKey();
         try {
-          ReplicationSourceInterface src = getReplicationSource(conf,
-              fs, ReplicationSourceManager.this, stopper, replicating, peerId);
+          ReplicationSourceInterface src =
+              getReplicationSource(conf, fs, ReplicationSourceManager.this, stopper, peerId);
           if (!zkHelper.getPeerClusters().contains((src.getPeerClusterId()))) {
             src.terminate("Recovered queue doesn't belong to any current peer");
             break;

Modified: hbase/trunk/hbase-server/src/main/javadoc/org/apache/hadoop/hbase/replication/package.html
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/javadoc/org/apache/hadoop/hbase/replication/package.html?rev=1501503&r1=1501502&r2=1501503&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/javadoc/org/apache/hadoop/hbase/replication/package.html (original)
+++ hbase/trunk/hbase-server/src/main/javadoc/org/apache/hadoop/hbase/replication/package.html Tue Jul  9 19:59:39 2013
@@ -132,16 +132,6 @@ Choosing peer 10.10.1.49:62020</pre>
 
 In this case it indicates that 1 region server from the slave cluster
 was chosen for replication.<br><br>
-
-Should you want to stop the replication while the clusters are running, open
-the shell on the master cluster and issue this command:
-<pre>
-hbase(main):001:0> stop_replication</pre>
-
-Replication of already queued edits will still happen after you
-issued that command but new entries won't be. To start it back, simply replace
-"false" with "true" in the command. 
-
 <p>
 
 <a name="verify">

Modified: hbase/trunk/hbase-server/src/main/ruby/hbase/replication_admin.rb
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/ruby/hbase/replication_admin.rb?rev=1501503&r1=1501502&r2=1501503&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/ruby/hbase/replication_admin.rb (original)
+++ hbase/trunk/hbase-server/src/main/ruby/hbase/replication_admin.rb Tue Jul  9 19:59:39 2013
@@ -65,17 +65,5 @@ module Hbase
     def disable_peer(id)
       @replication_admin.disablePeer(id)
     end
-
-    #----------------------------------------------------------------------------------------------
-    # Restart the replication, in an unknown state
-    def start_replication
-      @replication_admin.setReplicating(true)
-    end
-
-    #----------------------------------------------------------------------------------------------
-    # Kill switch for replication, stops all its features
-    def stop_replication
-      @replication_admin.setReplicating(false)
-    end
   end
 end

Modified: hbase/trunk/hbase-server/src/main/ruby/shell.rb
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/ruby/shell.rb?rev=1501503&r1=1501502&r2=1501503&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/ruby/shell.rb (original)
+++ hbase/trunk/hbase-server/src/main/ruby/shell.rb Tue Jul  9 19:59:39 2013
@@ -297,15 +297,13 @@ Shell.load_command_group(
 Shell.load_command_group(
   'replication',
   :full_name => 'CLUSTER REPLICATION TOOLS',
-  :comment => "In order to use these tools, hbase.replication must be true. enabling/disabling is currently unsupported",
+  :comment => "In order to use these tools, hbase.replication must be true.",
   :commands => %w[
     add_peer
     remove_peer
     list_peers
     enable_peer
     disable_peer
-    start_replication
-    stop_replication
   ]
 )
 

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java?rev=1501503&r1=1501502&r2=1501503&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java Tue Jul  9 19:59:39 2013
@@ -68,8 +68,7 @@ public class TestLogsCleaner {
     conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, true);
     Replication.decorateMasterConfiguration(conf);
     Server server = new DummyServer();
-    ReplicationZookeeper zkHelper =
-        new ReplicationZookeeper(server, new AtomicBoolean(true));
+    ReplicationZookeeper zkHelper = new ReplicationZookeeper(server);
 
     Path oldLogDir = new Path(TEST_UTIL.getDataTestDir(),
         HConstants.HREGION_OLDLOGDIR_NAME);

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java?rev=1501503&r1=1501502&r2=1501503&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java Tue Jul  9 19:59:39 2013
@@ -38,10 +38,8 @@ public class ReplicationSourceDummy impl
   Path currentPath;
 
   @Override
-  public void init(Configuration conf, FileSystem fs,
-                   ReplicationSourceManager manager, Stoppable stopper,
-                   AtomicBoolean replicating, String peerClusterId)
-      throws IOException {
+  public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
+      Stoppable stopper, String peerClusterId) throws IOException {
     this.manager = manager;
     this.peerClusterId = peerClusterId;
   }

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java?rev=1501503&r1=1501502&r2=1501503&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationBase.java Tue Jul  9 19:59:39 2013
@@ -111,7 +111,6 @@ public class TestReplicationBase {
     zkw2 = new ZooKeeperWatcher(conf2, "cluster2", null, true);
 
     admin.addPeer("2", utility2.getClusterKey());
-    setIsReplication(true);
 
     LOG.info("Setup second Zk");
     CONF_WITH_LOCALFS = HBaseConfiguration.create(conf1);
@@ -134,12 +133,6 @@ public class TestReplicationBase {
     htable2 = new HTable(conf2, tableName);
   }
 
-  protected static void setIsReplication(boolean rep) throws Exception {
-    LOG.info("Set rep " + rep);
-    admin.setReplicating(rep);
-    Thread.sleep(SLEEP_TIME);
-  }
-
   /**
    * @throws java.lang.Exception
    */

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java?rev=1501503&r1=1501502&r2=1501503&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSmallTests.java Tue Jul  9 19:59:39 2013
@@ -256,75 +256,6 @@ public class TestReplicationSmallTests e
   }
 
   /**
-   * Test stopping replication, trying to insert, make sure nothing's
-   * replicated, enable it, try replicating and it should work
-   * @throws Exception
-   */
-  @Test(timeout=300000)
-  public void testStartStop() throws Exception {
-
-    // Test stopping replication
-    setIsReplication(false);
-
-    Put put = new Put(Bytes.toBytes("stop start"));
-    put.add(famName, row, row);
-    htable1.put(put);
-
-    Get get = new Get(Bytes.toBytes("stop start"));
-    for (int i = 0; i < NB_RETRIES; i++) {
-      if (i==NB_RETRIES-1) {
-        break;
-      }
-      Result res = htable2.get(get);
-      if(res.size() >= 1) {
-        fail("Replication wasn't stopped");
-
-      } else {
-        LOG.info("Row not replicated, let's wait a bit more...");
-        Thread.sleep(SLEEP_TIME);
-      }
-    }
-
-    // Test restart replication
-    setIsReplication(true);
-
-    htable1.put(put);
-
-    for (int i = 0; i < NB_RETRIES; i++) {
-      if (i==NB_RETRIES-1) {
-        fail("Waited too much time for put replication");
-      }
-      Result res = htable2.get(get);
-      if(res.size() == 0) {
-        LOG.info("Row not available");
-        Thread.sleep(SLEEP_TIME);
-      } else {
-        assertArrayEquals(res.value(), row);
-        break;
-      }
-    }
-
-    put = new Put(Bytes.toBytes("do not rep"));
-    put.add(noRepfamName, row, row);
-    htable1.put(put);
-
-    get = new Get(Bytes.toBytes("do not rep"));
-    for (int i = 0; i < NB_RETRIES; i++) {
-      if (i == NB_RETRIES-1) {
-        break;
-      }
-      Result res = htable2.get(get);
-      if (res.size() >= 1) {
-        fail("Not supposed to be replicated");
-      } else {
-        LOG.info("Row not replicated, let's wait a bit more...");
-        Thread.sleep(SLEEP_TIME);
-      }
-    }
-
-  }
-
-  /**
    * Test disable/enable replication, trying to insert, make sure nothing's
    * replicated, enable it, the insert should be replicated
    *

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java?rev=1501503&r1=1501502&r2=1501503&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java Tue Jul  9 19:59:39 2013
@@ -69,10 +69,6 @@ public class TestReplicationStateZKImpl 
     Configuration testConf = new Configuration(conf);
     testConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, baseZKNode);
     ZooKeeperWatcher zkw1 = new ZooKeeperWatcher(testConf, "test1", null);
-    ReplicationStateInterface rsi = new ReplicationStateImpl(zkw1, testConf, zkw1);
-    rsi.init();
-    rsi.setState(true);
-    rsi.close();
     String fakeRs = ZKUtil.joinZNode(zkw1.rsZNode, "hostname1.example.org:1234");
     ZKUtil.createWithParents(zkw1, fakeRs);
     ZKClusterId.setClusterId(zkw1, new ClusterId());

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationZookeeper.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationZookeeper.java?rev=1501503&r1=1501502&r2=1501503&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationZookeeper.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationZookeeper.java Tue Jul  9 19:59:39 2013
@@ -62,7 +62,7 @@ public class TestReplicationZookeeper {
     conf = utility.getConfiguration();
     zkw = HBaseTestingUtility.getZooKeeperWatcher(utility);
     DummyServer server = new DummyServer();
-    repZk = new ReplicationZookeeper(server, new AtomicBoolean());
+    repZk = new ReplicationZookeeper(server);
     slaveClusterKey = conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" +
       conf.get("hbase.zookeeper.property.clientPort") + ":/1";
     String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");

Modified: hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java?rev=1501503&r1=1501502&r2=1501503&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java (original)
+++ hbase/trunk/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java Tue Jul  9 19:59:39 2013
@@ -28,7 +28,6 @@ import java.util.List;
 import java.util.SortedMap;
 import java.util.SortedSet;
 import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -227,8 +226,7 @@ public class TestReplicationSourceManage
     LOG.debug("testNodeFailoverWorkerCopyQueuesFromRSUsingMulti");
     conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
     final Server server = new DummyServer("hostname0.example.org");
-    AtomicBoolean replicating = new AtomicBoolean(true);
-    ReplicationZookeeper rz = new ReplicationZookeeper(server, replicating);
+    ReplicationZookeeper rz = new ReplicationZookeeper(server);
     // populate some znodes in the peer znode
     files.add("log1");
     files.add("log2");
@@ -260,8 +258,6 @@ public class TestReplicationSourceManage
     populatedMap += w1.isLogZnodesMapPopulated() + w2.isLogZnodesMapPopulated()
         + w3.isLogZnodesMapPopulated();
     assertEquals(1, populatedMap);
-    // close out the resources.
-    rz.close();
     server.abort("", null);
   }
 
@@ -270,8 +266,7 @@ public class TestReplicationSourceManage
     LOG.debug("testNodeFailoverDeadServerParsing");
     conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
     final Server server = new DummyServer("ec2-54-234-230-108.compute-1.amazonaws.com");
-    AtomicBoolean replicating = new AtomicBoolean(true);
-    ReplicationZookeeper rz = new ReplicationZookeeper(server, replicating);
+    ReplicationZookeeper rz = new ReplicationZookeeper(server);
     // populate some znodes in the peer znode
     files.add("log1");
     files.add("log2");
@@ -285,16 +280,13 @@ public class TestReplicationSourceManage
     Server s3 = new DummyServer("ec2-23-20-187-167.compute-1.amazonaws.com");
 
     // simulate three servers fail sequentially
-    ReplicationZookeeper rz1 = new ReplicationZookeeper(s1, new AtomicBoolean(true));
+    ReplicationZookeeper rz1 = new ReplicationZookeeper(s1);
     SortedMap<String, SortedSet<String>> testMap =
         rz1.claimQueues(server.getServerName().getServerName());
-    rz1.close();
-    ReplicationZookeeper rz2 = new ReplicationZookeeper(s2, new AtomicBoolean(true));
+    ReplicationZookeeper rz2 = new ReplicationZookeeper(s2);
     testMap = rz2.claimQueues(s1.getServerName().getServerName());
-    rz2.close();
-    ReplicationZookeeper rz3 = new ReplicationZookeeper(s3, new AtomicBoolean(true));
+    ReplicationZookeeper rz3 = new ReplicationZookeeper(s3);
     testMap = rz3.claimQueues(s2.getServerName().getServerName());
-    rz3.close();
 
     ReplicationQueueInfo replicationQueueInfo = new ReplicationQueueInfo(testMap.firstKey());
     List<String> result = replicationQueueInfo.getDeadRegionServers();
@@ -303,9 +295,7 @@ public class TestReplicationSourceManage
     assertTrue(result.contains(server.getServerName().getServerName()));
     assertTrue(result.contains(s1.getServerName().getServerName()));
     assertTrue(result.contains(s2.getServerName().getServerName()));
-    
-    // close out the resources.
-    rz.close();
+
     server.abort("", null);
   }
   
@@ -319,14 +309,13 @@ public class TestReplicationSourceManage
     public DummyNodeFailoverWorker(String znode, Server s) throws Exception {
       this.deadRsZnode = znode;
       this.server = s;
-      rz = new ReplicationZookeeper(server, new AtomicBoolean(true));
+      rz = new ReplicationZookeeper(server);
     }
 
     @Override
     public void run() {
       try {
         logZnodesMap = rz.claimQueues(deadRsZnode);
-        rz.close();
         server.abort("Done with testing", null);
       } catch (Exception e) {
         LOG.error("Got exception while running NodeFailoverWorker", e);