You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by jd...@apache.org on 2013/05/18 01:57:29 UTC
svn commit: r1484030 [1/2] - in /hbase/branches/0.95:
hbase-client/src/main/java/org/apache/hadoop/hbase/replication/
hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/
hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/ hbase-serv...
Author: jdcryans
Date: Fri May 17 23:57:28 2013
New Revision: 1484030
URL: http://svn.apache.org/r1484030
Log:
HBASE-7567 [replication] Create an interface for replication peers (Chris Trezzo via JD)
Added:
hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
Modified:
hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateImpl.java
hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateInterface.java
hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKClusterId.java
hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationZookeeper.java
hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
Modified: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java?rev=1484030&r1=1484029&r2=1484030&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java (original)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java Fri May 17 23:57:28 2013
@@ -25,10 +25,15 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.NodeExistsException;
+
+import com.google.protobuf.InvalidProtocolBufferException;
import java.io.Closeable;
import java.io.IOException;
@@ -79,7 +84,7 @@ public class ReplicationPeer implements
*/
public void startStateTracker(ZooKeeperWatcher zookeeper, String peerStateNode)
throws KeeperException {
- ReplicationZookeeper.ensurePeerEnabled(zookeeper, peerStateNode);
+ ensurePeerEnabled(zookeeper, peerStateNode);
this.peerStateTracker = new PeerStateTracker(peerStateNode, zookeeper, this);
this.peerStateTracker.start();
try {
@@ -90,7 +95,7 @@ public class ReplicationPeer implements
}
private void readPeerStateZnode() throws DeserializationException {
- this.peerEnabled.set(ReplicationZookeeper.isStateEnabled(this.peerStateTracker.getData(false)));
+ this.peerEnabled.set(isStateEnabled(this.peerStateTracker.getData(false)));
}
/**
@@ -182,6 +187,57 @@ public class ReplicationPeer implements
}
/**
+ * @param bytes
+ * @return True if the passed in <code>bytes</code> are those of a pb serialized ENABLED state.
+ * @throws DeserializationException
+ */
+ private static boolean isStateEnabled(final byte[] bytes) throws DeserializationException {
+ ZooKeeperProtos.ReplicationState.State state = parseStateFrom(bytes);
+ return ZooKeeperProtos.ReplicationState.State.ENABLED == state;
+ }
+
+ /**
+ * @param bytes Content of a state znode.
+ * @return State parsed from the passed bytes.
+ * @throws DeserializationException
+ */
+ private static ZooKeeperProtos.ReplicationState.State parseStateFrom(final byte[] bytes)
+ throws DeserializationException {
+ ProtobufUtil.expectPBMagicPrefix(bytes);
+ int pblen = ProtobufUtil.lengthOfPBMagic();
+ ZooKeeperProtos.ReplicationState.Builder builder =
+ ZooKeeperProtos.ReplicationState.newBuilder();
+ ZooKeeperProtos.ReplicationState state;
+ try {
+ state = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
+ return state.getState();
+ } catch (InvalidProtocolBufferException e) {
+ throw new DeserializationException(e);
+ }
+ }
+
+ /**
+ * Utility method to ensure an ENABLED znode is in place; if not present, we create it.
+ * @param zookeeper
+ * @param path Path to znode to check
+ * @return True if we created the znode.
+ * @throws NodeExistsException
+ * @throws KeeperException
+ */
+ private static boolean ensurePeerEnabled(final ZooKeeperWatcher zookeeper, final String path)
+ throws NodeExistsException, KeeperException {
+ if (ZKUtil.checkExists(zookeeper, path) == -1) {
+ // There is a race b/w PeerWatcher and ReplicationZookeeper#add method to create the
+ // peer-state znode. This happens while adding a peer.
+ // The peer state data is set as "ENABLED" by default.
+ ZKUtil.createNodeIfNotExistsAndWatch(zookeeper, path,
+ ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
+ return true;
+ }
+ return false;
+ }
+
+ /**
* Tracker for state of this peer
*/
public class PeerStateTracker extends ZooKeeperNodeTracker {
Added: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java?rev=1484030&view=auto
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java (added)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java Fri May 17 23:57:28 2013
@@ -0,0 +1,138 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.zookeeper.KeeperException;
+
+/**
+ * This provides an interface for maintaining a set of peer clusters. These peers are remote slave
+ * clusters that data is replicated to. A peer cluster can be in three different states:
+ *
+ * 1. Not-Registered - There is no notion of the peer cluster.
+ * 2. Registered - The peer has an id and is being tracked but there is no connection.
+ * 3. Connected - There is an active connection to the remote peer.
+ *
+ * In the registered or connected state, a peer cluster can either be enabled or disabled.
+ */
+@InterfaceAudience.Private
+public interface ReplicationPeers {
+
+ /**
+ * Initialize the ReplicationPeers interface.
+ * @throws KeeperException
+ */
+ public void init() throws IOException, KeeperException;
+
+ /**
+ * Add a new remote slave cluster for replication.
+ * @param peerId a short that identifies the cluster
+ * @param clusterKey the concatenation of the slave cluster's:
+ * hbase.zookeeper.quorum:hbase.zookeeper.property.clientPort:zookeeper.znode.parent
+ */
+ public void addPeer(String peerId, String clusterKey) throws IOException;
+
+ /**
+ * Removes a remote slave cluster and stops the replication to it.
+ * @param peerId a short that identifies the cluster
+ */
+ public void removePeer(String peerId) throws IOException;
+
+ /**
+ * Restart the replication to the specified remote slave cluster.
+ * @param peerId a short that identifies the cluster
+ */
+ public void enablePeer(String peerId) throws IOException;
+
+ /**
+ * Stop the replication to the specified remote slave cluster.
+ * @param peerId a short that identifies the cluster
+ */
+ public void disablePeer(String peerId) throws IOException;
+
+ /**
+ * Get the replication status for the specified connected remote slave cluster.
+ * @param peerId a short that identifies the cluster
+ * @return true if replication is enabled, false otherwise.
+ */
+ public boolean getStatusOfConnectedPeer(String peerId);
+
+ /**
+ * Get a set of all connected remote slave clusters.
+ * @return set of peer ids
+ */
+ public Set<String> getConnectedPeers();
+
+ /**
+ * List the cluster keys of all remote slave clusters (whether they are enabled/disabled or
+ * connected/disconnected).
+ * @return A map of peer ids to peer cluster keys
+ */
+ public Map<String, String> getAllPeerClusterKeys();
+
+ /**
+ * List the peer ids of all remote slave clusters (whether they are enabled/disabled or
+ * connected/disconnected).
+ * @return A list of peer ids
+ */
+ public List<String> getAllPeerIds();
+
+ /**
+ * Attempt to connect to a new remote slave cluster.
+ * @param peerId a short that identifies the cluster
+ * @return true if a new connection was made, false if no new connection was made.
+ */
+ public boolean connectToPeer(String peerId) throws IOException, KeeperException;
+
+ /**
+ * Disconnect from a remote slave cluster.
+ * @param peerId a short that identifies the cluster
+ */
+ public void disconnectFromPeer(String peerId);
+
+ /**
+ * Returns all region servers from given connected remote slave cluster.
+ * @param peerId a short that identifies the cluster
+ * @return addresses of all region servers in the peer cluster. Returns an empty list if the peer
+ * cluster is unavailable or there are no region servers in the cluster.
+ */
+ public List<ServerName> getRegionServersOfConnectedPeer(String peerId);
+
+ /**
+ * Returns the UUID of the provided peer id.
+ * @param peerId the peer's ID that will be converted into a UUID
+ * @return a UUID or null if the peer cluster does not exist or is not connected.
+ */
+ public UUID getPeerUUID(String peerId);
+
+ /**
+ * Returns the configuration needed to talk to the remote slave cluster.
+ * @param peerId a short that identifies the cluster
+ * @return the configuration for the peer cluster, null if it was unable to get the configuration
+ */
+ public Configuration getPeerConf(String peerId) throws KeeperException;
+}
\ No newline at end of file
Added: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java?rev=1484030&view=auto
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java (added)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java Fri May 17 23:57:28 2013
@@ -0,0 +1,409 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.UUID;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.KeeperException.ConnectionLossException;
+import org.apache.zookeeper.KeeperException.SessionExpiredException;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+
+/**
+ * This class provides an implementation of the ReplicationPeers interface using Zookeeper. The
+ * peers znode contains a list of all peer replication clusters and the current replication state of
+ * those clusters. It has one child peer znode for each peer cluster. The peer znode is named with
+ * the cluster id provided by the user in the HBase shell. The value of the peer znode contains the
+ * peer's cluster key provided by the user in the HBase Shell. The cluster key contains a list of
+ * zookeeper quorum peers, the client port for the zookeeper quorum, and the base znode for HBase.
+ * For example:
+ *
+ * /hbase/replication/peers/1 [Value: zk1.host.com,zk2.host.com,zk3.host.com:2181:/hbase]
+ * /hbase/replication/peers/2 [Value: zk5.host.com,zk6.host.com,zk7.host.com:2181:/hbase]
+ *
+ * Each of these peer znodes has a child znode that indicates whether or not replication is enabled
+ * on that peer cluster. These peer-state znodes do not have child znodes and simply contain a
+ * boolean value (i.e. ENABLED or DISABLED). This value is read/maintained by the
+ * ReplicationPeer.PeerStateTracker class. For example:
+ *
+ * /hbase/replication/peers/1/peer-state [Value: ENABLED]
+ */
+public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements ReplicationPeers {
+
+ // Map of peer clusters keyed by their id
+ private Map<String, ReplicationPeer> peerClusters;
+
+ private static final Log LOG = LogFactory.getLog(ReplicationPeersZKImpl.class);
+
+ public ReplicationPeersZKImpl(final ZooKeeperWatcher zk, final Configuration conf,
+ Abortable abortable) {
+ super(zk, conf, abortable);
+ this.peerClusters = new HashMap<String, ReplicationPeer>();
+ }
+
+ @Override
+ public void init() throws IOException, KeeperException {
+ ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
+ connectExistingPeers();
+ }
+
+ @Override
+ public void addPeer(String id, String clusterKey) throws IOException {
+ try {
+ if (peerExists(id)) {
+ throw new IllegalArgumentException("Cannot add existing peer");
+ }
+ ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
+ ZKUtil.createAndWatch(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id),
+ toByteArray(clusterKey));
+ // There is a race b/w PeerWatcher and ReplicationZookeeper#add method to create the
+ // peer-state znode. This happens while adding a peer.
+ // The peer state data is set as "ENABLED" by default.
+ ZKUtil.createNodeIfNotExistsAndWatch(this.zookeeper, getPeerStateNode(id),
+ ENABLED_ZNODE_BYTES);
+ // A peer is enabled by default
+ } catch (KeeperException e) {
+ throw new IOException("Unable to add peer", e);
+ }
+ }
+
+ @Override
+ public void removePeer(String id) throws IOException {
+ try {
+ if (!peerExists(id)) {
+ throw new IllegalArgumentException("Cannot remove inexisting peer");
+ }
+ ZKUtil.deleteNodeRecursively(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id));
+ } catch (KeeperException e) {
+ throw new IOException("Unable to remove a peer", e);
+ }
+ }
+
+ @Override
+ public void enablePeer(String id) throws IOException {
+ changePeerState(id, ZooKeeperProtos.ReplicationState.State.ENABLED);
+ LOG.info("peer " + id + " is enabled");
+ }
+
+ @Override
+ public void disablePeer(String id) throws IOException {
+ changePeerState(id, ZooKeeperProtos.ReplicationState.State.DISABLED);
+ LOG.info("peer " + id + " is disabled");
+ }
+
+ @Override
+ public boolean getStatusOfConnectedPeer(String id) {
+ if (!this.peerClusters.containsKey(id)) {
+ throw new IllegalArgumentException("peer " + id + " is not connected");
+ }
+ return this.peerClusters.get(id).getPeerEnabled().get();
+ }
+
+ @Override
+ public boolean connectToPeer(String peerId) throws IOException, KeeperException {
+ if (peerClusters == null) {
+ return false;
+ }
+ if (this.peerClusters.containsKey(peerId)) {
+ return false;
+ }
+ ReplicationPeer peer = getPeer(peerId);
+ if (peer == null) {
+ return false;
+ }
+ this.peerClusters.put(peerId, peer);
+ LOG.info("Added new peer cluster " + peer.getClusterKey());
+ return true;
+ }
+
+ @Override
+ public void disconnectFromPeer(String peerId) {
+ ReplicationPeer rp = this.peerClusters.get(peerId);
+ if (rp != null) {
+ rp.getZkw().close();
+ this.peerClusters.remove(peerId);
+ }
+ }
+
+ @Override
+ public Map<String, String> getAllPeerClusterKeys() {
+ Map<String, String> peers = new TreeMap<String, String>();
+ List<String> ids = null;
+ try {
+ ids = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
+ for (String id : ids) {
+ byte[] bytes = ZKUtil.getData(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id));
+ String clusterKey = null;
+ try {
+ clusterKey = parsePeerFrom(bytes);
+ } catch (DeserializationException de) {
+ LOG.warn("Failed parse of clusterid=" + id + " znode content, continuing.");
+ continue;
+ }
+ peers.put(id, clusterKey);
+ }
+ } catch (KeeperException e) {
+ this.abortable.abort("Cannot get the list of peers ", e);
+ }
+ return peers;
+ }
+
+ @Override
+ public List<ServerName> getRegionServersOfConnectedPeer(String peerId) {
+ if (this.peerClusters.size() == 0) {
+ return Collections.emptyList();
+ }
+ ReplicationPeer peer = this.peerClusters.get(peerId);
+ if (peer == null) {
+ return Collections.emptyList();
+ }
+ List<ServerName> addresses;
+ try {
+ addresses = fetchSlavesAddresses(peer.getZkw());
+ } catch (KeeperException ke) {
+ reconnectPeer(ke, peer);
+ addresses = Collections.emptyList();
+ }
+ peer.setRegionServers(addresses);
+ return peer.getRegionServers();
+ }
+
+ @Override
+ public UUID getPeerUUID(String peerId) {
+ ReplicationPeer peer = this.peerClusters.get(peerId);
+ if (peer == null) {
+ return null;
+ }
+ UUID peerUUID = null;
+ try {
+ peerUUID = ZKClusterId.getUUIDForCluster(peer.getZkw());
+ } catch (KeeperException ke) {
+ reconnectPeer(ke, peer);
+ }
+ return peerUUID;
+ }
+
+ @Override
+ public Set<String> getConnectedPeers() {
+ return this.peerClusters.keySet();
+ }
+
+ @Override
+ public Configuration getPeerConf(String peerId) throws KeeperException {
+ String znode = ZKUtil.joinZNode(this.peersZNode, peerId);
+ byte[] data = ZKUtil.getData(this.zookeeper, znode);
+ if (data == null) {
+ LOG.error("Could not get configuration for peer because it doesn't exist. peerId=" + peerId);
+ return null;
+ }
+ String otherClusterKey = "";
+ try {
+ otherClusterKey = parsePeerFrom(data);
+ } catch (DeserializationException e) {
+ LOG.warn("Failed to parse cluster key from peerId=" + peerId
+ + ", specifically the content from the following znode: " + znode);
+ return null;
+ }
+
+ Configuration otherConf = new Configuration(this.conf);
+ try {
+ ZKUtil.applyClusterKeyToConf(otherConf, otherClusterKey);
+ } catch (IOException e) {
+ LOG.error("Can't get peer configuration for peerId=" + peerId + " because:", e);
+ return null;
+ }
+ return otherConf;
+ }
+
+ /**
+ * List all registered peer clusters and set a watch on their znodes.
+ */
+ @Override
+ public List<String> getAllPeerIds() {
+ List<String> ids = null;
+ try {
+ ids = ZKUtil.listChildrenAndWatchThem(this.zookeeper, this.peersZNode);
+ } catch (KeeperException e) {
+ this.abortable.abort("Cannot get the list of peers ", e);
+ }
+ return ids;
+ }
+
+ /**
+ * A private method used during initialization. This method attempts to connect to all registered
+ * peer clusters. This method does not set a watch on the peer cluster znodes.
+ * @throws IOException
+ * @throws KeeperException
+ */
+ private void connectExistingPeers() throws IOException, KeeperException {
+ List<String> znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
+ if (znodes != null) {
+ for (String z : znodes) {
+ connectToPeer(z);
+ }
+ }
+ }
+
+ /**
+ * A private method used to re-establish a zookeeper session with a peer cluster.
+ * @param ke
+ * @param peer
+ */
+ private void reconnectPeer(KeeperException ke, ReplicationPeer peer) {
+ if (ke instanceof ConnectionLossException || ke instanceof SessionExpiredException) {
+ LOG.warn("Lost the ZooKeeper connection for peer " + peer.getClusterKey(), ke);
+ try {
+ peer.reloadZkWatcher();
+ } catch (IOException io) {
+ LOG.warn("Creation of ZookeeperWatcher failed for peer " + peer.getClusterKey(), io);
+ }
+ }
+ }
+
+ /**
+ * Get the list of all the region servers from the specified peer
+ * @param zkw zk connection to use
+ * @return list of region server addresses or an empty list if the slave is unavailable
+ */
+ private static List<ServerName> fetchSlavesAddresses(ZooKeeperWatcher zkw)
+ throws KeeperException {
+ List<String> children = ZKUtil.listChildrenNoWatch(zkw, zkw.rsZNode);
+ if (children == null) {
+ return Collections.emptyList();
+ }
+ List<ServerName> addresses = new ArrayList<ServerName>(children.size());
+ for (String child : children) {
+ addresses.add(ServerName.parseServerName(child));
+ }
+ return addresses;
+ }
+
+ private String getPeerStateNode(String id) {
+ return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.peerStateNodeName));
+ }
+
+ /**
+ * Update the state znode of a peer cluster.
+ * @param id
+ * @param state
+ * @throws IOException
+ */
+ private void changePeerState(String id, ZooKeeperProtos.ReplicationState.State state)
+ throws IOException {
+ try {
+ if (!peerExists(id)) {
+ throw new IllegalArgumentException("peer " + id + " is not registered");
+ }
+ String peerStateZNode = getPeerStateNode(id);
+ byte[] stateBytes =
+ (state == ZooKeeperProtos.ReplicationState.State.ENABLED) ? ENABLED_ZNODE_BYTES
+ : DISABLED_ZNODE_BYTES;
+ if (ZKUtil.checkExists(this.zookeeper, peerStateZNode) != -1) {
+ ZKUtil.setData(this.zookeeper, peerStateZNode, stateBytes);
+ } else {
+ ZKUtil.createAndWatch(this.zookeeper, peerStateZNode, stateBytes);
+ }
+ LOG.info("state of the peer " + id + " changed to " + state.name());
+ } catch (KeeperException e) {
+ throw new IOException("Unable to change state of the peer " + id, e);
+ }
+ }
+
+ /**
+ * Helper method to connect to a peer
+ * @param peerId peer's identifier
+ * @return object representing the peer
+ * @throws IOException
+ * @throws KeeperException
+ */
+ private ReplicationPeer getPeer(String peerId) throws IOException, KeeperException {
+ Configuration peerConf = getPeerConf(peerId);
+ if (peerConf == null) {
+ return null;
+ }
+ if (this.ourClusterKey.equals(ZKUtil.getZooKeeperClusterKey(peerConf))) {
+ LOG.debug("Not connecting to " + peerId + " because it's us");
+ return null;
+ }
+
+ ReplicationPeer peer =
+ new ReplicationPeer(peerConf, peerId, ZKUtil.getZooKeeperClusterKey(peerConf));
+ peer.startStateTracker(this.zookeeper, this.getPeerStateNode(peerId));
+ return peer;
+ }
+
+ /**
+ * @param bytes Content of a peer znode.
+ * @return ClusterKey parsed from the passed bytes.
+ * @throws DeserializationException
+ */
+ private static String parsePeerFrom(final byte[] bytes) throws DeserializationException {
+ if (ProtobufUtil.isPBMagicPrefix(bytes)) {
+ int pblen = ProtobufUtil.lengthOfPBMagic();
+ ZooKeeperProtos.ReplicationPeer.Builder builder =
+ ZooKeeperProtos.ReplicationPeer.newBuilder();
+ ZooKeeperProtos.ReplicationPeer peer;
+ try {
+ peer = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
+ } catch (InvalidProtocolBufferException e) {
+ throw new DeserializationException(e);
+ }
+ return peer.getClusterkey();
+ } else {
+ if (bytes.length > 0) {
+ return Bytes.toString(bytes);
+ }
+ return "";
+ }
+ }
+
+ /**
+ * @param clusterKey
+ * @return Serialized protobuf of <code>clusterKey</code> with pb magic prefix prepended suitable
+ * for use as content of a child of this.peersZNode; i.e. the content of the PEER_ID znode under
+ * /hbase/replication/peers/PEER_ID
+ */
+ private static byte[] toByteArray(final String clusterKey) {
+ byte[] bytes =
+ ZooKeeperProtos.ReplicationPeer.newBuilder().setClusterkey(clusterKey).build()
+ .toByteArray();
+ return ProtobufUtil.prependPBMagic(bytes);
+ }
+}
\ No newline at end of file
Modified: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java?rev=1484030&r1=1484029&r2=1484030&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java (original)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java Fri May 17 23:57:28 2013
@@ -22,12 +22,14 @@ import java.util.List;
import java.util.SortedMap;
import java.util.SortedSet;
+import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
/**
* This provides an interface for maintaining a region server's replication queues. These queues
* keep track of the HLogs that still need to be replicated to remote clusters.
*/
+@InterfaceAudience.Private
public interface ReplicationQueues {
/**
@@ -35,7 +37,7 @@ public interface ReplicationQueues {
* @param serverName The server name of the region server that owns the replication queues this
* interface manages.
*/
- public void init(String serverName);
+ public void init(String serverName) throws KeeperException;
/**
* Remove a replication queue.
Modified: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java?rev=1484030&r1=1484029&r2=1484030&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java (original)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java Fri May 17 23:57:28 2013
@@ -39,8 +39,27 @@ import org.apache.hadoop.hbase.zookeeper
import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
import org.apache.zookeeper.KeeperException;
-import com.google.protobuf.InvalidProtocolBufferException;
-
+/**
+ * This class provides an implementation of the ReplicationQueues interface using Zookeeper. The
+ * base znode that this class works at is the myQueuesZnode. The myQueuesZnode contains a list of
+ * all outstanding HLog files on this region server that need to be replicated. The myQueuesZnode is
+ * the regionserver name (a concatenation of the region server's hostname, client port and start
+ * code). For example:
+ *
+ * /hbase/replication/rs/hostname.example.org,6020,1234
+ *
+ * Within this znode, the region server maintains a set of HLog replication queues. These queues are
+ * represented by child znodes named using their given queue id. For example:
+ *
+ * /hbase/replication/rs/hostname.example.org,6020,1234/1
+ * /hbase/replication/rs/hostname.example.org,6020,1234/2
+ *
+ * Each queue has one child znode for every HLog that still needs to be replicated. The value of
+ * these HLog child znodes is the latest position that has been replicated. This position is updated
+ * every time a HLog entry is replicated. For example:
+ *
+ * /hbase/replication/rs/hostname.example.org,6020,1234/1/23522342.23422 [VALUE: 254]
+ */
public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements ReplicationQueues {
/** Znode containing all replication queues for this region server. */
@@ -50,14 +69,15 @@ public class ReplicationQueuesZKImpl ext
private static final Log LOG = LogFactory.getLog(ReplicationQueuesZKImpl.class);
- public ReplicationQueuesZKImpl(final ZooKeeperWatcher zk, Configuration conf, Abortable abortable)
- throws KeeperException {
+ public ReplicationQueuesZKImpl(final ZooKeeperWatcher zk, Configuration conf,
+ Abortable abortable) {
super(zk, conf, abortable);
}
@Override
- public void init(String serverName) {
+ public void init(String serverName) throws KeeperException {
this.myQueuesZnode = ZKUtil.joinZNode(this.queuesZNode, serverName);
+ ZKUtil.createWithParents(this.zookeeper, this.myQueuesZnode);
}
@Override
@@ -94,7 +114,7 @@ public class ReplicationQueuesZKImpl ext
String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
znode = ZKUtil.joinZNode(znode, filename);
// Why serialize String of Long and not Long as bytes?
- ZKUtil.setData(this.zookeeper, znode, toByteArray(position));
+ ZKUtil.setData(this.zookeeper, znode, ZKUtil.positionToByteArray(position));
} catch (KeeperException e) {
this.abortable.abort("Failed to write replication hlog position (filename=" + filename
+ ", position=" + position + ")", e);
@@ -107,7 +127,7 @@ public class ReplicationQueuesZKImpl ext
String znode = ZKUtil.joinZNode(clusterZnode, filename);
byte[] bytes = ZKUtil.getData(this.zookeeper, znode);
try {
- return parseHLogPositionFrom(bytes);
+ return ZKUtil.parseHLogPositionFrom(bytes);
} catch (DeserializationException de) {
LOG.warn("Failed to parse HLogPosition for queueId=" + queueId + " and hlog=" + filename
+ "znode content, continuing.");
@@ -351,7 +371,7 @@ public class ReplicationQueuesZKImpl ext
byte[] positionBytes = ZKUtil.getData(this.zookeeper, z);
long position = 0;
try {
- position = parseHLogPositionFrom(positionBytes);
+ position = ZKUtil.parseHLogPositionFrom(positionBytes);
} catch (DeserializationException e) {
LOG.warn("Failed parse of hlog position from the following znode: " + z
+ ", Exception: " + e);
@@ -380,44 +400,4 @@ public class ReplicationQueuesZKImpl ext
ZooKeeperProtos.ReplicationLock.newBuilder().setLockOwner(lockOwner).build().toByteArray();
return ProtobufUtil.prependPBMagic(bytes);
}
-
- /**
- * @param position
- * @return Serialized protobuf of <code>position</code> with pb magic prefix prepended suitable
- * for use as content of an hlog position in a replication queue.
- */
- static byte[] toByteArray(final long position) {
- byte[] bytes =
- ZooKeeperProtos.ReplicationHLogPosition.newBuilder().setPosition(position).build()
- .toByteArray();
- return ProtobufUtil.prependPBMagic(bytes);
- }
-
- /**
- * @param bytes - Content of a HLog position znode.
- * @return long - The current HLog position.
- * @throws DeserializationException
- */
- private long parseHLogPositionFrom(final byte[] bytes) throws DeserializationException {
- if(bytes == null) {
- throw new DeserializationException("Unable to parse null HLog position.");
- }
- if (ProtobufUtil.isPBMagicPrefix(bytes)) {
- int pblen = ProtobufUtil.lengthOfPBMagic();
- ZooKeeperProtos.ReplicationHLogPosition.Builder builder =
- ZooKeeperProtos.ReplicationHLogPosition.newBuilder();
- ZooKeeperProtos.ReplicationHLogPosition position;
- try {
- position = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
- } catch (InvalidProtocolBufferException e) {
- throw new DeserializationException(e);
- }
- return position.getPosition();
- } else {
- if (bytes.length > 0) {
- return Bytes.toLong(bytes);
- }
- return 0;
- }
- }
}
\ No newline at end of file
Modified: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateImpl.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateImpl.java?rev=1484030&r1=1484029&r2=1484030&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateImpl.java (original)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateImpl.java Fri May 17 23:57:28 2013
@@ -54,8 +54,6 @@ public class ReplicationStateImpl extend
// Set a tracker on replicationStateNode
this.stateTracker =
new ReplicationStateTracker(this.zookeeper, this.stateZNode, this.abortable);
- stateTracker.start();
- readReplicationStateZnode();
}
public ReplicationStateImpl(final ZooKeeperWatcher zk, final Configuration conf,
@@ -64,6 +62,13 @@ public class ReplicationStateImpl extend
}
@Override
+ public void init() throws KeeperException {
+ ZKUtil.createWithParents(this.zookeeper, this.stateZNode);
+ stateTracker.start();
+ readReplicationStateZnode();
+ }
+
+ @Override
public boolean getState() throws KeeperException {
return getReplication();
}
@@ -115,8 +120,7 @@ public class ReplicationStateImpl extend
*/
private void setReplicating(boolean newState) throws KeeperException {
ZKUtil.createWithParents(this.zookeeper, this.stateZNode);
- byte[] stateBytes = (newState == true) ? ReplicationZookeeper.ENABLED_ZNODE_BYTES
- : ReplicationZookeeper.DISABLED_ZNODE_BYTES;
+ byte[] stateBytes = (newState == true) ? ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES;
ZKUtil.setData(this.zookeeper, this.stateZNode, stateBytes);
}
Modified: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateInterface.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateInterface.java?rev=1484030&r1=1484029&r2=1484030&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateInterface.java (original)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateInterface.java Fri May 17 23:57:28 2013
@@ -18,6 +18,7 @@
*/
package org.apache.hadoop.hbase.replication;
+import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import java.io.Closeable;
@@ -27,19 +28,23 @@ import java.io.Closeable;
* cluster. This state is used to indicate whether replication is enabled or
* disabled on a cluster.
*/
+@InterfaceAudience.Private
public interface ReplicationStateInterface extends Closeable {
-
+
+ /**
+ * Initialize the replication state interface.
+ */
+ public void init() throws KeeperException;
+
/**
* Get the current state of replication (i.e. ENABLED or DISABLED).
- *
* @return true if replication is enabled, false otherwise
* @throws KeeperException
*/
public boolean getState() throws KeeperException;
-
+
/**
* Set the state of replication.
- *
* @param newState
* @throws KeeperException
*/
Modified: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java?rev=1484030&r1=1484029&r2=1484030&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java (original)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java Fri May 17 23:57:28 2013
@@ -22,6 +22,8 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
+import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
@@ -51,7 +53,13 @@ public abstract class ReplicationStateZK
protected final Configuration conf;
protected final Abortable abortable;
- public ReplicationStateZKBase(ZooKeeperWatcher zookeeper, Configuration conf,
+ // Public for testing
+ public static final byte[] ENABLED_ZNODE_BYTES =
+ toByteArray(ZooKeeperProtos.ReplicationState.State.ENABLED);
+ protected static final byte[] DISABLED_ZNODE_BYTES =
+ toByteArray(ZooKeeperProtos.ReplicationState.State.DISABLED);
+
+ public ReplicationStateZKBase(ZooKeeperWatcher zookeeper, Configuration conf,
Abortable abortable) {
this.zookeeper = zookeeper;
this.conf = conf;
@@ -79,8 +87,20 @@ public abstract class ReplicationStateZK
return result;
}
+ /**
+ * @param state
+ * @return Serialized protobuf of <code>state</code> with pb magic prefix prepended suitable for
+ * use as content of either the cluster state znode -- whether or not we should be
+ * replicating kept in /hbase/replication/state -- or as content of a peer-state znode
+ * under a peer cluster id as in /hbase/replication/peers/PEER_ID/peer-state.
+ */
+ protected static byte[] toByteArray(final ZooKeeperProtos.ReplicationState.State state) {
+ byte[] bytes =
+ ZooKeeperProtos.ReplicationState.newBuilder().setState(state).build().toByteArray();
+ return ProtobufUtil.prependPBMagic(bytes);
+ }
+
public boolean peerExists(String id) throws KeeperException {
- return ZKUtil.checkExists(this.zookeeper,
- ZKUtil.joinZNode(this.peersZNode, id)) >= 0;
+ return ZKUtil.checkExists(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id)) >= 0;
}
}
\ No newline at end of file
Modified: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java?rev=1484030&r1=1484029&r2=1484030&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java (original)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationZookeeper.java Fri May 17 23:57:28 2013
@@ -18,7 +18,6 @@
*/
package org.apache.hadoop.hbase.replication;
-import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@@ -26,29 +25,18 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperListener;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.ConnectionLossException;
-import org.apache.zookeeper.KeeperException.NodeExistsException;
-import org.apache.zookeeper.KeeperException.SessionExpiredException;
import java.io.Closeable;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.SortedMap;
import java.util.SortedSet;
-import java.util.TreeMap;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -83,63 +71,37 @@ import java.util.concurrent.atomic.Atomi
*/
@InterfaceAudience.Private
public class ReplicationZookeeper extends ReplicationStateZKBase implements Closeable {
- private static final Log LOG =
- LogFactory.getLog(ReplicationZookeeper.class);
+ private static final Log LOG = LogFactory.getLog(ReplicationZookeeper.class);
// Our handle on zookeeper
private final ZooKeeperWatcher zookeeper;
- // Map of peer clusters keyed by their id
- private Map<String, ReplicationPeer> peerClusters;
- // Path to the root replication znode
- private String replicationZNode;
- // Path to the peer clusters znode
private String peersZNode;
- // Path to the znode that contains all RS that replicates
- private String rsZNode;
- // Path to this region server's name under rsZNode
- private String rsServerNameZnode;
- // Name node if the replicationState znode
- private String replicationStateNodeName;
- // Name of zk node which stores peer state. The peer-state znode is under a
- // peers' id node; e.g. /hbase/replication/peers/PEER_ID/peer-state
- private String peerStateNodeName;
private final Configuration conf;
- // The key to our own cluster
- private String ourClusterKey;
// Abortable
private Abortable abortable;
private final ReplicationStateInterface replicationState;
+ private final ReplicationPeers replicationPeers;
private final ReplicationQueues replicationQueues;
/**
- * ZNode content if enabled state.
- */
- // Public so it can be seen by test code.
- public static final byte[] ENABLED_ZNODE_BYTES =
- toByteArray(ZooKeeperProtos.ReplicationState.State.ENABLED);
-
- /**
- * ZNode content if disabled state.
- */
- static final byte[] DISABLED_ZNODE_BYTES =
- toByteArray(ZooKeeperProtos.ReplicationState.State.DISABLED);
-
- /**
* Constructor used by clients of replication (like master and HBase clients)
* @param conf conf to use
* @param zk zk connection to use
* @throws IOException
*/
public ReplicationZookeeper(final Abortable abortable, final Configuration conf,
- final ZooKeeperWatcher zk) throws KeeperException {
+ final ZooKeeperWatcher zk) throws KeeperException, IOException {
super(zk, conf, abortable);
this.conf = conf;
this.zookeeper = zk;
setZNodes(abortable);
this.replicationState = new ReplicationStateImpl(this.zookeeper, conf, abortable);
+ this.replicationState.init();
// TODO This interface is no longer used by anyone using this constructor. When this class goes
// away, we will no longer have this null initialization business
this.replicationQueues = null;
+ this.replicationPeers = new ReplicationPeersZKImpl(this.zookeeper, this.conf, abortable);
+ this.replicationPeers.init();
}
/**
@@ -157,39 +119,19 @@ public class ReplicationZookeeper extend
this.zookeeper = server.getZooKeeper();
this.conf = server.getConfiguration();
setZNodes(server);
-
this.replicationState = new ReplicationStateImpl(this.zookeeper, conf, server, replicating);
- this.peerClusters = new HashMap<String, ReplicationPeer>();
- ZKUtil.createWithParents(this.zookeeper,
- ZKUtil.joinZNode(this.replicationZNode, this.replicationStateNodeName));
- this.rsServerNameZnode = ZKUtil.joinZNode(rsZNode, server.getServerName().toString());
- ZKUtil.createWithParents(this.zookeeper, this.rsServerNameZnode);
+ this.replicationState.init();
this.replicationQueues = new ReplicationQueuesZKImpl(this.zookeeper, this.conf, server);
this.replicationQueues.init(server.getServerName().toString());
- connectExistingPeers();
+ this.replicationPeers = new ReplicationPeersZKImpl(this.zookeeper, this.conf, server);
+ this.replicationPeers.init();
}
private void setZNodes(Abortable abortable) throws KeeperException {
String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
String peersZNodeName = conf.get("zookeeper.znode.replication.peers", "peers");
- this.peerStateNodeName = conf.get("zookeeper.znode.replication.peers.state", "peer-state");
- this.replicationStateNodeName = conf.get("zookeeper.znode.replication.state", "state");
- String rsZNodeName = conf.get("zookeeper.znode.replication.rs", "rs");
- this.ourClusterKey = ZKUtil.getZooKeeperClusterKey(this.conf);
- this.replicationZNode = ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName);
+ String replicationZNode = ZKUtil.joinZNode(this.zookeeper.baseZNode, replicationZNodeName);
this.peersZNode = ZKUtil.joinZNode(replicationZNode, peersZNodeName);
- ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
- this.rsZNode = ZKUtil.joinZNode(replicationZNode, rsZNodeName);
- ZKUtil.createWithParents(this.zookeeper, this.rsZNode);
- }
-
- private void connectExistingPeers() throws IOException, KeeperException {
- List<String> znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
- if (znodes != null) {
- for (String z : znodes) {
- connectToPeer(z);
- }
- }
}
/**
@@ -197,39 +139,15 @@ public class ReplicationZookeeper extend
* @return list of all peers' identifiers
*/
public List<String> listPeersIdsAndWatch() {
- List<String> ids = null;
- try {
- ids = ZKUtil.listChildrenAndWatchThem(this.zookeeper, this.peersZNode);
- } catch (KeeperException e) {
- this.abortable.abort("Cannot get the list of peers ", e);
- }
- return ids;
+ return this.replicationPeers.getAllPeerIds();
}
/**
* Map of this cluster's peers for display.
* @return A map of peer ids to peer cluster keys
*/
- public Map<String,String> listPeers() {
- Map<String,String> peers = new TreeMap<String,String>();
- List<String> ids = null;
- try {
- ids = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
- for (String id : ids) {
- byte[] bytes = ZKUtil.getData(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id));
- String clusterKey = null;
- try {
- clusterKey = parsePeerFrom(bytes);
- } catch (DeserializationException de) {
- LOG.warn("Failed parse of clusterid=" + id + " znode content, continuing.");
- continue;
- }
- peers.put(id, clusterKey);
- }
- } catch (KeeperException e) {
- this.abortable.abort("Cannot get the list of peers ", e);
- }
- return peers;
+ public Map<String, String> listPeers() {
+ return this.replicationPeers.getAllPeerClusterKeys();
}
/**
@@ -239,126 +157,17 @@ public class ReplicationZookeeper extend
* @return addresses of all region servers
*/
public List<ServerName> getSlavesAddresses(String peerClusterId) {
- if (this.peerClusters.size() == 0) {
- return Collections.emptyList();
- }
- ReplicationPeer peer = this.peerClusters.get(peerClusterId);
- if (peer == null) {
- return Collections.emptyList();
- }
-
- List<ServerName> addresses;
- try {
- addresses = fetchSlavesAddresses(peer.getZkw());
- } catch (KeeperException ke) {
- reconnectPeer(ke, peer);
- addresses = Collections.emptyList();
- }
- peer.setRegionServers(addresses);
- return peer.getRegionServers();
- }
-
- /**
- * Get the list of all the region servers from the specified peer
- * @param zkw zk connection to use
- * @return list of region server addresses or an empty list if the slave
- * is unavailable
- */
- private List<ServerName> fetchSlavesAddresses(ZooKeeperWatcher zkw)
- throws KeeperException {
- return listChildrenAndGetAsServerNames(zkw, zkw.rsZNode);
- }
-
- /**
- * Lists the children of the specified znode, retrieving the data of each
- * child as a server address.
- *
- * Used to list the currently online regionservers and their addresses.
- *
- * Sets no watches at all, this method is best effort.
- *
- * Returns an empty list if the node has no children. Returns null if the
- * parent node itself does not exist.
- *
- * @param zkw zookeeper reference
- * @param znode node to get children of as addresses
- * @return list of data of children of specified znode, empty if no children,
- * null if parent does not exist
- * @throws KeeperException if unexpected zookeeper exception
- */
- public static List<ServerName> listChildrenAndGetAsServerNames(
- ZooKeeperWatcher zkw, String znode)
- throws KeeperException {
- List<String> children = ZKUtil.listChildrenNoWatch(zkw, znode);
- if(children == null) {
- return Collections.emptyList();
- }
- List<ServerName> addresses = new ArrayList<ServerName>(children.size());
- for (String child : children) {
- addresses.add(ServerName.parseServerName(child));
- }
- return addresses;
+ return this.replicationPeers.getRegionServersOfConnectedPeer(peerClusterId);
}
/**
* This method connects this cluster to another one and registers it
* in this region server's replication znode
* @param peerId id of the peer cluster
- * @throws KeeperException
- */
- public boolean connectToPeer(String peerId)
- throws IOException, KeeperException {
- if (peerClusters == null) {
- return false;
- }
- if (this.peerClusters.containsKey(peerId)) {
- return false;
- }
- ReplicationPeer peer = getPeer(peerId);
- if (peer == null) {
- return false;
- }
- this.peerClusters.put(peerId, peer);
- ZKUtil.createWithParents(this.zookeeper, ZKUtil.joinZNode(
- this.rsServerNameZnode, peerId));
- LOG.info("Added new peer cluster " + peer.getClusterKey());
- return true;
- }
-
- /**
- * Helper method to connect to a peer
- * @param peerId peer's identifier
- * @return object representing the peer
- * @throws IOException
* @throws KeeperException
*/
- public ReplicationPeer getPeer(String peerId) throws IOException, KeeperException{
- String znode = ZKUtil.joinZNode(this.peersZNode, peerId);
- byte [] data = ZKUtil.getData(this.zookeeper, znode);
- String otherClusterKey = "";
- try {
- otherClusterKey = parsePeerFrom(data);
- } catch (DeserializationException e) {
- LOG.warn("Failed parse of cluster key from peerId=" + peerId
- + ", specifically the content from the following znode: " + znode);
- }
- if (this.ourClusterKey.equals(otherClusterKey)) {
- LOG.debug("Not connecting to " + peerId + " because it's us");
- return null;
- }
- // Construct the connection to the new peer
- Configuration otherConf = new Configuration(this.conf);
- try {
- ZKUtil.applyClusterKeyToConf(otherConf, otherClusterKey);
- } catch (IOException e) {
- LOG.error("Can't get peer because:", e);
- return null;
- }
-
- ReplicationPeer peer = new ReplicationPeer(otherConf, peerId,
- otherClusterKey);
- peer.startStateTracker(this.zookeeper, this.getPeerStateNode(peerId));
- return peer;
+ public boolean connectToPeer(String peerId) throws IOException, KeeperException {
+ return this.replicationPeers.connectToPeer(peerId);
}
/**
@@ -368,15 +177,7 @@ public class ReplicationZookeeper extend
* @throws IllegalArgumentException Thrown when the peer doesn't exist
*/
public void removePeer(String id) throws IOException {
- try {
- if (!peerExists(id)) {
- throw new IllegalArgumentException("Cannot remove inexisting peer");
- }
- ZKUtil.deleteNodeRecursively(this.zookeeper,
- ZKUtil.joinZNode(this.peersZNode, id));
- } catch (KeeperException e) {
- throw new IOException("Unable to remove a peer", e);
- }
+ this.replicationPeers.removePeer(id);
}
/**
@@ -388,154 +189,7 @@ public class ReplicationZookeeper extend
* multi-slave isn't supported yet.
*/
public void addPeer(String id, String clusterKey) throws IOException {
- try {
- if (peerExists(id)) {
- throw new IllegalArgumentException("Cannot add existing peer");
- }
- ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
- ZKUtil.createAndWatch(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id),
- toByteArray(clusterKey));
- // There is a race b/w PeerWatcher and ReplicationZookeeper#add method to create the
- // peer-state znode. This happens while adding a peer.
- // The peer state data is set as "ENABLED" by default.
- ZKUtil.createNodeIfNotExistsAndWatch(this.zookeeper, getPeerStateNode(id),
- ENABLED_ZNODE_BYTES);
- // A peer is enabled by default
- } catch (KeeperException e) {
- throw new IOException("Unable to add peer", e);
- }
- }
-
- /**
- * @param clusterKey
- * @return Serialized protobuf of <code>clusterKey</code> with pb magic prefix
- * prepended suitable for use as content of a this.peersZNode; i.e.
- * the content of PEER_ID znode under /hbase/replication/peers/PEER_ID
- */
- static byte[] toByteArray(final String clusterKey) {
- byte[] bytes = ZooKeeperProtos.ReplicationPeer.newBuilder().setClusterkey(clusterKey).build()
- .toByteArray();
- return ProtobufUtil.prependPBMagic(bytes);
- }
-
- /**
- * @param state
- * @return Serialized protobuf of <code>state</code> with pb magic prefix
- * prepended suitable for use as content of either the cluster state
- * znode -- whether or not we should be replicating kept in
- * /hbase/replication/state -- or as content of a peer-state znode
- * under a peer cluster id as in
- * /hbase/replication/peers/PEER_ID/peer-state.
- */
- static byte[] toByteArray(final ZooKeeperProtos.ReplicationState.State state) {
- byte[] bytes = ZooKeeperProtos.ReplicationState.newBuilder().setState(state).build()
- .toByteArray();
- return ProtobufUtil.prependPBMagic(bytes);
- }
-
- /**
- * @param position
- * @return Serialized protobuf of <code>position</code> with pb magic prefix
- * prepended suitable for use as content of an hlog position in a
- * replication queue.
- */
- public static byte[] positionToByteArray(
- final long position) {
- return ZKUtil.positionToByteArray(position);
- }
-
- /**
- * @param lockOwner
- * @return Serialized protobuf of <code>lockOwner</code> with pb magic prefix
- * prepended suitable for use as content of an replication lock during
- * region server fail over.
- */
- static byte[] lockToByteArray(
- final String lockOwner) {
- byte[] bytes = ZooKeeperProtos.ReplicationLock.newBuilder().setLockOwner(lockOwner).build()
- .toByteArray();
- return ProtobufUtil.prependPBMagic(bytes);
- }
-
- /**
- * @param bytes Content of a peer znode.
- * @return ClusterKey parsed from the passed bytes.
- * @throws DeserializationException
- */
- static String parsePeerFrom(final byte[] bytes) throws DeserializationException {
- if (ProtobufUtil.isPBMagicPrefix(bytes)) {
- int pblen = ProtobufUtil.lengthOfPBMagic();
- ZooKeeperProtos.ReplicationPeer.Builder builder = ZooKeeperProtos.ReplicationPeer
- .newBuilder();
- ZooKeeperProtos.ReplicationPeer peer;
- try {
- peer = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
- } catch (InvalidProtocolBufferException e) {
- throw new DeserializationException(e);
- }
- return peer.getClusterkey();
- } else {
- if (bytes.length > 0) {
- return Bytes.toString(bytes);
- }
- return "";
- }
- }
-
- /**
- * @param bytes Content of a state znode.
- * @return State parsed from the passed bytes.
- * @throws DeserializationException
- */
- static ZooKeeperProtos.ReplicationState.State parseStateFrom(final byte[] bytes)
- throws DeserializationException {
- ProtobufUtil.expectPBMagicPrefix(bytes);
- int pblen = ProtobufUtil.lengthOfPBMagic();
- ZooKeeperProtos.ReplicationState.Builder builder = ZooKeeperProtos.ReplicationState
- .newBuilder();
- ZooKeeperProtos.ReplicationState state;
- try {
- state = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
- return state.getState();
- } catch (InvalidProtocolBufferException e) {
- throw new DeserializationException(e);
- }
- }
-
- /**
- * @param bytes - Content of a HLog position znode.
- * @return long - The current HLog position.
- * @throws DeserializationException
- */
- public static long parseHLogPositionFrom(
- final byte[] bytes) throws DeserializationException {
- return ZKUtil.parseHLogPositionFrom(bytes);
- }
-
- /**
- * @param bytes - Content of a lock znode.
- * @return String - The owner of the lock.
- * @throws DeserializationException
- */
- static String parseLockOwnerFrom(
- final byte[] bytes) throws DeserializationException {
- if (ProtobufUtil.isPBMagicPrefix(bytes)) {
- int pblen = ProtobufUtil.lengthOfPBMagic();
- ZooKeeperProtos.ReplicationLock.Builder builder = ZooKeeperProtos.ReplicationLock
- .newBuilder();
- ZooKeeperProtos.ReplicationLock lock;
- try {
- lock = builder.mergeFrom(bytes, pblen, bytes.length - pblen).build();
- } catch (InvalidProtocolBufferException e) {
- throw new DeserializationException(e);
- }
- return lock.getLockOwner();
- } else {
- if (bytes.length > 0) {
- return Bytes.toString(bytes);
- }
- return "";
- }
+ this.replicationPeers.addPeer(id, clusterKey);
}
/**
@@ -546,8 +200,7 @@ public class ReplicationZookeeper extend
* Thrown when the peer doesn't exist
*/
public void enablePeer(String id) throws IOException {
- changePeerState(id, ZooKeeperProtos.ReplicationState.State.ENABLED);
- LOG.info("peer " + id + " is enabled");
+ this.replicationPeers.enablePeer(id);
}
/**
@@ -558,28 +211,7 @@ public class ReplicationZookeeper extend
* Thrown when the peer doesn't exist
*/
public void disablePeer(String id) throws IOException {
- changePeerState(id, ZooKeeperProtos.ReplicationState.State.DISABLED);
- LOG.info("peer " + id + " is disabled");
- }
-
- private void changePeerState(String id, ZooKeeperProtos.ReplicationState.State state)
- throws IOException {
- try {
- if (!peerExists(id)) {
- throw new IllegalArgumentException("peer " + id + " is not registered");
- }
- String peerStateZNode = getPeerStateNode(id);
- byte[] stateBytes = (state == ZooKeeperProtos.ReplicationState.State.ENABLED) ? ENABLED_ZNODE_BYTES
- : DISABLED_ZNODE_BYTES;
- if (ZKUtil.checkExists(this.zookeeper, peerStateZNode) != -1) {
- ZKUtil.setData(this.zookeeper, peerStateZNode, stateBytes);
- } else {
- ZKUtil.createAndWatch(zookeeper, peerStateZNode, stateBytes);
- }
- LOG.info("state of the peer " + id + " changed to " + state.name());
- } catch (KeeperException e) {
- throw new IOException("Unable to change state of the peer " + id, e);
- }
+ this.replicationPeers.disablePeer(id);
}
/**
@@ -592,14 +224,7 @@ public class ReplicationZookeeper extend
* Thrown when the peer doesn't exist
*/
public boolean getPeerEnabled(String id) {
- if (!this.peerClusters.containsKey(id)) {
- throw new IllegalArgumentException("peer " + id + " is not registered");
- }
- return this.peerClusters.get(id).getPeerEnabled().get();
- }
-
- private String getPeerStateNode(String id) {
- return ZKUtil.joinZNode(this.peersZNode, ZKUtil.joinZNode(id, this.peerStateNodeName));
+ return this.replicationPeers.getStatusOfConnectedPeer(id);
}
/**
@@ -683,8 +308,7 @@ public class ReplicationZookeeper extend
public void deleteSource(String peerZnode, boolean closeConnection) {
this.replicationQueues.removeQueue(peerZnode);
if (closeConnection) {
- this.peerClusters.get(peerZnode).getZkw().close();
- this.peerClusters.remove(peerZnode);
+ this.replicationPeers.disconnectFromPeer(peerZnode);
}
}
@@ -714,40 +338,7 @@ public class ReplicationZookeeper extend
* @return a UUID or null if there's a ZK connection issue
*/
public UUID getPeerUUID(String peerId) {
- ReplicationPeer peer = getPeerClusters().get(peerId);
- UUID peerUUID = null;
- try {
- peerUUID = getUUIDForCluster(peer.getZkw());
- } catch (KeeperException ke) {
- reconnectPeer(ke, peer);
- }
- return peerUUID;
- }
-
- /**
- * Get the UUID for the provided ZK watcher. Doesn't handle any ZK exceptions
- * @param zkw watcher connected to an ensemble
- * @return the UUID read from zookeeper
- * @throws KeeperException
- */
- public UUID getUUIDForCluster(ZooKeeperWatcher zkw) throws KeeperException {
- return UUID.fromString(ZKClusterId.readClusterIdZNode(zkw));
- }
-
- private void reconnectPeer(KeeperException ke, ReplicationPeer peer) {
- if (ke instanceof ConnectionLossException
- || ke instanceof SessionExpiredException) {
- LOG.warn(
- "Lost the ZooKeeper connection for peer " + peer.getClusterKey(),
- ke);
- try {
- peer.reloadZkWatcher();
- } catch(IOException io) {
- LOG.warn(
- "Creation of ZookeeperWatcher failed for peer "
- + peer.getClusterKey(), io);
- }
- }
+ return this.replicationPeers.getPeerUUID(peerId);
}
public void registerRegionServerListener(ZooKeeperListener listener) {
@@ -758,8 +349,8 @@ public class ReplicationZookeeper extend
* Get a map of all peer clusters
* @return map of peer cluster keyed by id
*/
- public Map<String, ReplicationPeer> getPeerClusters() {
- return this.peerClusters;
+ public Set<String> getPeerClusters() {
+ return this.replicationPeers.getConnectedPeers();
}
/**
@@ -802,36 +393,4 @@ public class ReplicationZookeeper extend
public void close() throws IOException {
if (replicationState != null) replicationState.close();
}
-
- /**
- * Utility method to ensure an ENABLED znode is in place; if not present, we
- * create it.
- * @param zookeeper
- * @param path Path to znode to check
- * @return True if we created the znode.
- * @throws NodeExistsException
- * @throws KeeperException
- */
- static boolean ensurePeerEnabled(final ZooKeeperWatcher zookeeper, final String path)
- throws NodeExistsException, KeeperException {
- if (ZKUtil.checkExists(zookeeper, path) == -1) {
- // There is a race b/w PeerWatcher and ReplicationZookeeper#add method to create the
- // peer-state znode. This happens while adding a peer.
- // The peer state data is set as "ENABLED" by default.
- ZKUtil.createNodeIfNotExistsAndWatch(zookeeper, path, ENABLED_ZNODE_BYTES);
- return true;
- }
- return false;
- }
-
- /**
- * @param bytes
- * @return True if the passed in <code>bytes</code> are those of a pb
- * serialized ENABLED state.
- * @throws DeserializationException
- */
- static boolean isStateEnabled(final byte[] bytes) throws DeserializationException {
- ZooKeeperProtos.ReplicationState.State state = parseStateFrom(bytes);
- return ZooKeeperProtos.ReplicationState.State.ENABLED == state;
- }
}
Modified: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKClusterId.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKClusterId.java?rev=1484030&r1=1484029&r2=1484030&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKClusterId.java (original)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKClusterId.java Fri May 17 23:57:28 2013
@@ -19,6 +19,8 @@
package org.apache.hadoop.hbase.zookeeper;
+import java.util.UUID;
+
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ClusterId;
@@ -77,4 +79,14 @@ public class ZKClusterId {
throws KeeperException {
ZKUtil.createSetData(watcher, watcher.clusterIdZNode, id.toByteArray());
}
+
+ /**
+  * Reads the cluster id znode through the given watcher and converts it to a {@link UUID}.
+  * ZooKeeper failures are not handled here; they propagate to the caller.
+  * @param zkw watcher connected to the ensemble whose cluster id is wanted
+  * @return the cluster id parsed as a UUID
+  * @throws KeeperException if reading the cluster id znode fails
+  */
+ public static UUID getUUIDForCluster(ZooKeeperWatcher zkw) throws KeeperException {
+   // NOTE(review): presumably readClusterIdZNode can return null when the znode is absent,
+   // in which case UUID.fromString would throw -- confirm against callers.
+   final String clusterId = readClusterIdZNode(zkw);
+   return UUID.fromString(clusterId);
+ }
}
Modified: hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java?rev=1484030&r1=1484029&r2=1484030&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java (original)
+++ hbase/branches/0.95/hbase-client/src/main/java/org/apache/hadoop/hbase/zookeeper/ZKUtil.java Fri May 17 23:57:28 2013
@@ -1836,9 +1836,12 @@ public class ZKUtil {
* @throws DeserializationException
*/
public static long parseHLogPositionFrom(final byte[] bytes) throws DeserializationException {
+ if (bytes == null) {
+ throw new DeserializationException("Unable to parse null HLog position.");
+ }
if (ProtobufUtil.isPBMagicPrefix(bytes)) {
int pblen = ProtobufUtil.lengthOfPBMagic();
- ZooKeeperProtos.ReplicationHLogPosition.Builder builder =
+ ZooKeeperProtos.ReplicationHLogPosition.Builder builder =
ZooKeeperProtos.ReplicationHLogPosition.newBuilder();
ZooKeeperProtos.ReplicationHLogPosition position;
try {
Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java?rev=1484030&r1=1484029&r2=1484030&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/Import.java Fri May 17 23:57:28 2013
@@ -45,8 +45,10 @@ import org.apache.hadoop.hbase.client.Re
import org.apache.hadoop.hbase.exceptions.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.replication.ReplicationPeersZKImpl;
import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -176,17 +178,10 @@ public class Import {
cfRenameMap = createCfRenameMap(conf);
filter = instantiateFilter(conf);
// TODO: This is kind of ugly doing setup of ZKW just to read the clusterid.
- ReplicationZookeeper zkHelper = null;
ZooKeeperWatcher zkw = null;
try {
- HConnection connection = HConnectionManager.getConnection(conf);
zkw = new ZooKeeperWatcher(conf, context.getTaskAttemptID().toString(), null);
- zkHelper = new ReplicationZookeeper(connection, conf, zkw);
- try {
- this.clusterId = zkHelper.getUUIDForCluster(zkw);
- } finally {
- if (zkHelper != null) zkHelper.close();
- }
+ clusterId = ZKClusterId.getUUIDForCluster(zkw);
} catch (ZooKeeperConnectionException e) {
LOG.error("Problem connecting to ZooKeper during task setup", e);
} catch (KeeperException e) {
Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java?rev=1484030&r1=1484029&r2=1484030&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/mapreduce/replication/VerifyReplication.java Fri May 17 23:57:28 2013
@@ -37,6 +37,8 @@ import org.apache.hadoop.hbase.mapreduce
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.replication.ReplicationPeer;
+import org.apache.hadoop.hbase.replication.ReplicationPeers;
+import org.apache.hadoop.hbase.replication.ReplicationPeersZKImpl;
import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -117,11 +119,13 @@ public class VerifyReplication {
@Override public void abort(String why, Throwable e) {}
@Override public boolean isAborted() {return false;}
});
- zk = new ReplicationZookeeper(conn, conf, localZKW);
- // Just verifying it we can connect
- peer = zk.getPeer(peerId);
- HTable replicatedTable = new HTable(peer.getConfiguration(),
- conf.get(NAME+".tableName"));
+ ReplicationPeers rp = new ReplicationPeersZKImpl(localZKW, conf, localZKW);
+ rp.init();
+ Configuration peerConf = rp.getPeerConf(peerId);
+ if (peerConf == null) {
+ throw new IOException("Couldn't get peer conf!");
+ }
+ HTable replicatedTable = new HTable(peerConf, conf.get(NAME + ".tableName"));
scan.setStartRow(value.getRow());
replicatedScanner = replicatedTable.getScanner(scan);
} catch (KeeperException e) {
@@ -175,42 +179,6 @@ public class VerifyReplication {
if (!conf.getBoolean(HConstants.REPLICATION_ENABLE_KEY, false)) {
throw new IOException("Replication needs to be enabled to verify it.");
}
- HConnectionManager.execute(new HConnectable<Void>(conf) {
- @Override
- public Void connect(HConnection conn) throws IOException {
- ZooKeeperWatcher localZKW = null;
- ReplicationZookeeper zk = null;
- ReplicationPeer peer = null;
- try {
- localZKW = new ZooKeeperWatcher(
- conf, "VerifyReplication", new Abortable() {
- @Override public void abort(String why, Throwable e) {}
- @Override public boolean isAborted() {return false;}
- });
- zk = new ReplicationZookeeper(conn, conf, localZKW);
- // Just verifying it we can connect
- peer = zk.getPeer(peerId);
- if (peer == null) {
- throw new IOException("Couldn't get access to the slave cluster," +
- "please see the log");
- }
- } catch (KeeperException ex) {
- throw new IOException("Couldn't get access to the slave cluster" +
- " because: ", ex);
- } finally {
- if (peer != null){
- peer.close();
- }
- if (zk != null){
- zk.close();
- }
- if (localZKW != null){
- localZKW.close();
- }
- }
- return null;
- }
- });
conf.set(NAME+".peerId", peerId);
conf.set(NAME+".tableName", tableName);
conf.setLong(NAME+".startTime", startTime);
Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java?rev=1484030&r1=1484029&r2=1484030&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationLogCleaner.java Fri May 17 23:57:28 2013
@@ -136,6 +136,7 @@ public class ReplicationLogCleaner exten
this.zkw = new ZooKeeperWatcher(conf, "replicationLogCleaner", null);
this.replicationQueues = new ReplicationQueuesClientZKImpl(zkw, conf, this);
this.replicationState = new ReplicationStateImpl(zkw, conf, this);
+ this.replicationState.init();
} catch (KeeperException e) {
LOG.error("Error while configuring " + this.getClass().getName(), e);
} catch (IOException e) {
Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java?rev=1484030&r1=1484029&r2=1484030&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java Fri May 17 23:57:28 2013
@@ -61,6 +61,7 @@ import org.apache.hadoop.hbase.replicati
import org.apache.hadoop.hbase.replication.ReplicationZookeeper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.zookeeper.KeeperException;
@@ -189,7 +190,7 @@ public class ReplicationSource extends T
this.metrics = new MetricsSource(peerClusterZnode);
this.repLogReader = new ReplicationHLogReaderManager(this.fs, this.conf);
try {
- this.clusterId = zkHelper.getUUIDForCluster(zkHelper.getZookeeperWatcher());
+ this.clusterId = ZKClusterId.getUUIDForCluster(zkHelper.getZookeeperWatcher());
} catch (KeeperException ke) {
throw new IOException("Could not read cluster id", ke);
}
Modified: hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java?rev=1484030&r1=1484029&r2=1484030&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java (original)
+++ hbase/branches/0.95/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java Fri May 17 23:57:28 2013
@@ -177,7 +177,7 @@ public class ReplicationSourceManager {
* old region server hlog queues
*/
public void init() throws IOException {
- for (String id : this.zkHelper.getPeerClusters().keySet()) {
+ for (String id : this.zkHelper.getPeerClusters()) {
addSource(id);
}
List<String> currentReplicators = this.replicationQueues.getListOfReplicators();
@@ -601,7 +601,7 @@ public class ReplicationSourceManager {
try {
ReplicationSourceInterface src = getReplicationSource(conf,
fs, ReplicationSourceManager.this, stopper, replicating, peerId);
- if (!zkHelper.getPeerClusters().containsKey(src.getPeerClusterId())) {
+ if (!zkHelper.getPeerClusters().contains((src.getPeerClusterId()))) {
src.terminate("Recovered queue doesn't belong to any current peer");
break;
}
Modified: hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
URL: http://svn.apache.org/viewvc/hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java?rev=1484030&r1=1484029&r2=1484030&view=diff
==============================================================================
--- hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java (original)
+++ hbase/branches/0.95/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java Fri May 17 23:57:28 2013
@@ -20,12 +20,17 @@ package org.apache.hadoop.hbase.replicat
import static org.junit.Assert.*;
+import java.io.IOException;
import java.util.List;
import java.util.SortedMap;
import java.util.SortedSet;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.zookeeper.KeeperException;
+import org.junit.Before;
import org.junit.Test;
/**
@@ -41,6 +46,26 @@ public abstract class TestReplicationSta
protected String server1 = new ServerName("hostname1.example.org", 1234, -1L).toString();
protected String server2 = new ServerName("hostname2.example.org", 1234, -1L).toString();
protected String server3 = new ServerName("hostname3.example.org", 1234, -1L).toString();
+ protected ReplicationPeers rp;
+ protected static final String ID_ONE = "1";
+ protected static final String ID_TWO = "2";
+ protected static String KEY_ONE;
+ protected static String KEY_TWO;
+
+ // For testing when we try to replicate to ourselves
+ protected String OUR_ID = "3";
+ protected String OUR_KEY;
+
+ protected static int zkTimeoutCount;
+ protected static final int ZK_MAX_COUNT = 300;
+ protected static final int ZK_SLEEP_INTERVAL = 100; // millis
+
+ private static final Log LOG = LogFactory.getLog(TestReplicationStateBasic.class);
+
+ /**
+  * Resets the static {@code zkTimeoutCount}, which assertConnectedPeerStatus compares against
+  * {@code ZK_MAX_COUNT}, to zero before every test.
+  */
+ @Before
+ public void setUp() {
+ zkTimeoutCount = 0;
+ }
@Test
public void testReplicationQueuesClient() throws KeeperException {
@@ -83,13 +108,15 @@ public abstract class TestReplicationSta
}
@Test
- public void testReplicationQueues() throws KeeperException {
+ public void testReplicationQueues() throws KeeperException, IOException {
rq1.init(server1);
rq2.init(server2);
rq3.init(server3);
+ //Initialize ReplicationPeer so we can add peers (we don't transfer lone queues)
+ rp.init();
- // Zero queues or replicators exist
- assertEquals(0, rq1.getListOfReplicators().size());
+ // 3 replicators should exist
+ assertEquals(3, rq1.getListOfReplicators().size());
rq1.removeQueue("bogus");
rq1.removeLog("bogus", "bogus");
rq1.removeAllQueues();
@@ -132,11 +159,102 @@ public abstract class TestReplicationSta
assertEquals(0, rq2.getListOfReplicators().size());
}
+ /**
+  * Walks a ReplicationPeers instance through its lifecycle: calls made with an unknown peer id,
+  * adding two peers, connecting to one of them, toggling its enabled state and finally
+  * disconnecting from it again.
+  */
+ @Test
+ public void testReplicationPeers() throws Exception {
+   rp.init();
+
+   // Test methods with non-existent peer ids
+   try {
+     rp.removePeer("bogus");
+     fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
+   } catch (IllegalArgumentException e) {
+     // expected: the peer id is unknown
+   }
+   try {
+     rp.enablePeer("bogus");
+     fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
+   } catch (IllegalArgumentException e) {
+     // expected: the peer id is unknown
+   }
+   try {
+     rp.disablePeer("bogus");
+     fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
+   } catch (IllegalArgumentException e) {
+     // expected: the peer id is unknown
+   }
+   try {
+     rp.getStatusOfConnectedPeer("bogus");
+     fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
+   } catch (IllegalArgumentException e) {
+     // expected: the peer id is unknown
+   }
+   assertFalse(rp.connectToPeer("bogus"));
+   rp.disconnectFromPeer("bogus");
+   assertEquals(0, rp.getRegionServersOfConnectedPeer("bogus").size());
+   assertNull(rp.getPeerUUID("bogus"));
+   assertNull(rp.getPeerConf("bogus"));
+   assertNumberOfPeers(0, 0);
+
+   // Add some peers
+   rp.addPeer(ID_ONE, KEY_ONE);
+   assertNumberOfPeers(0, 1);
+   rp.addPeer(ID_TWO, KEY_TWO);
+   assertNumberOfPeers(0, 2);
+
+   // Test methods with a peer that is added but not connected
+   try {
+     rp.getStatusOfConnectedPeer(ID_ONE);
+     fail("There are no connected peers, should have thrown an IllegalArgumentException");
+   } catch (IllegalArgumentException e) {
+     // expected: the peer exists but is not connected
+   }
+   assertNull(rp.getPeerUUID(ID_ONE));
+   assertEquals(KEY_ONE, ZKUtil.getZooKeeperClusterKey(rp.getPeerConf(ID_ONE)));
+   rp.disconnectFromPeer(ID_ONE);
+   assertEquals(0, rp.getRegionServersOfConnectedPeer(ID_ONE).size());
+
+   // Connect to one peer
+   rp.connectToPeer(ID_ONE);
+   assertNumberOfPeers(1, 2);
+   assertTrue(rp.getStatusOfConnectedPeer(ID_ONE));
+   rp.disablePeer(ID_ONE);
+   assertConnectedPeerStatus(false, ID_ONE);
+   rp.enablePeer(ID_ONE);
+   assertConnectedPeerStatus(true, ID_ONE);
+   assertEquals(1, rp.getRegionServersOfConnectedPeer(ID_ONE).size());
+   // Assert on the UUID itself: calling toString() first would turn a null UUID into a
+   // NullPointerException instead of an assertion failure.
+   assertNotNull(rp.getPeerUUID(ID_ONE));
+
+   // Disconnect peer
+   rp.disconnectFromPeer(ID_ONE);
+   assertNumberOfPeers(0, 2);
+   try {
+     rp.getStatusOfConnectedPeer(ID_ONE);
+     fail("There are no connected peers, should have thrown an IllegalArgumentException");
+   } catch (IllegalArgumentException e) {
+     // expected: the peer is no longer connected
+   }
+ }
+
+ /**
+  * Polls {@code rp.getStatusOfConnectedPeer(peerId)} until it equals {@code status}, sleeping
+  * {@code ZK_SLEEP_INTERVAL} milliseconds between attempts. Each retry increments the static
+  * {@code zkTimeoutCount}; once it reaches {@code ZK_MAX_COUNT} the test fails instead of
+  * retrying. The counter is shared by every call made within one test.
+  * @param status the enabled state the connected peer is expected to reach
+  * @param peerId id of the connected peer to poll
+  * @throws Exception if the status lookup fails or the sleep is interrupted
+  */
+ protected void assertConnectedPeerStatus(boolean status, String peerId) throws Exception {
+   while (true) {
+     if (status == rp.getStatusOfConnectedPeer(peerId)) {
+       return;
+     }
+     if (zkTimeoutCount >= ZK_MAX_COUNT) {
+       fail("Timed out waiting for ConnectedPeerStatus to be " + status);
+     }
+     // Count this retry; without the increment the timeout branch above is never reached
+     // and the loop spins forever when the status does not change.
+     zkTimeoutCount++;
+     LOG.debug("ConnectedPeerStatus was " + !status + " but expected " + status
+         + ", sleeping and trying again.");
+     Thread.sleep(ZK_SLEEP_INTERVAL);
+   }
+ }
+
+ protected void assertNumberOfPeers(int connected, int total) {
+ assertEquals(total, rp.getAllPeerClusterKeys().size());
+ assertEquals(connected, rp.getConnectedPeers().size());
+ assertEquals(total, rp.getAllPeerIds().size());
+ }
+
/*
* three replicators: rq1 has 0 queues, rq2 has 1 queue with no logs, rq3 has 5 queues with 1, 2,
* 3, 4, 5 log files respectively
*/
- protected void populateQueues() throws KeeperException {
+ protected void populateQueues() throws KeeperException, IOException {
rq1.addLog("trash", "trash");
rq1.removeQueue("trash");
@@ -147,6 +265,8 @@ public abstract class TestReplicationSta
for (int j = 0; j < i; j++) {
rq3.addLog("qId" + i, "filename" + j);
}
+ //Add peers for the corresponding queues so they are not orphans
+ rp.addPeer("qId" + i, "bogus" + i);
}
}
}