You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jd...@apache.org on 2010/12/14 05:21:03 UTC

svn commit: r1048933 - in /hbase/branches/0.90: ./ src/main/java/org/apache/hadoop/hbase/client/replication/ src/main/java/org/apache/hadoop/hbase/mapreduce/replication/ src/main/java/org/apache/hadoop/hbase/master/ src/main/java/org/apache/hadoop/hbase/replication/ src/main/java/org/apache/hadoop/hbase/replication/master/

Author: jdcryans
Date: Tue Dec 14 04:21:02 2010
New Revision: 1048933

URL: http://svn.apache.org/viewvc?rev=1048933&view=rev
Log:
HBASE-3351  ReplicationZookeeper goes to ZK every time a znode is modified
HBASE-3326  Replication state's znode should be created else it 
            defaults to false

Modified:
    hbase/branches/0.90/CHANGES.txt
    hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
    hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
    hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/master/LogCleaner.java
    hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
    hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java

Modified: hbase/branches/0.90/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/CHANGES.txt?rev=1048933&r1=1048932&r2=1048933&view=diff
==============================================================================
--- hbase/branches/0.90/CHANGES.txt (original)
+++ hbase/branches/0.90/CHANGES.txt Tue Dec 14 04:21:02 2010
@@ -739,6 +739,9 @@ Release 0.90.0 - Unreleased
    HBASE-3321  Replication.join shouldn't clear the logs znode
    HBASE-3352  enabling a non-existent table from shell prints no error
    HBASE-3353  table.jsp doesn't handle entries in META without server info
+   HBASE-3351  ReplicationZookeeper goes to ZK every time a znode is modified
+   HBASE-3326  Replication state's znode should be created else it
+               defaults to false
 
 
   IMPROVEMENTS

