You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jd...@apache.org on 2010/12/14 05:21:03 UTC
svn commit: r1048933 - in /hbase/branches/0.90: ./
src/main/java/org/apache/hadoop/hbase/client/replication/
src/main/java/org/apache/hadoop/hbase/mapreduce/replication/
src/main/java/org/apache/hadoop/hbase/master/
src/main/java/org/apache/hadoop/hbase/replication/
src/main/java/org/apache/hadoop/hbase/replication/master/
Author: jdcryans
Date: Tue Dec 14 04:21:02 2010
New Revision: 1048933
URL: http://svn.apache.org/viewvc?rev=1048933&view=rev
Log:
HBASE-3351 ReplicationZookeeper goes to ZK every time a znode is modified
HBASE-3326 Replication state's znode should be created else it
defaults to false
Modified:
hbase/branches/0.90/CHANGES.txt
hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/master/LogCleaner.java
hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
Modified: hbase/branches/0.90/CHANGES.txt
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/CHANGES.txt?rev=1048933&r1=1048932&r2=1048933&view=diff
==============================================================================
--- hbase/branches/0.90/CHANGES.txt (original)
+++ hbase/branches/0.90/CHANGES.txt Tue Dec 14 04:21:02 2010
@@ -739,6 +739,9 @@ Release 0.90.0 - Unreleased
HBASE-3321 Replication.join shouldn't clear the logs znode
HBASE-3352 enabling a non-existent table from shell prints no error
HBASE-3353 table.jsp doesn't handle entries in META without server info
+ HBASE-3351 ReplicationZookeeper goes to ZK every time a znode is modified
+ HBASE-3326 Replication state's znode should be created else it
+ defaults to false
IMPROVEMENTS
Modified: hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java?rev=1048933&r1=1048932&r2=1048933&view=diff
==============================================================================
--- hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java (original)
+++ hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java Tue Dec 14 04:21:02 2010
@@ -24,6 +24,7 @@ import java.io.IOException;
import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -65,6 +66,7 @@ import org.apache.zookeeper.KeeperExcept
public class ReplicationAdmin {
private final ReplicationZookeeper replicationZk;
+ private final HConnection connection;
/**
* Constructor that creates a connection to the local ZooKeeper ensemble.
@@ -77,10 +79,10 @@ public class ReplicationAdmin {
throw new RuntimeException("hbase.replication isn't true, please " +
"enable it in order to use replication");
}
- ZooKeeperWatcher zkw = HConnectionManager.getConnection(conf).
- getZooKeeperWatcher();
+ this.connection = HConnectionManager.getConnection(conf);
+ ZooKeeperWatcher zkw = this.connection.getZooKeeperWatcher();
try {
- this.replicationZk = new ReplicationZookeeper(conf, zkw);
+ this.replicationZk = new ReplicationZookeeper(this.connection, conf, zkw);
} catch (KeeperException e) {
throw new IOException("Unable setup the ZooKeeper connection", e);
}
@@ -150,8 +152,13 @@ public class ReplicationAdmin {
* @return the previous state
*/
public boolean setReplicating(boolean newState) throws IOException {
- boolean prev = getReplicating();
- this.replicationZk.setReplicating(newState);
+ boolean prev = true;
+ try {
+ prev = getReplicating();
+ this.replicationZk.setReplicating(newState);
+ } catch (KeeperException e) {
+ throw new IOException("Unable to set the replication state", e);
+ }
return prev;
}
Modified: hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java?rev=1048933&r1=1048932&r2=1048933&view=diff
==============================================================================
--- hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java (original)
+++ hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java Tue Dec 14 04:21:02 2010
@@ -27,12 +27,14 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.replication.ReplicationAdmin;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
@@ -106,9 +108,9 @@ public class VerifyReplication {
endTime == 0 ? HConstants.LATEST_TIMESTAMP : endTime);
}
try {
- ReplicationZookeeper zk = new ReplicationZookeeper(conf,
- HConnectionManager.getConnection(conf).
- getZooKeeperWatcher());
+ HConnection conn = HConnectionManager.getConnection(conf);
+ ReplicationZookeeper zk = new ReplicationZookeeper(conn, conf,
+ conn.getZooKeeperWatcher());
ReplicationPeer peer = zk.getPeer(conf.get(NAME+".peerId"));
HTable replicatedTable = new HTable(peer.getConfiguration(),
conf.get(NAME+".tableName"));
@@ -150,8 +152,9 @@ public class VerifyReplication {
throw new IOException("Replication needs to be enabled to verify it.");
}
try {
- ReplicationZookeeper zk = new ReplicationZookeeper(conf,
- HConnectionManager.getConnection(conf).getZooKeeperWatcher());
+ HConnection conn = HConnectionManager.getConnection(conf);
+ ReplicationZookeeper zk = new ReplicationZookeeper(conn, conf,
+ conn.getZooKeeperWatcher());
// Just verifying it we can connect
ReplicationPeer peer = zk.getPeer(peerId);
if (peer == null) {
Modified: hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/master/LogCleaner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/master/LogCleaner.java?rev=1048933&r1=1048932&r2=1048933&view=diff
==============================================================================
--- hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/master/LogCleaner.java (original)
+++ hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/master/LogCleaner.java Tue Dec 14 04:21:02 2010
@@ -130,6 +130,11 @@ public class LogCleaner extends Chore {
Path filePath = file.getPath();
if (HLog.validateHLogFilename(filePath.getName())) {
for (LogCleanerDelegate logCleaner : logCleanersChain) {
+ if (logCleaner.isStopped()) {
+ LOG.warn("A log cleaner is stopped, won't delete any log.");
+ return;
+ }
+
if (!logCleaner.isLogDeletable(filePath) ) {
// this log is not deletable, continue to process next log file
continue FILE;
Modified: hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java?rev=1048933&r1=1048932&r2=1048933&view=diff
==============================================================================
--- hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (original)
+++ hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java Tue Dec 14 04:21:02 2010
@@ -100,6 +100,7 @@ public class ReplicationZookeeper {
private String ourClusterKey;
// Abortable
private Abortable abortable;
+ private ReplicationStatusTracker statusTracker;
/**
* Constructor used by clients of replication (like master and HBase clients)
@@ -107,12 +108,14 @@ public class ReplicationZookeeper {
* @param zk zk connection to use
* @throws IOException
*/
- public ReplicationZookeeper(final Configuration conf, final ZooKeeperWatcher zk)
+ public ReplicationZookeeper(final Abortable abortable, final Configuration conf,
+ final ZooKeeperWatcher zk)
throws KeeperException {
this.conf = conf;
this.zookeeper = zk;
- setZNodes();
+ this.replicating = new AtomicBoolean();
+ setZNodes(abortable);
}
/**
@@ -128,23 +131,18 @@ public class ReplicationZookeeper {
this.abortable = server;
this.zookeeper = server.getZooKeeper();
this.conf = server.getConfiguration();
- setZNodes();
+ this.replicating = replicating;
+ setZNodes(server);
this.peerClusters = new HashMap<String, ReplicationPeer>();
- this.replicating = replicating;
ZKUtil.createWithParents(this.zookeeper,
ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName));
- readReplicationStateZnode();
this.rsServerNameZnode = ZKUtil.joinZNode(rsZNode, server.getServerName());
ZKUtil.createWithParents(this.zookeeper, this.rsServerNameZnode);
- // Set a tracker on replicationStateNodeNode
- ReplicationStatusTracker tracker =
- new ReplicationStatusTracker(this.zookeeper, server);
- tracker.start();
connectExistingPeers();
}
- private void setZNodes() throws KeeperException {
+ private void setZNodes(Abortable abortable) throws KeeperException {
String replicationZNodeName =
conf.get("zookeeper.znode.replication", "replication");
String peersZNodeName =
@@ -170,6 +168,11 @@ public class ReplicationZookeeper {
String idResult = Bytes.toString(data);
this.clusterId = idResult == null?
Byte.toString(HConstants.DEFAULT_CLUSTER_ID): idResult;
+ // Set a tracker on replicationStateNodeNode
+ this.statusTracker =
+ new ReplicationStatusTracker(this.zookeeper, abortable);
+ statusTracker.start();
+ readReplicationStateZnode();
}
private void connectExistingPeers() throws IOException, KeeperException {
@@ -292,16 +295,12 @@ public class ReplicationZookeeper {
* Set the new replication state for this cluster
* @param newState
*/
- public void setReplicating(boolean newState) throws IOException {
- try {
- ZKUtil.createWithParents(this.zookeeper,
+ public void setReplicating(boolean newState) throws KeeperException {
+ ZKUtil.createWithParents(this.zookeeper,
ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName));
- ZKUtil.setData(this.zookeeper,
- ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName),
- Bytes.toBytes(Boolean.toString(newState)));
- } catch (KeeperException e) {
- throw new IOException("Unable to set the replication state", e);
- }
+ ZKUtil.setData(this.zookeeper,
+ ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName),
+ Bytes.toBytes(Boolean.toString(newState)));
}
/**
@@ -368,8 +367,18 @@ public class ReplicationZookeeper {
}
}
+ /**
+ * Get the replication status of this cluster. If the state znode doesn't
+ * exist it will also create it and set it true.
+ * @return returns true when it's enabled, else false
+ * @throws KeeperException
+ */
public boolean getReplication() throws KeeperException {
- byte [] data = ZKUtil.getDataAndWatch(this.zookeeper, getRepStateNode());
+ byte [] data = this.statusTracker.getData();
+ if (data == null || data.length == 0) {
+ setReplicating(true);
+ return true;
+ }
return Boolean.parseBoolean(Bytes.toString(data));
}
@@ -681,8 +690,10 @@ public class ReplicationZookeeper {
@Override
public synchronized void nodeDataChanged(String path) {
- super.nodeDataChanged(path);
- readReplicationStateZnode();
+ if (path.equals(node)) {
+ super.nodeDataChanged(path);
+ readReplicationStateZnode();
+ }
}
}
}
\ No newline at end of file
Modified: hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java?rev=1048933&r1=1048932&r2=1048933&view=diff
==============================================================================
--- hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java (original)
+++ hbase/branches/0.90/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java Tue Dec 14 04:21:02 2010
@@ -23,6 +23,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.master.LogCleanerDelegate;
@@ -39,7 +40,7 @@ import java.util.Set;
* Implementation of a log cleaner that checks if a log is still scheduled for
* replication before deleting it when its TTL is over.
*/
-public class ReplicationLogCleaner implements LogCleanerDelegate {
+public class ReplicationLogCleaner implements LogCleanerDelegate, Abortable {
private static final Log LOG = LogFactory.getLog(ReplicationLogCleaner.class);
private Configuration conf;
private ReplicationZookeeper zkHelper;
@@ -53,6 +54,16 @@ public class ReplicationLogCleaner imple
@Override
public boolean isLogDeletable(Path filePath) {
+
+ try {
+ if (!zkHelper.getReplication()) {
+ return false;
+ }
+ } catch (KeeperException e) {
+ abort("Cannot get the state of replication", e);
+ return false;
+ }
+
// all members of this class are null if replication is disabled, and we
// return true since false would render the LogsCleaner useless
if (this.conf == null) {
@@ -121,7 +132,7 @@ public class ReplicationLogCleaner imple
try {
ZooKeeperWatcher zkw =
new ZooKeeperWatcher(this.conf, "replicationLogCleaner", null);
- this.zkHelper = new ReplicationZookeeper(this.conf, zkw);
+ this.zkHelper = new ReplicationZookeeper(this, this.conf, zkw);
} catch (KeeperException e) {
LOG.error("Error while configuring " + this.getClass().getName(), e);
} catch (IOException e) {
@@ -150,4 +161,10 @@ public class ReplicationLogCleaner imple
public boolean isStopped() {
return this.stopped;
}
+
+ @Override
+ public void abort(String why, Throwable e) {
+ LOG.warn("Aborting ReplicationLogCleaner because " + why, e);
+ stop(why);
+ }
}
\ No newline at end of file