You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by st...@apache.org on 2013/01/18 18:02:28 UTC
svn commit: r1435250 - in
/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase:
client/replication/ replication/
Author: stack
Date: Fri Jan 18 17:02:28 2013
New Revision: 1435250
URL: http://svn.apache.org/viewvc?rev=1435250&view=rev
Log:
HBASE-7565 [replication] Create an interface for the replication state node
Added:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateImpl.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateInterface.java
Modified:
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java?rev=1435250&r1=1435249&r2=1435250&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java Fri Jan 18 17:02:28 2013
@@ -185,7 +185,7 @@ public class ReplicationAdmin implements
boolean prev = true;
try {
prev = getReplicating();
- this.replicationZk.setReplicating(newState);
+ this.replicationZk.setReplication(newState);
} catch (KeeperException e) {
throw new IOException("Unable to set the replication state", e);
}
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java?rev=1435250&r1=1435249&r2=1435250&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java Fri Jan 18 17:02:28 2013
@@ -90,7 +90,7 @@ public class ReplicationPeer implements
}
private void readPeerStateZnode() throws DeserializationException {
- this.peerEnabled.set(ReplicationZookeeper.isPeerEnabled(this.peerStateTracker.getData(false)));
+ this.peerEnabled.set(ReplicationZookeeper.isStateEnabled(this.peerStateTracker.getData(false)));
}
/**
Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateImpl.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateImpl.java?rev=1435250&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateImpl.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateImpl.java Fri Jan 18 17:02:28 2013
@@ -0,0 +1,167 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.DeserializationException;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * ReplicationStateImpl is responsible for maintaining the replication state
+ * znode.
+ */
+public class ReplicationStateImpl implements ReplicationStateInterface {
+
+ private ReplicationStateTracker stateTracker;
+ private final String stateZnode;
+ private final ZooKeeperWatcher zookeeper;
+ private final Abortable abortable;
+ private final AtomicBoolean replicating;
+
+ private static final Log LOG = LogFactory.getLog(ReplicationStateImpl.class);
+
+ public ReplicationStateImpl(final ZooKeeperWatcher zk, final String stateZnode,
+ final Abortable abortable, final AtomicBoolean replicating) {
+ this.zookeeper = zk;
+ this.stateZnode = stateZnode;
+ this.abortable = abortable;
+ this.replicating = replicating;
+
+ // Set a tracker on replicationStateNode
+ this.stateTracker = new ReplicationStateTracker(this.zookeeper, this.stateZnode,
+ this.abortable);
+ stateTracker.start();
+ readReplicationStateZnode();
+ }
+
+ public boolean getState() throws KeeperException {
+ return getReplication();
+ }
+
+ public void setState(boolean newState) throws KeeperException {
+ setReplicating(newState);
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (stateTracker != null) stateTracker.stop();
+ }
+
+ /**
+ * @param bytes
+ * @return True if the passed in <code>bytes</code> are those of a pb
+ * serialized ENABLED state.
+ * @throws DeserializationException
+ */
+ private boolean isStateEnabled(final byte[] bytes) throws DeserializationException {
+ ZooKeeperProtos.ReplicationState.State state = parseStateFrom(bytes);
+ return ZooKeeperProtos.ReplicationState.State.ENABLED == state;
+ }
+
+ /**
+ * @param bytes Content of a state znode.
+ * @return State parsed from the passed bytes.
+ * @throws DeserializationException
+ */
+ private ZooKeeperProtos.ReplicationState.State parseStateFrom(final byte[] bytes)
+ throws DeserializationException {
+ ProtobufUtil.expectPBMagicPrefix(bytes);
+ int pblen = ProtobufUtil.lengthOfPBMagic();
+ ZooKeeperProtos.ReplicationState.Builder builder = ZooKeeperProtos.ReplicationState
+ .newBuilder();
+ ZooKeeperProtos.ReplicationState state;
+ try {
+ state = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
+ return state.getState();
+ } catch (InvalidProtocolBufferException e) {
+ throw new DeserializationException(e);
+ }
+ }
+
+ /**
+ * Set the new replication state for this cluster
+ * @param newState
+ */
+ private void setReplicating(boolean newState) throws KeeperException {
+ ZKUtil.createWithParents(this.zookeeper, this.stateZnode);
+ byte[] stateBytes = (newState == true) ? ReplicationZookeeper.ENABLED_ZNODE_BYTES
+ : ReplicationZookeeper.DISABLED_ZNODE_BYTES;
+ ZKUtil.setData(this.zookeeper, this.stateZnode, stateBytes);
+ }
+
+ /**
+ * Get the replication status of this cluster. If the state znode doesn't
+ * exist it will also create it and set it true.
+ * @return returns true when it's enabled, else false
+ * @throws KeeperException
+ */
+ private boolean getReplication() throws KeeperException {
+ byte[] data = this.stateTracker.getData(false);
+ if (data == null || data.length == 0) {
+ setReplicating(true);
+ return true;
+ }
+ try {
+ return isStateEnabled(data);
+ } catch (DeserializationException e) {
+ throw ZKUtil.convert(e);
+ }
+ }
+
+ /**
+ * This reads the state znode for replication and sets the atomic boolean
+ */
+ private void readReplicationStateZnode() {
+ try {
+ this.replicating.set(getReplication());
+ LOG.info("Replication is now " + (this.replicating.get() ? "started" : "stopped"));
+ } catch (KeeperException e) {
+ this.abortable.abort("Failed getting data on from " + this.stateZnode, e);
+ }
+ }
+
+ /**
+ * Tracker for status of the replication
+ */
+ private class ReplicationStateTracker extends ZooKeeperNodeTracker {
+ public ReplicationStateTracker(ZooKeeperWatcher watcher, String stateZnode, Abortable abortable) {
+ super(watcher, stateZnode, abortable);
+ }
+
+ @Override
+ public synchronized void nodeDataChanged(String path) {
+ if (path.equals(node)) {
+ super.nodeDataChanged(path);
+ readReplicationStateZnode();
+ }
+ }
+ }
+}
\ No newline at end of file
Added: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateInterface.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateInterface.java?rev=1435250&view=auto
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateInterface.java (added)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateInterface.java Fri Jan 18 17:02:28 2013
@@ -0,0 +1,47 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.io.Closeable;
+
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * This provides an interface for getting and setting the replication state of a
+ * cluster. This state is used to indicate whether replication is enabled or
+ * disabled on a cluster.
+ */
+public interface ReplicationStateInterface extends Closeable {
+
+ /**
+ * Get the current state of replication (i.e. ENABLED or DISABLED).
+ *
+ * @return true if replication is enabled, false otherwise
+ * @throws KeeperException
+ */
+ public boolean getState() throws KeeperException;
+
+ /**
+ * Set the state of replication.
+ *
+ * @param newState
+ * @throws KeeperException
+ */
+ public void setState(boolean newState) throws KeeperException;
+}
\ No newline at end of file
Modified: hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
URL: http://svn.apache.org/viewvc/hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java?rev=1435250&r1=1435249&r2=1435250&view=diff
==============================================================================
--- hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (original)
+++ hbase/trunk/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java Fri Jan 18 17:02:28 2013
@@ -43,12 +43,10 @@ import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
-import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.KeeperException.ConnectionLossException;
@@ -87,7 +85,7 @@ import com.google.protobuf.InvalidProtoc
* </pre>
*/
@InterfaceAudience.Private
-public class ReplicationZookeeper implements Closeable{
+public class ReplicationZookeeper implements Closeable {
private static final Log LOG =
LogFactory.getLog(ReplicationZookeeper.class);
// Name of znode we use to lock when failover
@@ -111,24 +109,24 @@ public class ReplicationZookeeper implem
// peers' id node; e.g. /hbase/replication/peers/PEER_ID/peer-state
private String peerStateNodeName;
private final Configuration conf;
- // Is this cluster replicating at the moment?
- private AtomicBoolean replicating;
// The key to our own cluster
private String ourClusterKey;
// Abortable
private Abortable abortable;
- private ReplicationStatusTracker statusTracker;
+ private final ReplicationStateInterface replicationState;
/**
* ZNode content if enabled state.
*/
// Public so it can be seen by test code.
- public static final byte[] ENABLED_ZNODE_BYTES = toByteArray(ZooKeeperProtos.ReplicationState.State.ENABLED);
+ public static final byte[] ENABLED_ZNODE_BYTES =
+ toByteArray(ZooKeeperProtos.ReplicationState.State.ENABLED);
/**
* ZNode content if disabled state.
*/
- static final byte[] DISABLED_ZNODE_BYTES = toByteArray(ZooKeeperProtos.ReplicationState.State.DISABLED);
+ static final byte[] DISABLED_ZNODE_BYTES =
+ toByteArray(ZooKeeperProtos.ReplicationState.State.DISABLED);
/**
* Constructor used by clients of replication (like master and HBase clients)
@@ -140,8 +138,9 @@ public class ReplicationZookeeper implem
final ZooKeeperWatcher zk) throws KeeperException {
this.conf = conf;
this.zookeeper = zk;
- this.replicating = new AtomicBoolean();
setZNodes(abortable);
+ this.replicationState =
+ new ReplicationStateImpl(this.zookeeper, getRepStateNode(), abortable, new AtomicBoolean());
}
/**
@@ -157,9 +156,10 @@ public class ReplicationZookeeper implem
this.abortable = server;
this.zookeeper = server.getZooKeeper();
this.conf = server.getConfiguration();
- this.replicating = replicating;
setZNodes(server);
+ this.replicationState =
+ new ReplicationStateImpl(this.zookeeper, getRepStateNode(), server, replicating);
this.peerClusters = new HashMap<String, ReplicationPeer>();
ZKUtil.createWithParents(this.zookeeper,
ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName));
@@ -180,11 +180,6 @@ public class ReplicationZookeeper implem
ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
this.rsZNode = ZKUtil.joinZNode(replicationZNode, rsZNodeName);
ZKUtil.createWithParents(this.zookeeper, this.rsZNode);
-
- // Set a tracker on replicationStateNodeNode
- this.statusTracker = new ReplicationStatusTracker(this.zookeeper, abortable);
- statusTracker.start();
- readReplicationStateZnode();
}
private void connectExistingPeers() throws IOException, KeeperException {
@@ -366,18 +361,6 @@ public class ReplicationZookeeper implem
}
/**
- * Set the new replication state for this cluster
- * @param newState
- */
- public void setReplicating(boolean newState) throws KeeperException {
- ZKUtil.createWithParents(this.zookeeper,
- ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName));
- byte[] stateBytes = (newState == true) ? ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES;
- ZKUtil.setData(this.zookeeper,
- ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName), stateBytes);
- }
-
- /**
* Remove the peer from zookeeper. which will trigger the watchers on every
* region server and close their sources
* @param id
@@ -641,40 +624,27 @@ public class ReplicationZookeeper implem
return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.peerStateNodeName));
}
- /**
- * This reads the state znode for replication and sets the atomic boolean
- */
- private void readReplicationStateZnode() {
- try {
- this.replicating.set(getReplication());
- LOG.info("Replication is now " + (this.replicating.get()?
- "started" : "stopped"));
- } catch (KeeperException e) {
- this.abortable.abort("Failed getting data on from " + getRepStateNode(), e);
- }
+ private String getRepStateNode() {
+ return ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName);
}
/**
- * Get the replication status of this cluster. If the state znode doesn't
- * exist it will also create it and set it true.
+ * Get the replication status of this cluster. If the state znode doesn't exist it will also
+ * create it and set it true.
* @return returns true when it's enabled, else false
* @throws KeeperException
*/
public boolean getReplication() throws KeeperException {
- byte [] data = this.statusTracker.getData(false);
- if (data == null || data.length == 0) {
- setReplicating(true);
- return true;
- }
- try {
- return isPeerEnabled(data);
- } catch (DeserializationException e) {
- throw ZKUtil.convert(e);
- }
+ return this.replicationState.getState();
}
- private String getRepStateNode() {
- return ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName);
+ /**
+ * Set the new replication state for this cluster
+ * @param newState
+ * @throws KeeperException
+ */
+ public void setReplication(boolean newState) throws KeeperException {
+ this.replicationState.setState(newState);
}
/**
@@ -1055,8 +1025,7 @@ public class ReplicationZookeeper implem
@Override
public void close() throws IOException {
- if (statusTracker != null)
- statusTracker.stop();
+ if (replicationState != null) replicationState.close();
}
/**
@@ -1086,26 +1055,8 @@ public class ReplicationZookeeper implem
* serialized ENABLED state.
* @throws DeserializationException
*/
- static boolean isPeerEnabled(final byte[] bytes) throws DeserializationException {
+ static boolean isStateEnabled(final byte[] bytes) throws DeserializationException {
ZooKeeperProtos.ReplicationState.State state = parseStateFrom(bytes);
return ZooKeeperProtos.ReplicationState.State.ENABLED == state;
}
-
- /**
- * Tracker for status of the replication
- */
- public class ReplicationStatusTracker extends ZooKeeperNodeTracker {
- public ReplicationStatusTracker(ZooKeeperWatcher watcher,
- Abortable abortable) {
- super(watcher, getRepStateNode(), abortable);
- }
-
- @Override
- public synchronized void nodeDataChanged(String path) {
- if (path.equals(node)) {
- super.nodeDataChanged(path);
- readReplicationStateZnode();
- }
- }
- }
}