Modified: hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java?rev=1048933&r1=1048932&r2=1048933&view=diff
==============================================================================
--- hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java (original)
+++ hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java Tue Dec 14 04:21:02 2010
@@ -24,6 +24,7 @@ import java.io.IOException;
 import org.apache.commons.lang.NotImplementedException;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -65,6 +66,7 @@ import org.apache.zookeeper.KeeperExcept
 public class ReplicationAdmin {
 
   private final ReplicationZookeeper replicationZk;
+  private final HConnection connection;
 
   /**
    * Constructor that creates a connection to the local ZooKeeper ensemble.
@@ -77,10 +79,10 @@ public class ReplicationAdmin {
       throw new RuntimeException("hbase.replication isn't true, please " +
           "enable it in order to use replication");
     }
-    ZooKeeperWatcher zkw = HConnectionManager.getConnection(conf).
-        getZooKeeperWatcher();
+    this.connection = HConnectionManager.getConnection(conf);
+    ZooKeeperWatcher zkw = this.connection.getZooKeeperWatcher();
     try {
-      this.replicationZk = new ReplicationZookeeper(conf, zkw);
+      this.replicationZk = new ReplicationZookeeper(this.connection, conf, zkw);
     } catch (KeeperException e) {
       throw new IOException("Unable setup the ZooKeeper connection", e);
     }
@@ -150,8 +152,13 @@ public class ReplicationAdmin {
    * @return the previous state
    */
   public boolean setReplicating(boolean newState) throws IOException {
-    boolean prev = getReplicating();
-    this.replicationZk.setReplicating(newState);
+    boolean prev = true;
+    try {
+      prev = getReplicating();
+      this.replicationZk.setReplicating(newState);
+    } catch (KeeperException e) {
+      throw new IOException("Unable to set the replication state", e);
+    }
     return prev;
   }
 

Modified: hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java?rev=1048933&r1=1048932&r2=1048933&view=diff
==============================================================================
--- hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java (original)
+++ hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java Tue Dec 14 04:21:02 2010
@@ -27,12 +27,14 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
 import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
@@ -106,9 +108,9 @@ public class VerifyReplication {
               endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
         }
         try {
-          ReplicationZookeeper zk = new ReplicationZookeeper(conf,
-              HConnectionManager.getConnection(conf).
-          getZooKeeperWatcher());
+          HConnection conn = HConnectionManager.getConnection(conf);
+          ReplicationZookeeper zk = new ReplicationZookeeper(conn, conf,
+              conn.getZooKeeperWatcher());
           ReplicationPeer peer = zk.getPeer(conf.get(NAME+".peerId"));
           HTable replicatedTable = new HTable(peer.getConfiguration(),
               conf.get(NAME+".tableName"));
@@ -150,8 +152,9 @@ public class VerifyReplication {
       throw new IOException("Replication needs to be enabled to verify it.");
     }
     try {
-      ReplicationZookeeper zk = new ReplicationZookeeper(conf,
-          HConnectionManager.getConnection(conf).getZooKeeperWatcher());
+      HConnection conn = HConnectionManager.getConnection(conf);
+      ReplicationZookeeper zk = new ReplicationZookeeper(conn, conf,
+          conn.getZooKeeperWatcher());
       // Just verifying it we can connect
       ReplicationPeer peer = zk.getPeer(peerId);
       if (peer == null) {

Modified: hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/master/LogCleaner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/master/LogCleaner.java?rev=1048933&r1=1048932&r2=1048933&view=diff
==============================================================================
--- hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/master/LogCleaner.java (original)
+++ hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/master/LogCleaner.java Tue Dec 14 04:21:02 2010
@@ -130,6 +130,11 @@ public class LogCleaner extends Chore {
         Path filePath = file.getPath();
         if (HLog.validateHLogFilename(filePath.getName())) {
           for (LogCleanerDelegate logCleaner : logCleanersChain) {
+            if (logCleaner.isStopped()) {
+              LOG.warn("A log cleaner is stopped, won't delete any log.");
+              return;
+            }
+
             if (!logCleaner.isLogDeletable(filePath) ) {
               // this log is not deletable, continue to process next log file
               continue FILE;

Modified: hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java?rev=1048933&r1=1048932&r2=1048933&view=diff
==============================================================================
--- hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (original)
+++ hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java Tue Dec 14 04:21:02 2010
@@ -100,6 +100,7 @@ public class ReplicationZookeeper {
   private String ourClusterKey;
   // Abortable
   private Abortable abortable;
+  private ReplicationStatusTracker statusTracker;
 
   /**
    * Constructor used by clients of replication (like master and HBase clients)
@@ -107,12 +108,14 @@ public class ReplicationZookeeper {
    * @param zk    zk connection to use
    * @throws IOException
    */
-  public ReplicationZookeeper(final Configuration conf, final ZooKeeperWatcher zk)
+  public ReplicationZookeeper(final Abortable abortable, final Configuration conf,
+                              final ZooKeeperWatcher zk)
     throws KeeperException {
 
     this.conf = conf;
     this.zookeeper = zk;
-    setZNodes();
+    this.replicating = new AtomicBoolean();
+    setZNodes(abortable);
   }
 
   /**
@@ -128,23 +131,18 @@ public class ReplicationZookeeper {
     this.abortable = server;
     this.zookeeper = server.getZooKeeper();
     this.conf = server.getConfiguration();
-    setZNodes();
+    this.replicating = replicating;
+    setZNodes(server);
 
     this.peerClusters = new HashMap<String, ReplicationPeer>();
-    this.replicating = replicating;
     ZKUtil.createWithParents(this.zookeeper,
         ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName));
-    readReplicationStateZnode();
     this.rsServerNameZnode = ZKUtil.joinZNode(rsZNode, server.getServerName());
     ZKUtil.createWithParents(this.zookeeper, this.rsServerNameZnode);
-    // Set a tracker on replicationStateNodeNode
-    ReplicationStatusTracker tracker =
-        new ReplicationStatusTracker(this.zookeeper, server);
-    tracker.start();
     connectExistingPeers();
   }
 
-  private void setZNodes() throws KeeperException {
+  private void setZNodes(Abortable abortable) throws KeeperException {
     String replicationZNodeName =
         conf.get("zookeeper.znode.replication", "replication");
     String peersZNodeName =
@@ -170,6 +168,11 @@ public class ReplicationZookeeper {
     String idResult = Bytes.toString(data);
     this.clusterId = idResult == null?
       Byte.toString(HConstants.DEFAULT_CLUSTER_ID): idResult;
+    // Set a tracker on replicationStateNodeNode
+    this.statusTracker =
+        new ReplicationStatusTracker(this.zookeeper, abortable);
+    statusTracker.start();
+    readReplicationStateZnode();
   }
 
   private void connectExistingPeers() throws IOException, KeeperException {
@@ -292,16 +295,12 @@ public class ReplicationZookeeper {
    * Set the new replication state for this cluster
    * @param newState
    */
-  public void setReplicating(boolean newState) throws IOException {
-    try {
-      ZKUtil.createWithParents(this.zookeeper,
+  public void setReplicating(boolean newState) throws KeeperException {
+    ZKUtil.createWithParents(this.zookeeper,
         ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName));
-      ZKUtil.setData(this.zookeeper,
-          ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName),
-          Bytes.toBytes(Boolean.toString(newState)));
-    } catch (KeeperException e) {
-      throw new IOException("Unable to set the replication state", e);
-    }
+    ZKUtil.setData(this.zookeeper,
+        ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName),
+        Bytes.toBytes(Boolean.toString(newState)));
   }
 
   /**
@@ -368,8 +367,18 @@ public class ReplicationZookeeper {
     }
   }
 
+  /**
+   * Get the replication status of this cluster. If the state znode doesn't
+   * exist it will also create it and set it true.
+   * @return returns true when it's enabled, else false
+   * @throws KeeperException
+   */
   public boolean getReplication() throws KeeperException {
-    byte [] data = ZKUtil.getDataAndWatch(this.zookeeper, getRepStateNode());
+    byte [] data = this.statusTracker.getData();
+    if (data == null || data.length == 0) {
+      setReplicating(true);
+      return true;
+    }
     return Boolean.parseBoolean(Bytes.toString(data));
   }
 
@@ -681,8 +690,10 @@ public class ReplicationZookeeper {
 
     @Override
     public synchronized void nodeDataChanged(String path) {
-      super.nodeDataChanged(path);
-      readReplicationStateZnode();
+      if (path.equals(node)) {
+        super.nodeDataChanged(path);
+        readReplicationStateZnode();
+      }
     }
   }
 }
\ No newline at end of file

Modified: hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java?rev=1048933&r1=1048932&r2=1048933&view=diff
==============================================================================
--- hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java (original)
+++ hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java Tue Dec 14 04:21:02 2010
@@ -23,6 +23,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.master.LogCleanerDelegate;
@@ -39,7 +40,7 @@ import java.util.Set;
  * Implementation of a log cleaner that checks if a log is still scheduled for
  * replication before deleting it when its TTL is over.
  */
-public class ReplicationLogCleaner implements LogCleanerDelegate {
+public class ReplicationLogCleaner implements LogCleanerDelegate, Abortable {
   private static final Log LOG = LogFactory.getLog(ReplicationLogCleaner.class);
   private Configuration conf;
   private ReplicationZookeeper zkHelper;
@@ -53,6 +54,16 @@ public class ReplicationLogCleaner imple
 
   @Override
   public boolean isLogDeletable(Path filePath) {
+
+    try {
+      if (!zkHelper.getReplication()) {
+        return false;
+      }
+    } catch (KeeperException e) {
+      abort("Cannot get the state of replication", e);
+      return false;
+    }
+
     // all members of this class are null if replication is disabled, and we
     // return true since false would render the LogsCleaner useless
     if (this.conf == null) {
@@ -121,7 +132,7 @@ public class ReplicationLogCleaner imple
     try {
       ZooKeeperWatcher zkw =
           new ZooKeeperWatcher(this.conf, "replicationLogCleaner", null);
-      this.zkHelper = new ReplicationZookeeper(this.conf, zkw);
+      this.zkHelper = new ReplicationZookeeper(this, this.conf, zkw);
     } catch (KeeperException e) {
       LOG.error("Error while configuring " + this.getClass().getName(), e);
     } catch (IOException e) {
@@ -150,4 +161,10 @@ public class ReplicationLogCleaner imple
   public boolean isStopped() {
     return this.stopped;
   }
+
+  @Override
+  public void abort(String why, Throwable e) {
+    LOG.warn("Aborting ReplicationLogCleaner because " + why, e);
+    stop(why);
+  }
 }
\ No newline at end of file