You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2017/08/23 21:42:45 UTC
[4/4] hbase git commit: HBASE-17442 Move most of the replication
related classes from hbase-client to new hbase-replication package. (Guanghao
Zhang).
HBASE-17442 Move most of the replication related classes from hbase-client to new hbase-replication package. (Guanghao Zhang).
Change-Id: Ie0e24cc617ab4bf56de8b1747062d1b78a5d4669
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/26e6c2ce
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/26e6c2ce
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/26e6c2ce
Branch: refs/heads/master
Commit: 26e6c2ceb4db80e561ec321dba52673e7f285d1b
Parents: ae052e4
Author: Apekshit Sharma <ap...@apache.org>
Authored: Thu Aug 17 20:59:35 2017 -0700
Committer: Apekshit Sharma <ap...@apache.org>
Committed: Wed Aug 23 14:41:58 2017 -0700
----------------------------------------------------------------------
.../client/replication/ReplicationAdmin.java | 102 +---
.../hbase/replication/ReplicationFactory.java | 66 ---
.../hbase/replication/ReplicationListener.java | 51 --
.../hbase/replication/ReplicationPeer.java | 89 ---
.../ReplicationPeerConfigListener.java | 33 --
.../replication/ReplicationPeerZKImpl.java | 318 -----------
.../hbase/replication/ReplicationPeers.java | 177 ------
.../replication/ReplicationPeersZKImpl.java | 546 -------------------
.../hbase/replication/ReplicationQueueInfo.java | 130 -----
.../hbase/replication/ReplicationQueues.java | 160 ------
.../replication/ReplicationQueuesArguments.java | 70 ---
.../replication/ReplicationQueuesClient.java | 93 ----
.../ReplicationQueuesClientArguments.java | 40 --
.../ReplicationQueuesClientZKImpl.java | 175 ------
.../replication/ReplicationQueuesZKImpl.java | 407 --------------
.../replication/ReplicationStateZKBase.java | 155 ------
.../hbase/replication/ReplicationTableBase.java | 441 ---------------
.../hbase/replication/ReplicationTracker.java | 49 --
.../replication/ReplicationTrackerZKImpl.java | 250 ---------
.../TableBasedReplicationQueuesClientImpl.java | 112 ----
.../TableBasedReplicationQueuesImpl.java | 450 ---------------
.../apache/hadoop/hbase/zookeeper/ZKUtil.java | 14 +-
.../hadoop/hbase/zookeeper/ZNodePaths.java | 22 +-
hbase-replication/pom.xml | 264 +++++++++
.../hbase/replication/ReplicationFactory.java | 66 +++
.../hbase/replication/ReplicationListener.java | 51 ++
.../hbase/replication/ReplicationPeer.java | 89 +++
.../ReplicationPeerConfigListener.java | 33 ++
.../replication/ReplicationPeerZKImpl.java | 318 +++++++++++
.../hbase/replication/ReplicationPeers.java | 177 ++++++
.../replication/ReplicationPeersZKImpl.java | 546 +++++++++++++++++++
.../hbase/replication/ReplicationQueueInfo.java | 130 +++++
.../hbase/replication/ReplicationQueues.java | 160 ++++++
.../replication/ReplicationQueuesArguments.java | 70 +++
.../replication/ReplicationQueuesClient.java | 93 ++++
.../ReplicationQueuesClientArguments.java | 40 ++
.../ReplicationQueuesClientZKImpl.java | 175 ++++++
.../replication/ReplicationQueuesZKImpl.java | 407 ++++++++++++++
.../replication/ReplicationStateZKBase.java | 155 ++++++
.../hbase/replication/ReplicationTableBase.java | 441 +++++++++++++++
.../hbase/replication/ReplicationTracker.java | 49 ++
.../replication/ReplicationTrackerZKImpl.java | 250 +++++++++
.../TableBasedReplicationQueuesClientImpl.java | 112 ++++
.../TableBasedReplicationQueuesImpl.java | 450 +++++++++++++++
hbase-server/pom.xml | 4 +
.../replication/BaseReplicationEndpoint.java | 2 -
pom.xml | 6 +
47 files changed, 4113 insertions(+), 3925 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
index 752d18c..615a79d 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/replication/ReplicationAdmin.java
@@ -26,37 +26,22 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Set;
import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.ReplicationPeerNotFoundException;
import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.RegionLocator;
import org.apache.hadoop.hbase.replication.ReplicationException;
-import org.apache.hadoop.hbase.replication.ReplicationFactory;
-import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
-import org.apache.hadoop.hbase.replication.ReplicationPeerZKImpl;
-import org.apache.hadoop.hbase.replication.ReplicationPeers;
-import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
-import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.shaded.com.google.common.collect.Lists;
@@ -101,16 +86,6 @@ public class ReplicationAdmin implements Closeable {
Integer.toString(HConstants.REPLICATION_SCOPE_SERIAL);
private final Connection connection;
- // TODO: replication should be managed by master. All the classes except ReplicationAdmin should
- // be moved to hbase-server. Resolve it in HBASE-11392.
- private final ReplicationQueuesClient replicationQueuesClient;
- private final ReplicationPeers replicationPeers;
- /**
- * A watcher used by replicationPeers and replicationQueuesClient. Keep reference so can dispose
- * on {@link #close()}.
- */
- private final ZooKeeperWatcher zkw;
-
private Admin admin;
/**
@@ -122,49 +97,6 @@ public class ReplicationAdmin implements Closeable {
public ReplicationAdmin(Configuration conf) throws IOException {
this.connection = ConnectionFactory.createConnection(conf);
admin = connection.getAdmin();
- try {
- zkw = createZooKeeperWatcher();
- try {
- this.replicationQueuesClient =
- ReplicationFactory.getReplicationQueuesClient(new ReplicationQueuesClientArguments(conf,
- this.connection, zkw));
- this.replicationQueuesClient.init();
- this.replicationPeers = ReplicationFactory.getReplicationPeers(zkw, conf,
- this.replicationQueuesClient, this.connection);
- this.replicationPeers.init();
- } catch (Exception exception) {
- if (zkw != null) {
- zkw.close();
- }
- throw exception;
- }
- } catch (Exception exception) {
- connection.close();
- if (exception instanceof IOException) {
- throw (IOException) exception;
- } else if (exception instanceof RuntimeException) {
- throw (RuntimeException) exception;
- } else {
- throw new IOException("Error initializing the replication admin client.", exception);
- }
- }
- }
-
- private ZooKeeperWatcher createZooKeeperWatcher() throws IOException {
- // This Abortable doesn't 'abort'... it just logs.
- return new ZooKeeperWatcher(connection.getConfiguration(), "ReplicationAdmin", new Abortable() {
- @Override
- public void abort(String why, Throwable e) {
- LOG.error(why, e);
- // We used to call system.exit here but this script can be embedded by other programs that
- // want to do replication stuff... so inappropriate calling System.exit. Just log for now.
- }
-
- @Override
- public boolean isAborted() {
- return false;
- }
- });
}
/**
@@ -452,9 +384,6 @@ public class ReplicationAdmin implements Closeable {
@Override
public void close() throws IOException {
- if (this.zkw != null) {
- this.zkw.close();
- }
if (this.connection != null) {
this.connection.close();
}
@@ -518,40 +447,13 @@ public class ReplicationAdmin implements Closeable {
admin.disableTableReplication(tableName);
}
- @VisibleForTesting
- @Deprecated
- public void peerAdded(String id) throws ReplicationException {
- this.replicationPeers.peerConnected(id);
- }
-
/**
* @deprecated use {@link org.apache.hadoop.hbase.client.Admin#listReplicationPeers()} instead
*/
@VisibleForTesting
@Deprecated
- List<ReplicationPeer> listReplicationPeers() throws IOException {
- Map<String, ReplicationPeerConfig> peers = listPeerConfigs();
- if (peers == null || peers.size() <= 0) {
- return null;
- }
- List<ReplicationPeer> listOfPeers = new ArrayList<>(peers.size());
- for (Entry<String, ReplicationPeerConfig> peerEntry : peers.entrySet()) {
- String peerId = peerEntry.getKey();
- try {
- Pair<ReplicationPeerConfig, Configuration> pair = this.replicationPeers.getPeerConf(peerId);
- Configuration peerConf = pair.getSecond();
- ReplicationPeer peer = new ReplicationPeerZKImpl(zkw, pair.getSecond(),
- peerId, pair.getFirst(), this.connection);
- listOfPeers.add(peer);
- } catch (ReplicationException e) {
- LOG.warn("Failed to get valid replication peers. "
- + "Error connecting to peer cluster with peerId=" + peerId + ". Error message="
- + e.getMessage());
- LOG.debug("Failure details to get valid replication peers.", e);
- continue;
- }
- }
- return listOfPeers;
+ List<ReplicationPeerDescription> listReplicationPeers() throws IOException {
+ return admin.listReplicationPeers();
}
/**
http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
deleted file mode 100644
index 8506cbb..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import org.apache.commons.lang.reflect.ConstructorUtils;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.Stoppable;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-
-/**
- * A factory class for instantiating replication objects that deal with replication state.
- */
-@InterfaceAudience.Private
-public class ReplicationFactory {
-
- public static final Class defaultReplicationQueueClass = ReplicationQueuesZKImpl.class;
-
- public static ReplicationQueues getReplicationQueues(ReplicationQueuesArguments args)
- throws Exception {
- Class<?> classToBuild = args.getConf().getClass("hbase.region.replica." +
- "replication.replicationQueues.class", defaultReplicationQueueClass);
- return (ReplicationQueues) ConstructorUtils.invokeConstructor(classToBuild, args);
- }
-
- public static ReplicationQueuesClient getReplicationQueuesClient(
- ReplicationQueuesClientArguments args) throws Exception {
- Class<?> classToBuild = args.getConf().getClass(
- "hbase.region.replica.replication.replicationQueuesClient.class",
- ReplicationQueuesClientZKImpl.class);
- return (ReplicationQueuesClient) ConstructorUtils.invokeConstructor(classToBuild, args);
- }
-
- public static ReplicationPeers getReplicationPeers(final ZooKeeperWatcher zk, Configuration conf,
- Abortable abortable) {
- return getReplicationPeers(zk, conf, null, abortable);
- }
-
- public static ReplicationPeers getReplicationPeers(final ZooKeeperWatcher zk, Configuration conf,
- final ReplicationQueuesClient queuesClient, Abortable abortable) {
- return new ReplicationPeersZKImpl(zk, conf, queuesClient, abortable);
- }
-
- public static ReplicationTracker getReplicationTracker(ZooKeeperWatcher zookeeper,
- final ReplicationPeers replicationPeers, Configuration conf, Abortable abortable,
- Stoppable stopper) {
- return new ReplicationTrackerZKImpl(zookeeper, replicationPeers, conf, abortable, stopper);
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java
deleted file mode 100644
index dfb5fdc..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationListener.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import java.util.List;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-
-/**
- * The replication listener interface can be implemented if a class needs to subscribe to events
- * generated by the ReplicationTracker. These events include things like addition/deletion of peer
- * clusters or failure of a local region server. To receive events, the class also needs to register
- * itself with a Replication Tracker.
- */
-@InterfaceAudience.Private
-public interface ReplicationListener {
-
- /**
- * A region server has been removed from the local cluster
- * @param regionServer the removed region server
- */
- public void regionServerRemoved(String regionServer);
-
- /**
- * A peer cluster has been removed (i.e. unregistered) from replication.
- * @param peerId The peer id of the cluster that has been removed
- */
- public void peerRemoved(String peerId);
-
- /**
- * The list of registered peer clusters has changed.
- * @param peerIds A list of all currently registered peer clusters
- */
- public void peerListChanged(List<String> peerIds);
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
deleted file mode 100644
index 4f18048..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeer.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-
-
-/**
- * ReplicationPeer manages enabled / disabled state for the peer.
- */
-@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
-public interface ReplicationPeer {
-
- /**
- * State of the peer, whether it is enabled or not
- */
- @InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
- enum PeerState {
- ENABLED,
- DISABLED
- }
-
- /**
- * Get the identifier of this peer
- * @return string representation of the id
- */
- String getId();
-
- /**
- * Get the peer config object
- * @return the ReplicationPeerConfig for this peer
- */
- public ReplicationPeerConfig getPeerConfig();
-
- /**
- * Returns the state of the peer
- * @return the enabled state
- */
- PeerState getPeerState();
-
- /**
- * Get the configuration object required to communicate with this peer
- * @return configuration object
- */
- public Configuration getConfiguration();
-
- /**
- * Get replicable (table, cf-list) map of this peer
- * @return the replicable (table, cf-list) map
- */
- public Map<TableName, List<String>> getTableCFs();
-
- /**
- * Get replicable namespace set of this peer
- * @return the replicable namespaces set
- */
- public Set<String> getNamespaces();
-
- /**
- * Get the per node bandwidth upper limit for this peer
- * @return the bandwidth upper limit
- */
- public long getPeerBandwidth();
-
- void trackPeerConfigChanges(ReplicationPeerConfigListener listener);
-
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigListener.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigListener.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigListener.java
deleted file mode 100644
index 4e04186..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerConfigListener.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.replication;
-
-import org.apache.hadoop.hbase.HBaseInterfaceAudience;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-
-@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.REPLICATION)
-public interface ReplicationPeerConfigListener {
- /** Callback method for when users update the ReplicationPeerConfig for this peer
- *
- * @param rpc The updated ReplicationPeerConfig
- */
- void peerConfigUpdated(ReplicationPeerConfig rpc);
-
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
deleted file mode 100644
index 3973be9..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerZKImpl.java
+++ /dev/null
@@ -1,318 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import java.io.Closeable;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.zookeeper.KeeperException;
-import org.apache.zookeeper.KeeperException.NodeExistsException;
-
-@InterfaceAudience.Private
-public class ReplicationPeerZKImpl extends ReplicationStateZKBase
- implements ReplicationPeer, Abortable, Closeable {
- private static final Log LOG = LogFactory.getLog(ReplicationPeerZKImpl.class);
-
- private ReplicationPeerConfig peerConfig;
- private final String id;
- private volatile PeerState peerState;
- private volatile Map<TableName, List<String>> tableCFs = new HashMap<>();
- private final Configuration conf;
- private PeerStateTracker peerStateTracker;
- private PeerConfigTracker peerConfigTracker;
-
-
- /**
- * Constructor that takes all the objects required to communicate with the specified peer, except
- * for the region server addresses.
- * @param conf configuration object to this peer
- * @param id string representation of this peer's identifier
- * @param peerConfig configuration for the replication peer
- */
- public ReplicationPeerZKImpl(ZooKeeperWatcher zkWatcher, Configuration conf,
- String id, ReplicationPeerConfig peerConfig,
- Abortable abortable)
- throws ReplicationException {
- super(zkWatcher, conf, abortable);
- this.conf = conf;
- this.peerConfig = peerConfig;
- this.id = id;
- }
-
- /**
- * start a state tracker to check whether this peer is enabled or not
- *
- * @param peerStateNode path to zk node which stores peer state
- * @throws KeeperException
- */
- public void startStateTracker(String peerStateNode)
- throws KeeperException {
- ensurePeerEnabled(peerStateNode);
- this.peerStateTracker = new PeerStateTracker(peerStateNode, zookeeper, this);
- this.peerStateTracker.start();
- try {
- this.readPeerStateZnode();
- } catch (DeserializationException e) {
- throw ZKUtil.convert(e);
- }
- }
-
- private void readPeerStateZnode() throws DeserializationException {
- this.peerState =
- isStateEnabled(this.peerStateTracker.getData(false))
- ? PeerState.ENABLED
- : PeerState.DISABLED;
- }
-
- /**
- * start a table-cfs tracker to listen the (table, cf-list) map change
- * @param peerConfigNode path to zk node which stores table-cfs
- * @throws KeeperException
- */
- public void startPeerConfigTracker(String peerConfigNode)
- throws KeeperException {
- this.peerConfigTracker = new PeerConfigTracker(peerConfigNode, zookeeper,
- this);
- this.peerConfigTracker.start();
- this.readPeerConfig();
- }
-
- private ReplicationPeerConfig readPeerConfig() {
- try {
- byte[] data = peerConfigTracker.getData(false);
- if (data != null) {
- this.peerConfig = ReplicationSerDeHelper.parsePeerFrom(data);
- }
- } catch (DeserializationException e) {
- LOG.error("", e);
- }
- return this.peerConfig;
- }
-
- @Override
- public PeerState getPeerState() {
- return peerState;
- }
-
- /**
- * Get the identifier of this peer
- * @return string representation of the id (short)
- */
- @Override
- public String getId() {
- return id;
- }
-
- /**
- * Get the peer config object
- * @return the ReplicationPeerConfig for this peer
- */
- @Override
- public ReplicationPeerConfig getPeerConfig() {
- return peerConfig;
- }
-
- /**
- * Get the configuration object required to communicate with this peer
- * @return configuration object
- */
- @Override
- public Configuration getConfiguration() {
- return conf;
- }
-
- /**
- * Get replicable (table, cf-list) map of this peer
- * @return the replicable (table, cf-list) map
- */
- @Override
- public Map<TableName, List<String>> getTableCFs() {
- this.tableCFs = peerConfig.getTableCFsMap();
- return this.tableCFs;
- }
-
- /**
- * Get replicable namespace set of this peer
- * @return the replicable namespaces set
- */
- @Override
- public Set<String> getNamespaces() {
- return this.peerConfig.getNamespaces();
- }
-
- @Override
- public long getPeerBandwidth() {
- return this.peerConfig.getBandwidth();
- }
-
- @Override
- public void trackPeerConfigChanges(ReplicationPeerConfigListener listener) {
- if (this.peerConfigTracker != null){
- this.peerConfigTracker.setListener(listener);
- }
- }
-
- @Override
- public void abort(String why, Throwable e) {
- LOG.fatal("The ReplicationPeer corresponding to peer " + peerConfig
- + " was aborted for the following reason(s):" + why, e);
- }
-
- @Override
- public boolean isAborted() {
- // Currently the replication peer is never "Aborted", we just log when the
- // abort method is called.
- return false;
- }
-
- @Override
- public void close() throws IOException {
- // TODO: stop zkw?
- }
-
- /**
- * Parse the raw data from ZK to get a peer's state
- * @param bytes raw ZK data
- * @return True if the passed in <code>bytes</code> are those of a pb serialized ENABLED state.
- * @throws DeserializationException
- */
- public static boolean isStateEnabled(final byte[] bytes) throws DeserializationException {
- ReplicationProtos.ReplicationState.State state = parseStateFrom(bytes);
- return ReplicationProtos.ReplicationState.State.ENABLED == state;
- }
-
- /**
- * @param bytes Content of a state znode.
- * @return State parsed from the passed bytes.
- * @throws DeserializationException
- */
- private static ReplicationProtos.ReplicationState.State parseStateFrom(final byte[] bytes)
- throws DeserializationException {
- ProtobufUtil.expectPBMagicPrefix(bytes);
- int pblen = ProtobufUtil.lengthOfPBMagic();
- ReplicationProtos.ReplicationState.Builder builder =
- ReplicationProtos.ReplicationState.newBuilder();
- ReplicationProtos.ReplicationState state;
- try {
- ProtobufUtil.mergeFrom(builder, bytes, pblen, bytes.length - pblen);
- state = builder.build();
- return state.getState();
- } catch (IOException e) {
- throw new DeserializationException(e);
- }
- }
-
- /**
- * Utility method to ensure an ENABLED znode is in place; if not present, we create it.
- * @param path Path to znode to check
- * @return True if we created the znode.
- * @throws NodeExistsException
- * @throws KeeperException
- */
- private boolean ensurePeerEnabled(final String path)
- throws NodeExistsException, KeeperException {
- if (ZKUtil.checkExists(zookeeper, path) == -1) {
- // There is a race b/w PeerWatcher and ReplicationZookeeper#add method to create the
- // peer-state znode. This happens while adding a peer.
- // The peer state data is set as "ENABLED" by default.
- ZKUtil.createNodeIfNotExistsAndWatch(zookeeper, path,
- ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
- return true;
- }
- return false;
- }
-
- /**
- * Tracker for state of this peer
- */
- public class PeerStateTracker extends ZooKeeperNodeTracker {
-
- public PeerStateTracker(String peerStateZNode, ZooKeeperWatcher watcher,
- Abortable abortable) {
- super(watcher, peerStateZNode, abortable);
- }
-
- @Override
- public synchronized void nodeDataChanged(String path) {
- if (path.equals(node)) {
- super.nodeDataChanged(path);
- try {
- readPeerStateZnode();
- } catch (DeserializationException e) {
- LOG.warn("Failed deserializing the content of " + path, e);
- }
- }
- }
- }
-
- /**
- * Tracker for PeerConfigNode of this peer
- */
- public class PeerConfigTracker extends ZooKeeperNodeTracker {
-
- ReplicationPeerConfigListener listener;
-
- public PeerConfigTracker(String peerConfigNode, ZooKeeperWatcher watcher,
- Abortable abortable) {
- super(watcher, peerConfigNode, abortable);
- }
-
- public synchronized void setListener(ReplicationPeerConfigListener listener){
- this.listener = listener;
- }
-
- @Override
- public synchronized void nodeCreated(String path) {
- if (path.equals(node)) {
- super.nodeCreated(path);
- ReplicationPeerConfig config = readPeerConfig();
- if (listener != null){
- listener.peerConfigUpdated(config);
- }
- }
- }
-
- @Override
- public synchronized void nodeDataChanged(String path) {
- //superclass calls nodeCreated
- if (path.equals(node)) {
- super.nodeDataChanged(path);
- }
-
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
deleted file mode 100644
index 2a7963a..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeers.java
+++ /dev/null
@@ -1,177 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.util.Pair;
-
-/**
- * This provides an interface for maintaining a set of peer clusters. These peers are remote slave
- * clusters that data is replicated to. A peer cluster can be in three different states:
- *
- * 1. Not-Registered - There is no notion of the peer cluster.
- * 2. Registered - The peer has an id and is being tracked but there is no connection.
- * 3. Connected - There is an active connection to the remote peer.
- *
- * In the registered or connected state, a peer cluster can either be enabled or disabled.
- */
-@InterfaceAudience.Private
-public interface ReplicationPeers {
-
- /**
- * Initialize the ReplicationPeers interface.
- */
- void init() throws ReplicationException;
-
- /**
- * Add a new remote slave cluster for replication.
- * @param peerId a short that identifies the cluster
- * @param peerConfig configuration for the replication slave cluster
- */
- void registerPeer(String peerId, ReplicationPeerConfig peerConfig)
- throws ReplicationException;
-
- /**
- * Removes a remote slave cluster and stops the replication to it.
- * @param peerId a short that identifies the cluster
- */
- void unregisterPeer(String peerId) throws ReplicationException;
-
- /**
- * Method called after a peer has been connected. It will create a ReplicationPeer to track the
- * newly connected cluster.
- * @param peerId a short that identifies the cluster
- * @return whether a ReplicationPeer was successfully created
- * @throws ReplicationException
- */
- boolean peerConnected(String peerId) throws ReplicationException;
-
- /**
- * Method called after a peer has been disconnected. It will remove the ReplicationPeer that
- * tracked the disconnected cluster.
- * @param peerId a short that identifies the cluster
- */
- void peerDisconnected(String peerId);
-
- /**
- * Restart the replication to the specified remote slave cluster.
- * @param peerId a short that identifies the cluster
- */
- void enablePeer(String peerId) throws ReplicationException;
-
- /**
- * Stop the replication to the specified remote slave cluster.
- * @param peerId a short that identifies the cluster
- */
- void disablePeer(String peerId) throws ReplicationException;
-
- /**
- * Get the table and column-family list string of the peer from the underlying storage.
- * @param peerId a short that identifies the cluster
- */
- public Map<TableName, List<String>> getPeerTableCFsConfig(String peerId)
- throws ReplicationException;
-
- /**
- * Set the table and column-family list string of the peer to the underlying storage.
- * @param peerId a short that identifies the cluster
- * @param tableCFs the table and column-family list which will be replicated for this peer
- */
- public void setPeerTableCFsConfig(String peerId,
- Map<TableName, ? extends Collection<String>> tableCFs)
- throws ReplicationException;
-
- /**
- * Returns the ReplicationPeer for the specified connected peer. This ReplicationPeer will
- * continue to track changes to the Peer's state and config. This method returns null if no
- * peer has been connected with the given peerId.
- * @param peerId id for the peer
- * @return ReplicationPeer object
- */
- ReplicationPeer getConnectedPeer(String peerId);
-
- /**
- * Returns the set of peerIds of the clusters that have been connected and have an underlying
- * ReplicationPeer.
- * @return a Set of Strings for peerIds
- */
- public Set<String> getConnectedPeerIds();
-
- /**
- * Get the replication status for the specified connected remote slave cluster.
- * The value might be read from cache, so it is recommended to
- * use {@link #getStatusOfPeerFromBackingStore(String)}
- * if reading the state after enabling or disabling it.
- * @param peerId a short that identifies the cluster
- * @return true if replication is enabled, false otherwise.
- */
- boolean getStatusOfPeer(String peerId);
-
- /**
- * Get the replication status for the specified remote slave cluster, which doesn't
- * have to be connected. The state is read directly from the backing store.
- * @param peerId a short that identifies the cluster
- * @return true if replication is enabled, false otherwise.
- * @throws ReplicationException thrown if there's an error contacting the store
- */
- boolean getStatusOfPeerFromBackingStore(String peerId) throws ReplicationException;
-
- /**
- * List the cluster replication configs of all remote slave clusters (whether they are
- * enabled/disabled or connected/disconnected).
- * @return A map of peer ids to peer cluster keys
- */
- Map<String, ReplicationPeerConfig> getAllPeerConfigs();
-
- /**
- * List the peer ids of all remote slave clusters (whether they are enabled/disabled or
- * connected/disconnected).
- * @return A list of peer ids
- */
- List<String> getAllPeerIds();
-
- /**
- * Returns the configured ReplicationPeerConfig for this peerId
- * @param peerId a short name that identifies the cluster
- * @return ReplicationPeerConfig for the peer
- */
- ReplicationPeerConfig getReplicationPeerConfig(String peerId) throws ReplicationException;
-
- /**
- * Returns the configuration needed to talk to the remote slave cluster.
- * @param peerId a short that identifies the cluster
- * @return the configuration for the peer cluster, null if it was unable to get the configuration
- */
- Pair<ReplicationPeerConfig, Configuration> getPeerConf(String peerId) throws ReplicationException;
-
- /**
- * Update the peerConfig for the a given peer cluster
- * @param id a short that identifies the cluster
- * @param peerConfig new config for the peer cluster
- * @throws ReplicationException
- */
- void updatePeerConfig(String id, ReplicationPeerConfig peerConfig) throws ReplicationException;
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
deleted file mode 100644
index 751e454..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeersZKImpl.java
+++ /dev/null
@@ -1,546 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.CompoundConfiguration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.client.replication.ReplicationSerDeHelper;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
-import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.zookeeper.ZKConfig;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.zookeeper.KeeperException;
-
-/**
- * This class provides an implementation of the ReplicationPeers interface using ZooKeeper. The
- * peers znode contains a list of all peer replication clusters and the current replication state of
- * those clusters. It has one child peer znode for each peer cluster. The peer znode is named with
- * the cluster id provided by the user in the HBase shell. The value of the peer znode contains the
- * peers cluster key provided by the user in the HBase Shell. The cluster key contains a list of
- * zookeeper quorum peers, the client port for the zookeeper quorum, and the base znode for HBase.
- * For example:
- *
- * /hbase/replication/peers/1 [Value: zk1.host.com,zk2.host.com,zk3.host.com:2181:/hbase]
- * /hbase/replication/peers/2 [Value: zk5.host.com,zk6.host.com,zk7.host.com:2181:/hbase]
- *
- * Each of these peer znodes has a child znode that indicates whether or not replication is enabled
- * on that peer cluster. These peer-state znodes do not have child znodes and simply contain a
- * boolean value (i.e. ENABLED or DISABLED). This value is read/maintained by the
- * ReplicationPeer.PeerStateTracker class. For example:
- *
- * /hbase/replication/peers/1/peer-state [Value: ENABLED]
- *
- * Each of these peer znodes has a child znode that indicates which data will be replicated
- * to the peer cluster. These peer-tableCFs znodes do not have child znodes and only have a
- * table/cf list config. This value is read/maintained by the ReplicationPeer.TableCFsTracker
- * class. For example:
- *
- * /hbase/replication/peers/1/tableCFs [Value: "table1; table2:cf1,cf3; table3:cfx,cfy"]
- */
-@InterfaceAudience.Private
-public class ReplicationPeersZKImpl extends ReplicationStateZKBase implements ReplicationPeers {
-
- // Map of peer clusters keyed by their id
- private Map<String, ReplicationPeerZKImpl> peerClusters;
- private final ReplicationQueuesClient queuesClient;
- private Abortable abortable;
-
- private static final Log LOG = LogFactory.getLog(ReplicationPeersZKImpl.class);
-
- public ReplicationPeersZKImpl(final ZooKeeperWatcher zk, final Configuration conf,
- final ReplicationQueuesClient queuesClient, Abortable abortable) {
- super(zk, conf, abortable);
- this.abortable = abortable;
- this.peerClusters = new ConcurrentHashMap<>();
- this.queuesClient = queuesClient;
- }
-
- @Override
- public void init() throws ReplicationException {
- try {
- if (ZKUtil.checkExists(this.zookeeper, this.peersZNode) < 0) {
- ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
- }
- } catch (KeeperException e) {
- throw new ReplicationException("Could not initialize replication peers", e);
- }
- addExistingPeers();
- }
-
- @Override
- public void registerPeer(String id, ReplicationPeerConfig peerConfig)
- throws ReplicationException {
- try {
- if (peerExists(id)) {
- throw new IllegalArgumentException("Cannot add a peer with id=" + id
- + " because that id already exists.");
- }
-
- if(id.contains("-")){
- throw new IllegalArgumentException("Found invalid peer name:" + id);
- }
-
- if (peerConfig.getClusterKey() != null) {
- try {
- ZKConfig.validateClusterKey(peerConfig.getClusterKey());
- } catch (IOException ioe) {
- throw new IllegalArgumentException(ioe.getMessage());
- }
- }
-
- checkQueuesDeleted(id);
-
- ZKUtil.createWithParents(this.zookeeper, this.peersZNode);
-
- List<ZKUtilOp> listOfOps = new ArrayList<>(2);
- ZKUtilOp op1 = ZKUtilOp.createAndFailSilent(getPeerNode(id),
- ReplicationSerDeHelper.toByteArray(peerConfig));
- // b/w PeerWatcher and ReplicationZookeeper#add method to create the
- // peer-state znode. This happens while adding a peer
- // The peer state data is set as "ENABLED" by default.
- ZKUtilOp op2 = ZKUtilOp.createAndFailSilent(getPeerStateNode(id), ENABLED_ZNODE_BYTES);
- listOfOps.add(op1);
- listOfOps.add(op2);
- ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
- // A peer is enabled by default
- } catch (KeeperException e) {
- throw new ReplicationException("Could not add peer with id=" + id
- + ", peerConfif=>" + peerConfig, e);
- }
- }
-
- @Override
- public void unregisterPeer(String id) throws ReplicationException {
- try {
- if (!peerExists(id)) {
- throw new IllegalArgumentException("Cannot remove peer with id=" + id
- + " because that id does not exist.");
- }
- ZKUtil.deleteNodeRecursively(this.zookeeper, ZKUtil.joinZNode(this.peersZNode, id));
- } catch (KeeperException e) {
- throw new ReplicationException("Could not remove peer with id=" + id, e);
- }
- }
-
- @Override
- public void enablePeer(String id) throws ReplicationException {
- changePeerState(id, ReplicationProtos.ReplicationState.State.ENABLED);
- LOG.info("peer " + id + " is enabled");
- }
-
- @Override
- public void disablePeer(String id) throws ReplicationException {
- changePeerState(id, ReplicationProtos.ReplicationState.State.DISABLED);
- LOG.info("peer " + id + " is disabled");
- }
-
- @Override
- public Map<TableName, List<String>> getPeerTableCFsConfig(String id) throws ReplicationException {
- try {
- if (!peerExists(id)) {
- throw new IllegalArgumentException("peer " + id + " doesn't exist");
- }
- try {
- ReplicationPeerConfig rpc = getReplicationPeerConfig(id);
- if (rpc == null) {
- throw new ReplicationException("Unable to get tableCFs of the peer with id=" + id);
- }
- return rpc.getTableCFsMap();
- } catch (Exception e) {
- throw new ReplicationException(e);
- }
- } catch (KeeperException e) {
- throw new ReplicationException("Unable to get tableCFs of the peer with id=" + id, e);
- }
- }
-
- @Override
- public void setPeerTableCFsConfig(String id,
- Map<TableName, ? extends Collection<String>> tableCFs)
- throws ReplicationException {
- try {
- if (!peerExists(id)) {
- throw new IllegalArgumentException("Cannot set peer tableCFs because id=" + id
- + " does not exist.");
- }
- ReplicationPeerConfig rpc = getReplicationPeerConfig(id);
- if (rpc == null) {
- throw new ReplicationException("Unable to get tableCFs of the peer with id=" + id);
- }
- rpc.setTableCFsMap(tableCFs);
- ZKUtil.setData(this.zookeeper, getPeerNode(id),
- ReplicationSerDeHelper.toByteArray(rpc));
- LOG.info("Peer tableCFs with id= " + id + " is now " +
- ReplicationSerDeHelper.convertToString(tableCFs));
- } catch (KeeperException e) {
- throw new ReplicationException("Unable to change tableCFs of the peer with id=" + id, e);
- }
- }
-
- @Override
- public boolean getStatusOfPeer(String id) {
- ReplicationPeer replicationPeer = this.peerClusters.get(id);
- if (replicationPeer == null) {
- throw new IllegalArgumentException("Peer with id= " + id + " is not cached");
- }
- return replicationPeer.getPeerState() == PeerState.ENABLED;
- }
-
- @Override
- public boolean getStatusOfPeerFromBackingStore(String id) throws ReplicationException {
- try {
- if (!peerExists(id)) {
- throw new IllegalArgumentException("peer " + id + " doesn't exist");
- }
- String peerStateZNode = getPeerStateNode(id);
- try {
- return ReplicationPeerZKImpl.isStateEnabled(ZKUtil.getData(this.zookeeper, peerStateZNode));
- } catch (KeeperException e) {
- throw new ReplicationException(e);
- } catch (DeserializationException e) {
- throw new ReplicationException(e);
- }
- } catch (KeeperException e) {
- throw new ReplicationException("Unable to get status of the peer with id=" + id +
- " from backing store", e);
- } catch (InterruptedException e) {
- throw new ReplicationException(e);
- }
- }
-
- @Override
- public Map<String, ReplicationPeerConfig> getAllPeerConfigs() {
- Map<String, ReplicationPeerConfig> peers = new TreeMap<>();
- List<String> ids = null;
- try {
- ids = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
- for (String id : ids) {
- ReplicationPeerConfig peerConfig = getReplicationPeerConfig(id);
- if (peerConfig == null) {
- LOG.warn("Failed to get replication peer configuration of clusterid=" + id
- + " znode content, continuing.");
- continue;
- }
- peers.put(id, peerConfig);
- }
- } catch (KeeperException e) {
- this.abortable.abort("Cannot get the list of peers ", e);
- } catch (ReplicationException e) {
- this.abortable.abort("Cannot get the list of peers ", e);
- }
- return peers;
- }
-
- @Override
- public ReplicationPeer getConnectedPeer(String peerId) {
- return peerClusters.get(peerId);
- }
-
- @Override
- public Set<String> getConnectedPeerIds() {
- return peerClusters.keySet(); // this is not thread-safe
- }
-
- /**
- * Returns a ReplicationPeerConfig from the znode or null for the given peerId.
- */
- @Override
- public ReplicationPeerConfig getReplicationPeerConfig(String peerId)
- throws ReplicationException {
- String znode = getPeerNode(peerId);
- byte[] data = null;
- try {
- data = ZKUtil.getData(this.zookeeper, znode);
- } catch (InterruptedException e) {
- LOG.warn("Could not get configuration for peer because the thread " +
- "was interrupted. peerId=" + peerId);
- Thread.currentThread().interrupt();
- return null;
- } catch (KeeperException e) {
- throw new ReplicationException("Error getting configuration for peer with id="
- + peerId, e);
- }
- if (data == null) {
- LOG.error("Could not get configuration for peer because it doesn't exist. peerId=" + peerId);
- return null;
- }
-
- try {
- return ReplicationSerDeHelper.parsePeerFrom(data);
- } catch (DeserializationException e) {
- LOG.warn("Failed to parse cluster key from peerId=" + peerId
- + ", specifically the content from the following znode: " + znode);
- return null;
- }
- }
-
- @Override
- public Pair<ReplicationPeerConfig, Configuration> getPeerConf(String peerId)
- throws ReplicationException {
- ReplicationPeerConfig peerConfig = getReplicationPeerConfig(peerId);
-
- if (peerConfig == null) {
- return null;
- }
-
- Configuration otherConf;
- try {
- otherConf = HBaseConfiguration.createClusterConf(this.conf, peerConfig.getClusterKey());
- } catch (IOException e) {
- LOG.error("Can't get peer configuration for peerId=" + peerId + " because:", e);
- return null;
- }
-
- if (!peerConfig.getConfiguration().isEmpty()) {
- CompoundConfiguration compound = new CompoundConfiguration();
- compound.add(otherConf);
- compound.addStringMap(peerConfig.getConfiguration());
- return new Pair<>(peerConfig, compound);
- }
-
- return new Pair<>(peerConfig, otherConf);
- }
-
- @Override
- public void updatePeerConfig(String id, ReplicationPeerConfig newConfig)
- throws ReplicationException {
- ReplicationPeer peer = getConnectedPeer(id);
- if (peer == null){
- throw new ReplicationException("Could not find peer Id " + id + " in connected peers");
- }
- ReplicationPeerConfig existingConfig = peer.getPeerConfig();
- if (newConfig.getClusterKey() != null && !newConfig.getClusterKey().isEmpty() &&
- !newConfig.getClusterKey().equals(existingConfig.getClusterKey())){
- throw new ReplicationException("Changing the cluster key on an existing peer is not allowed."
- + " Existing key '" + existingConfig.getClusterKey() + "' does not match new key '"
- + newConfig.getClusterKey() +
- "'");
- }
- String existingEndpointImpl = existingConfig.getReplicationEndpointImpl();
- if (newConfig.getReplicationEndpointImpl() != null &&
- !newConfig.getReplicationEndpointImpl().isEmpty() &&
- !newConfig.getReplicationEndpointImpl().equals(existingEndpointImpl)){
- throw new ReplicationException("Changing the replication endpoint implementation class " +
- "on an existing peer is not allowed. Existing class '"
- + existingConfig.getReplicationEndpointImpl()
- + "' does not match new class '" + newConfig.getReplicationEndpointImpl() + "'");
- }
- //Update existingConfig's peer config and peer data with the new values, but don't touch config
- // or data that weren't explicitly changed
- existingConfig.getConfiguration().putAll(newConfig.getConfiguration());
- existingConfig.getPeerData().putAll(newConfig.getPeerData());
- existingConfig.setTableCFsMap(newConfig.getTableCFsMap());
- existingConfig.setNamespaces(newConfig.getNamespaces());
- existingConfig.setBandwidth(newConfig.getBandwidth());
-
- try {
- ZKUtil.setData(this.zookeeper, getPeerNode(id),
- ReplicationSerDeHelper.toByteArray(existingConfig));
- }
- catch(KeeperException ke){
- throw new ReplicationException("There was a problem trying to save changes to the " +
- "replication peer " + id, ke);
- }
- }
-
- /**
- * List all registered peer clusters and set a watch on their znodes.
- */
- @Override
- public List<String> getAllPeerIds() {
- List<String> ids = null;
- try {
- ids = ZKUtil.listChildrenAndWatchThem(this.zookeeper, this.peersZNode);
- } catch (KeeperException e) {
- this.abortable.abort("Cannot get the list of peers ", e);
- }
- return ids;
- }
-
- /**
- * A private method used during initialization. This method attempts to add all registered
- * peer clusters. This method does not set a watch on the peer cluster znodes.
- */
- private void addExistingPeers() throws ReplicationException {
- List<String> znodes = null;
- try {
- znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
- } catch (KeeperException e) {
- throw new ReplicationException("Error getting the list of peer clusters.", e);
- }
- if (znodes != null) {
- for (String z : znodes) {
- createAndAddPeer(z);
- }
- }
- }
-
- @Override
- public boolean peerConnected(String peerId) throws ReplicationException {
- return createAndAddPeer(peerId);
- }
-
- @Override
- public void peerDisconnected(String peerId) {
- ReplicationPeer rp = this.peerClusters.get(peerId);
- if (rp != null) {
- ((ConcurrentMap<String, ReplicationPeerZKImpl>) peerClusters).remove(peerId, rp);
- }
- }
-
- /**
- * Attempt to connect to a new remote slave cluster.
- * @param peerId a short that identifies the cluster
- * @return true if a new connection was made, false if no new connection was made.
- */
- public boolean createAndAddPeer(String peerId) throws ReplicationException {
- if (peerClusters == null) {
- return false;
- }
- if (this.peerClusters.containsKey(peerId)) {
- return false;
- }
-
- ReplicationPeerZKImpl peer = null;
- try {
- peer = createPeer(peerId);
- } catch (Exception e) {
- throw new ReplicationException("Error adding peer with id=" + peerId, e);
- }
- if (peer == null) {
- return false;
- }
- ReplicationPeerZKImpl previous =
- ((ConcurrentMap<String, ReplicationPeerZKImpl>) peerClusters).putIfAbsent(peerId, peer);
- if (previous == null) {
- LOG.info("Added new peer cluster=" + peer.getPeerConfig().getClusterKey());
- } else {
- LOG.info("Peer already present, " + previous.getPeerConfig().getClusterKey() +
- ", new cluster=" + peer.getPeerConfig().getClusterKey());
- }
- return true;
- }
-
- /**
- * Update the state znode of a peer cluster.
- * @param id
- * @param state
- */
- private void changePeerState(String id, ReplicationProtos.ReplicationState.State state)
- throws ReplicationException {
- try {
- if (!peerExists(id)) {
- throw new IllegalArgumentException("Cannot enable/disable peer because id=" + id
- + " does not exist.");
- }
- String peerStateZNode = getPeerStateNode(id);
- byte[] stateBytes =
- (state == ReplicationProtos.ReplicationState.State.ENABLED) ? ENABLED_ZNODE_BYTES
- : DISABLED_ZNODE_BYTES;
- if (ZKUtil.checkExists(this.zookeeper, peerStateZNode) != -1) {
- ZKUtil.setData(this.zookeeper, peerStateZNode, stateBytes);
- } else {
- ZKUtil.createAndWatch(this.zookeeper, peerStateZNode, stateBytes);
- }
- LOG.info("Peer with id= " + id + " is now " + state.name());
- } catch (KeeperException e) {
- throw new ReplicationException("Unable to change state of the peer with id=" + id, e);
- }
- }
-
- /**
- * Helper method to connect to a peer
- * @param peerId peer's identifier
- * @return object representing the peer
- * @throws ReplicationException
- */
- private ReplicationPeerZKImpl createPeer(String peerId) throws ReplicationException {
- Pair<ReplicationPeerConfig, Configuration> pair = getPeerConf(peerId);
- if (pair == null) {
- return null;
- }
- Configuration peerConf = pair.getSecond();
-
- ReplicationPeerZKImpl peer = new ReplicationPeerZKImpl(zookeeper,
- peerConf, peerId, pair.getFirst(), abortable);
- try {
- peer.startStateTracker(this.getPeerStateNode(peerId));
- } catch (KeeperException e) {
- throw new ReplicationException("Error starting the peer state tracker for peerId=" +
- peerId, e);
- }
-
- try {
- peer.startPeerConfigTracker(this.getPeerNode(peerId));
- } catch (KeeperException e) {
- throw new ReplicationException("Error starting the peer tableCFs tracker for peerId=" +
- peerId, e);
- }
-
- return peer;
- }
-
- private void checkQueuesDeleted(String peerId) throws ReplicationException {
- if (queuesClient == null) return;
- try {
- List<String> replicators = queuesClient.getListOfReplicators();
- if (replicators == null || replicators.isEmpty()) {
- return;
- }
- for (String replicator : replicators) {
- List<String> queueIds = queuesClient.getAllQueues(replicator);
- for (String queueId : queueIds) {
- ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
- if (queueInfo.getPeerId().equals(peerId)) {
- throw new ReplicationException("undeleted queue for peerId: " + peerId
- + ", replicator: " + replicator + ", queueId: " + queueId);
- }
- }
- }
- // Check for hfile-refs queue
- if (-1 != ZKUtil.checkExists(zookeeper, hfileRefsZNode)
- && queuesClient.getAllPeersFromHFileRefsQueue().contains(peerId)) {
- throw new ReplicationException("Undeleted queue for peerId: " + peerId
- + ", found in hfile-refs node path " + hfileRefsZNode);
- }
- } catch (KeeperException e) {
- throw new ReplicationException("Could not check queues deleted with id=" + peerId, e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java
deleted file mode 100644
index 1403f6d..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueueInfo.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.ServerName;
-
-/**
- * This class is responsible for the parsing logic for a znode representing a queue.
- * It will extract the peerId if it's recovered as well as the dead region servers
- * that were part of the queue's history.
- */
-@InterfaceAudience.Private
-public class ReplicationQueueInfo {
- private static final Log LOG = LogFactory.getLog(ReplicationQueueInfo.class);
-
- private final String peerId;
- private final String peerClusterZnode;
- private boolean queueRecovered;
- // List of all the dead region servers that had this queue (if recovered)
- private List<String> deadRegionServers = new ArrayList<>();
-
- /**
- * The passed znode will be either the id of the peer cluster or
- * the handling story of that queue in the form of id-servername-*
- */
- public ReplicationQueueInfo(String znode) {
- this.peerClusterZnode = znode;
- String[] parts = znode.split("-", 2);
- this.queueRecovered = parts.length != 1;
- this.peerId = this.queueRecovered ?
- parts[0] : peerClusterZnode;
- if (parts.length >= 2) {
- // extract dead servers
- extractDeadServersFromZNodeString(parts[1], this.deadRegionServers);
- }
- }
-
- /**
- * Parse dead server names from znode string servername can contain "-" such as
- * "ip-10-46-221-101.ec2.internal", so we need skip some "-" during parsing for the following
- * cases: 2-ip-10-46-221-101.ec2.internal,52170,1364333181125-<server name>-...
- */
- private static void
- extractDeadServersFromZNodeString(String deadServerListStr, List<String> result) {
-
- if(deadServerListStr == null || result == null || deadServerListStr.isEmpty()) return;
-
- // valid server name delimiter "-" has to be after "," in a server name
- int seenCommaCnt = 0;
- int startIndex = 0;
- int len = deadServerListStr.length();
-
- for (int i = 0; i < len; i++) {
- switch (deadServerListStr.charAt(i)) {
- case ',':
- seenCommaCnt += 1;
- break;
- case '-':
- if(seenCommaCnt>=2) {
- if (i > startIndex) {
- String serverName = deadServerListStr.substring(startIndex, i);
- if(ServerName.isFullServerName(serverName)){
- result.add(serverName);
- } else {
- LOG.error("Found invalid server name:" + serverName);
- }
- startIndex = i + 1;
- }
- seenCommaCnt = 0;
- }
- break;
- default:
- break;
- }
- }
-
- // add tail
- if(startIndex < len - 1){
- String serverName = deadServerListStr.substring(startIndex, len);
- if(ServerName.isFullServerName(serverName)){
- result.add(serverName);
- } else {
- LOG.error("Found invalid server name at the end:" + serverName);
- }
- }
-
- LOG.debug("Found dead servers:" + result);
- }
-
- public List<String> getDeadRegionServers() {
- return Collections.unmodifiableList(this.deadRegionServers);
- }
-
- public String getPeerId() {
- return this.peerId;
- }
-
- public String getPeerClusterZnode() {
- return this.peerClusterZnode;
- }
-
- public boolean isQueueRecovered() {
- return queueRecovered;
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
deleted file mode 100644
index be5a590..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import java.util.List;
-import java.util.SortedSet;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.util.Pair;
-
-/**
- * This provides an interface for maintaining a region server's replication queues. These queues
- * keep track of the WALs and HFile references (if hbase.replication.bulkload.enabled is enabled)
- * that still need to be replicated to remote clusters.
- */
-@InterfaceAudience.Private
-public interface ReplicationQueues {
-
- /**
- * Initialize the region server replication queue interface.
- * @param serverName The server name of the region server that owns the replication queues this
- * interface manages.
- */
- void init(String serverName) throws ReplicationException;
-
- /**
- * Remove a replication queue.
- * @param queueId a String that identifies the queue.
- */
- void removeQueue(String queueId);
-
- /**
- * Add a new WAL file to the given queue. If the queue does not exist it is created.
- * @param queueId a String that identifies the queue.
- * @param filename name of the WAL
- */
- void addLog(String queueId, String filename) throws ReplicationException;
-
- /**
- * Remove an WAL file from the given queue.
- * @param queueId a String that identifies the queue.
- * @param filename name of the WAL
- */
- void removeLog(String queueId, String filename);
-
- /**
- * Set the current position for a specific WAL in a given queue.
- * @param queueId a String that identifies the queue
- * @param filename name of the WAL
- * @param position the current position in the file
- */
- void setLogPosition(String queueId, String filename, long position);
-
- /**
- * Get the current position for a specific WAL in a given queue.
- * @param queueId a String that identifies the queue
- * @param filename name of the WAL
- * @return the current position in the file
- */
- long getLogPosition(String queueId, String filename) throws ReplicationException;
-
- /**
- * Remove all replication queues for this region server.
- */
- void removeAllQueues();
-
- /**
- * Get a list of all WALs in the given queue.
- * @param queueId a String that identifies the queue
- * @return a list of WALs, null if no such queue exists for this server
- */
- List<String> getLogsInQueue(String queueId);
-
- /**
- * Get a list of all queues for this region server.
- * @return a list of queueIds, an empty list if this region server is dead and has no outstanding queues
- */
- List<String> getAllQueues();
-
- /**
- * Get queueIds from a dead region server, whose queues has not been claimed by other region
- * servers.
- * @return empty if the queue exists but no children, null if the queue does not exist.
- */
- List<String> getUnClaimedQueueIds(String regionserver);
-
- /**
- * Take ownership for the queue identified by queueId and belongs to a dead region server.
- * @param regionserver the id of the dead region server
- * @param queueId the id of the queue
- * @return the new PeerId and A SortedSet of WALs in its queue, and null if no unclaimed queue.
- */
- Pair<String, SortedSet<String>> claimQueue(String regionserver, String queueId);
-
- /**
- * Remove the znode of region server if the queue is empty.
- * @param regionserver
- */
- void removeReplicatorIfQueueIsEmpty(String regionserver);
-
- /**
- * Get a list of all region servers that have outstanding replication queues. These servers could
- * be alive, dead or from a previous run of the cluster.
- * @return a list of server names
- */
- List<String> getListOfReplicators();
-
- /**
- * Checks if the provided znode is the same as this region server's
- * @param regionserver the id of the region server
- * @return if this is this rs's znode
- */
- boolean isThisOurRegionServer(String regionserver);
-
- /**
- * Add a peer to hfile reference queue if peer does not exist.
- * @param peerId peer cluster id to be added
- * @throws ReplicationException if fails to add a peer id to hfile reference queue
- */
- void addPeerToHFileRefs(String peerId) throws ReplicationException;
-
- /**
- * Remove a peer from hfile reference queue.
- * @param peerId peer cluster id to be removed
- */
- void removePeerFromHFileRefs(String peerId);
-
- /**
- * Add new hfile references to the queue.
- * @param peerId peer cluster id to which the hfiles need to be replicated
- * @param pairs list of pairs of { HFile location in staging dir, HFile path in region dir which
- * will be added in the queue }
- * @throws ReplicationException if fails to add a hfile reference
- */
- void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs) throws ReplicationException;
-
- /**
- * Remove hfile references from the queue.
- * @param peerId peer cluster id from which this hfile references needs to be removed
- * @param files list of hfile references to be removed
- */
- void removeHFileRefs(String peerId, List<String> files);
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesArguments.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesArguments.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesArguments.java
deleted file mode 100644
index 12fc6a1..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesArguments.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-
-/**
- * Wrapper around common arguments used to construct ReplicationQueues. Used to construct various
- * ReplicationQueues Implementations with different constructor arguments by reflection.
- */
-@InterfaceAudience.Private
-public class ReplicationQueuesArguments {
-
- private ZooKeeperWatcher zk;
- private Configuration conf;
- private Abortable abort;
-
- public ReplicationQueuesArguments(Configuration conf, Abortable abort) {
- this.conf = conf;
- this.abort = abort;
- }
-
- public ReplicationQueuesArguments(Configuration conf, Abortable abort, ZooKeeperWatcher zk) {
- this(conf, abort);
- setZk(zk);
- }
-
- public ZooKeeperWatcher getZk() {
- return zk;
- }
-
- public void setZk(ZooKeeperWatcher zk) {
- this.zk = zk;
- }
-
- public Configuration getConf() {
- return conf;
- }
-
- public void setConf(Configuration conf) {
- this.conf = conf;
- }
-
- public Abortable getAbortable() {
- return abort;
- }
-
- public void setAbortable(Abortable abort) {
- this.abort = abort;
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
deleted file mode 100644
index 6d8900e..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClient.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import java.util.List;
-import java.util.Set;
-
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.zookeeper.KeeperException;
-
-/**
- * This provides an interface for clients of replication to view replication queues. These queues
- * keep track of the sources(WALs/HFile references) that still need to be replicated to remote
- * clusters.
- */
-@InterfaceAudience.Private
-public interface ReplicationQueuesClient {
-
- /**
- * Initialize the replication queue client interface.
- */
- public void init() throws ReplicationException;
-
- /**
- * Get a list of all region servers that have outstanding replication queues. These servers could
- * be alive, dead or from a previous run of the cluster.
- * @return a list of server names
- * @throws KeeperException zookeeper exception
- */
- List<String> getListOfReplicators() throws KeeperException;
-
- /**
- * Get a list of all WALs in the given queue on the given region server.
- * @param serverName the server name of the region server that owns the queue
- * @param queueId a String that identifies the queue
- * @return a list of WALs, null if this region server is dead and has no outstanding queues
- * @throws KeeperException zookeeper exception
- */
- List<String> getLogsInQueue(String serverName, String queueId) throws KeeperException;
-
- /**
- * Get a list of all queues for the specified region server.
- * @param serverName the server name of the region server that owns the set of queues
- * @return a list of queueIds, null if this region server is not a replicator.
- */
- List<String> getAllQueues(String serverName) throws KeeperException;
-
- /**
- * Load all wals in all replication queues from ZK. This method guarantees to return a
- * snapshot which contains all WALs in the zookeeper at the start of this call even there
- * is concurrent queue failover. However, some newly created WALs during the call may
- * not be included.
- */
- Set<String> getAllWALs() throws KeeperException;
-
- /**
- * Get the change version number of replication hfile references node. This can be used as
- * optimistic locking to get a consistent snapshot of the replication queues of hfile references.
- * @return change version number of hfile references node
- */
- int getHFileRefsNodeChangeVersion() throws KeeperException;
-
- /**
- * Get list of all peers from hfile reference queue.
- * @return a list of peer ids
- * @throws KeeperException zookeeper exception
- */
- List<String> getAllPeersFromHFileRefsQueue() throws KeeperException;
-
- /**
- * Get a list of all hfile references in the given peer.
- * @param peerId a String that identifies the peer
- * @return a list of hfile references, null if not found any
- * @throws KeeperException zookeeper exception
- */
- List<String> getReplicableHFiles(String peerId) throws KeeperException;
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/26e6c2ce/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientArguments.java
----------------------------------------------------------------------
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientArguments.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientArguments.java
deleted file mode 100644
index 834f831..0000000
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesClientArguments.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-
-/**
- * Wrapper around common arguments used to construct ReplicationQueuesClient. Used to construct
- * various ReplicationQueuesClient Implementations with different constructor arguments by
- * reflection.
- */
-@InterfaceAudience.Private
-public class ReplicationQueuesClientArguments extends ReplicationQueuesArguments {
- public ReplicationQueuesClientArguments(Configuration conf, Abortable abort,
- ZooKeeperWatcher zk) {
- super(conf, abort, zk);
- }
- public ReplicationQueuesClientArguments(Configuration conf, Abortable abort) {
- super(conf, abort);
- }
-}