You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/01/04 01:23:56 UTC
[22/35] hbase git commit: HBASE-19661 Replace ReplicationStateZKBase with ZKReplicationStorageBase
HBASE-19661 Replace ReplicationStateZKBase with ZKReplicationStorageBase
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/b8b84a9f
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/b8b84a9f
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/b8b84a9f
Branch: refs/heads/HBASE-19397
Commit: b8b84a9f5719928195d4534968fb4bfdd2521141
Parents: 5df509b
Author: huzheng <op...@gmail.com>
Authored: Fri Dec 29 15:55:28 2017 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Thu Jan 4 09:22:35 2018 +0800
----------------------------------------------------------------------
.../hbase/replication/ReplicationFactory.java | 5 +-
.../replication/ReplicationStateZKBase.java | 153 -------------------
.../replication/ReplicationTrackerZKImpl.java | 18 ++-
.../replication/ZKReplicationPeerStorage.java | 24 ++-
.../replication/ZKReplicationStorageBase.java | 13 +-
.../org/apache/hadoop/hbase/master/HMaster.java | 4 +-
.../master/ReplicationPeerConfigUpgrader.java | 128 ++++++++--------
.../regionserver/DumpReplicationQueues.java | 18 +--
.../replication/regionserver/Replication.java | 3 +-
.../org/apache/hadoop/hbase/util/HBaseFsck.java | 3 +-
.../TestReplicationTrackerZKImpl.java | 3 +-
.../replication/master/TestTableCFsUpdater.java | 41 ++---
.../TestReplicationSourceManager.java | 6 +-
13 files changed, 135 insertions(+), 284 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/b8b84a9f/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
index 6c66aff..2a970ba 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationFactory.java
@@ -33,9 +33,8 @@ public class ReplicationFactory {
return new ReplicationPeers(zk, conf);
}
- public static ReplicationTracker getReplicationTracker(ZKWatcher zookeeper,
- final ReplicationPeers replicationPeers, Configuration conf, Abortable abortable,
+ public static ReplicationTracker getReplicationTracker(ZKWatcher zookeeper, Abortable abortable,
Stoppable stopper) {
- return new ReplicationTrackerZKImpl(zookeeper, replicationPeers, conf, abortable, stopper);
+ return new ReplicationTrackerZKImpl(zookeeper, abortable, stopper);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b8b84a9f/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
deleted file mode 100644
index f49537c..0000000
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStateZKBase.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-import org.apache.hbase.thirdparty.com.google.protobuf.CodedOutputStream;
-import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
-import org.apache.hadoop.hbase.zookeeper.ZKConfig;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.zookeeper.KeeperException;
-
-/**
- * This is a base class for maintaining replication state in zookeeper.
- */
-@InterfaceAudience.Private
-public abstract class ReplicationStateZKBase {
-
- /**
- * The name of the znode that contains the replication status of a remote slave (i.e. peer)
- * cluster.
- */
- protected final String peerStateNodeName;
- /** The name of the base znode that contains all replication state. */
- protected final String replicationZNode;
- /** The name of the znode that contains a list of all remote slave (i.e. peer) clusters. */
- protected final String peersZNode;
- /** The name of the znode that contains all replication queues */
- protected final String queuesZNode;
- /** The name of the znode that contains queues of hfile references to be replicated */
- protected final String hfileRefsZNode;
- /** The cluster key of the local cluster */
- protected final String ourClusterKey;
- /** The name of the znode that contains tableCFs */
- protected final String tableCFsNodeName;
-
- protected final ZKWatcher zookeeper;
- protected final Configuration conf;
- protected final Abortable abortable;
-
- public static final byte[] ENABLED_ZNODE_BYTES =
- toByteArray(ReplicationProtos.ReplicationState.State.ENABLED);
- public static final byte[] DISABLED_ZNODE_BYTES =
- toByteArray(ReplicationProtos.ReplicationState.State.DISABLED);
- public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY =
- "zookeeper.znode.replication.hfile.refs";
- public static final String ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT = "hfile-refs";
-
- public ReplicationStateZKBase(ZKWatcher zookeeper, Configuration conf,
- Abortable abortable) {
- this.zookeeper = zookeeper;
- this.conf = conf;
- this.abortable = abortable;
-
- String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
- String peersZNodeName = conf.get("zookeeper.znode.replication.peers", "peers");
- String queuesZNodeName = conf.get("zookeeper.znode.replication.rs", "rs");
- String hfileRefsZNodeName = conf.get(ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_KEY,
- ZOOKEEPER_ZNODE_REPLICATION_HFILE_REFS_DEFAULT);
- this.peerStateNodeName = conf.get("zookeeper.znode.replication.peers.state", "peer-state");
- this.tableCFsNodeName = conf.get("zookeeper.znode.replication.peers.tableCFs", "tableCFs");
- this.ourClusterKey = ZKConfig.getZooKeeperClusterKey(this.conf);
- this.replicationZNode = ZNodePaths.joinZNode(this.zookeeper.znodePaths.baseZNode,
- replicationZNodeName);
- this.peersZNode = ZNodePaths.joinZNode(replicationZNode, peersZNodeName);
- this.queuesZNode = ZNodePaths.joinZNode(replicationZNode, queuesZNodeName);
- this.hfileRefsZNode = ZNodePaths.joinZNode(replicationZNode, hfileRefsZNodeName);
- }
-
- public List<String> getListOfReplicators() {
- List<String> result = null;
- try {
- result = ZKUtil.listChildrenNoWatch(this.zookeeper, this.queuesZNode);
- } catch (KeeperException e) {
- this.abortable.abort("Failed to get list of replicators", e);
- }
- return result;
- }
-
- /**
- * @param state
- * @return Serialized protobuf of <code>state</code> with pb magic prefix prepended suitable for
- * use as content of a peer-state znode under a peer cluster id as in
- * /hbase/replication/peers/PEER_ID/peer-state.
- */
- protected static byte[] toByteArray(final ReplicationProtos.ReplicationState.State state) {
- ReplicationProtos.ReplicationState msg =
- ReplicationProtos.ReplicationState.newBuilder().setState(state).build();
- // There is no toByteArray on this pb Message?
- // 32 bytes is default which seems fair enough here.
- try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
- CodedOutputStream cos = CodedOutputStream.newInstance(baos, 16);
- msg.writeTo(cos);
- cos.flush();
- baos.flush();
- return ProtobufUtil.prependPBMagic(baos.toByteArray());
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- protected boolean peerExists(String id) throws KeeperException {
- return ZKUtil.checkExists(this.zookeeper, ZNodePaths.joinZNode(this.peersZNode, id)) >= 0;
- }
-
- /**
- * Determine if a ZK path points to a peer node.
- * @param path path to be checked
- * @return true if the path points to a peer node, otherwise false
- */
- protected boolean isPeerPath(String path) {
- return path.split("/").length == peersZNode.split("/").length + 1;
- }
-
- @VisibleForTesting
- protected String getTableCFsNode(String id) {
- return ZNodePaths.joinZNode(this.peersZNode, ZNodePaths.joinZNode(id, this.tableCFsNodeName));
- }
-
- @VisibleForTesting
- protected String getPeerStateNode(String id) {
- return ZNodePaths.joinZNode(this.peersZNode, ZNodePaths.joinZNode(id, this.peerStateNodeName));
- }
- @VisibleForTesting
- protected String getPeerNode(String id) {
- return ZNodePaths.joinZNode(this.peersZNode, id);
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b8b84a9f/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java
index 5659e4b..a84d330 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java
@@ -21,13 +21,12 @@ import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
+import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.zookeeper.ZKListener;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,9 +37,14 @@ import org.slf4j.LoggerFactory;
* interface.
*/
@InterfaceAudience.Private
-public class ReplicationTrackerZKImpl extends ReplicationStateZKBase implements ReplicationTracker {
+public class ReplicationTrackerZKImpl implements ReplicationTracker {
private static final Logger LOG = LoggerFactory.getLogger(ReplicationTrackerZKImpl.class);
+
+ // Zookeeper
+ private final ZKWatcher zookeeper;
+ // Server to abort.
+ private final Abortable abortable;
// All about stopping
private final Stoppable stopper;
// listeners to be notified
@@ -48,9 +52,9 @@ public class ReplicationTrackerZKImpl extends ReplicationStateZKBase implements
// List of all the other region servers in this cluster
private final ArrayList<String> otherRegionServers = new ArrayList<>();
- public ReplicationTrackerZKImpl(ZKWatcher zookeeper, final ReplicationPeers replicationPeers,
- Configuration conf, Abortable abortable, Stoppable stopper) {
- super(zookeeper, conf, abortable);
+ public ReplicationTrackerZKImpl(ZKWatcher zookeeper, Abortable abortable, Stoppable stopper) {
+ this.zookeeper = zookeeper;
+ this.abortable = abortable;
this.stopper = stopper;
this.zookeeper.registerListener(new OtherRegionServerWatcher(this.zookeeper));
// watch the changes
http://git-wip-us.apache.org/repos/asf/hbase/blob/b8b84a9f/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java
index 42d4b3f..a53500a 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java
@@ -29,6 +29,7 @@ import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
@@ -36,7 +37,14 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
* ZK based replication peer storage.
*/
@InterfaceAudience.Private
-class ZKReplicationPeerStorage extends ZKReplicationStorageBase implements ReplicationPeerStorage {
+public class ZKReplicationPeerStorage extends ZKReplicationStorageBase
+ implements ReplicationPeerStorage {
+
+ public static final String PEERS_ZNODE = "zookeeper.znode.replication.peers";
+ public static final String PEERS_ZNODE_DEFAULT = "peers";
+
+ public static final String PEERS_STATE_ZNODE = "zookeeper.znode.replication.peers.state";
+ public static final String PEERS_STATE_ZNODE_DEFAULT = "peer-state";
public static final byte[] ENABLED_ZNODE_BYTES =
toByteArray(ReplicationProtos.ReplicationState.State.ENABLED);
@@ -56,16 +64,18 @@ class ZKReplicationPeerStorage extends ZKReplicationStorageBase implements Repli
public ZKReplicationPeerStorage(ZKWatcher zookeeper, Configuration conf) {
super(zookeeper, conf);
- this.peerStateNodeName = conf.get("zookeeper.znode.replication.peers.state", "peer-state");
- String peersZNodeName = conf.get("zookeeper.znode.replication.peers", "peers");
+ this.peerStateNodeName = conf.get(PEERS_STATE_ZNODE, PEERS_STATE_ZNODE_DEFAULT);
+ String peersZNodeName = conf.get(PEERS_ZNODE, PEERS_ZNODE_DEFAULT);
this.peersZNode = ZNodePaths.joinZNode(replicationZNode, peersZNodeName);
}
- private String getPeerStateNode(String peerId) {
+ @VisibleForTesting
+ public String getPeerStateNode(String peerId) {
return ZNodePaths.joinZNode(getPeerNode(peerId), peerStateNodeName);
}
- private String getPeerNode(String peerId) {
+ @VisibleForTesting
+ public String getPeerNode(String peerId) {
return ZNodePaths.joinZNode(peersZNode, peerId);
}
@@ -82,8 +92,8 @@ class ZKReplicationPeerStorage extends ZKReplicationStorageBase implements Repli
enabled ? ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES)),
false);
} catch (KeeperException e) {
- throw new ReplicationException("Could not add peer with id=" + peerId + ", peerConfif=>" +
- peerConfig + ", state=" + (enabled ? "ENABLED" : "DISABLED"), e);
+ throw new ReplicationException("Could not add peer with id=" + peerId + ", peerConfif=>"
+ + peerConfig + ", state=" + (enabled ? "ENABLED" : "DISABLED"), e);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b8b84a9f/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java
index 2321e4f..7190aeb 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationStorageBase.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.replication;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
@@ -34,7 +35,10 @@ import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
* zookeeper.
*/
@InterfaceAudience.Private
-class ZKReplicationStorageBase {
+public class ZKReplicationStorageBase {
+
+ public static final String REPLICATION_ZNODE = "zookeeper.znode.replication";
+ public static final String REPLICATION_ZNODE_DEFAULT = "replication";
/** The name of the base znode that contains all replication state. */
protected final String replicationZNode;
@@ -45,10 +49,9 @@ class ZKReplicationStorageBase {
protected ZKReplicationStorageBase(ZKWatcher zookeeper, Configuration conf) {
this.zookeeper = zookeeper;
this.conf = conf;
- String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
- this.replicationZNode =
- ZNodePaths.joinZNode(this.zookeeper.znodePaths.baseZNode, replicationZNodeName);
+ this.replicationZNode = ZNodePaths.joinZNode(this.zookeeper.znodePaths.baseZNode,
+ conf.get(REPLICATION_ZNODE, REPLICATION_ZNODE_DEFAULT));
}
/**
@@ -58,7 +61,7 @@ class ZKReplicationStorageBase {
*/
protected static byte[] toByteArray(final ReplicationProtos.ReplicationState.State state) {
ReplicationProtos.ReplicationState msg =
- ReplicationProtos.ReplicationState.newBuilder().setState(state).build();
+ ReplicationProtos.ReplicationState.newBuilder().setState(state).build();
// There is no toByteArray on this pb Message?
// 32 bytes is default which seems fair enough here.
try (ByteArrayOutputStream baos = new ByteArrayOutputStream()) {
http://git-wip-us.apache.org/repos/asf/hbase/blob/b8b84a9f/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
index 5c633cc..c586839 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/HMaster.java
@@ -801,8 +801,8 @@ public class HMaster extends HRegionServer implements MasterServices {
// This is for backwards compatibility
// See HBASE-11393
status.setStatus("Update TableCFs node in ZNode");
- ReplicationPeerConfigUpgrader tableCFsUpdater = new ReplicationPeerConfigUpgrader(zooKeeper,
- conf, this.clusterConnection);
+ ReplicationPeerConfigUpgrader tableCFsUpdater =
+ new ReplicationPeerConfigUpgrader(zooKeeper, conf);
tableCFsUpdater.copyTableCFs();
// Add the Observer to delete space quotas on table deletion before starting all CPs by
http://git-wip-us.apache.org/repos/asf/hbase/blob/b8b84a9f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationPeerConfigUpgrader.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationPeerConfigUpgrader.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationPeerConfigUpgrader.java
index ea5509f..b6e8862 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationPeerConfigUpgrader.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/master/ReplicationPeerConfigUpgrader.java
@@ -18,96 +18,107 @@
*/
package org.apache.hadoop.hbase.replication.master;
+import static org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage.PEERS_ZNODE;
+import static org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage.PEERS_ZNODE_DEFAULT;
+import static org.apache.hadoop.hbase.replication.ZKReplicationStorageBase.REPLICATION_ZNODE;
+import static org.apache.hadoop.hbase.replication.ZKReplicationStorageBase.REPLICATION_ZNODE_DEFAULT;
+
import java.io.IOException;
-import java.util.List;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
+import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
-import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
+import org.apache.hadoop.hbase.replication.ReplicationPeerStorage;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.yetus.audience.InterfaceStability;
import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
/**
- * This class is used to upgrade TableCFs from HBase 1.0, 1.1, 1.2, 1.3 to HBase 1.4 or 2.x.
- * It will be removed in HBase 3.x. See HBASE-11393
+ * This class is used to upgrade TableCFs from HBase 1.0, 1.1, 1.2, 1.3 to HBase 1.4 or 2.x. It will
+ * be removed in HBase 3.x. See HBASE-11393
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
-public class ReplicationPeerConfigUpgrader extends ReplicationStateZKBase {
+public class ReplicationPeerConfigUpgrader{
+
+ private static final String TABLE_CFS_ZNODE = "zookeeper.znode.replication.peers.tableCFs";
+ private static final String TABLE_CFS_ZNODE_DEFAULT = "tableCFs";
private static final Logger LOG = LoggerFactory.getLogger(ReplicationPeerConfigUpgrader.class);
+ private final Configuration conf;
+ private final ZKWatcher zookeeper;
+ private final ReplicationPeerStorage peerStorage;
- public ReplicationPeerConfigUpgrader(ZKWatcher zookeeper,
- Configuration conf, Abortable abortable) {
- super(zookeeper, conf, abortable);
+ public ReplicationPeerConfigUpgrader(ZKWatcher zookeeper, Configuration conf) {
+ this.zookeeper = zookeeper;
+ this.conf = conf;
+ this.peerStorage = ReplicationStorageFactory.getReplicationPeerStorage(zookeeper, conf);
}
public void upgrade() throws Exception {
try (Connection conn = ConnectionFactory.createConnection(conf)) {
Admin admin = conn.getAdmin();
- admin.listReplicationPeers().forEach(
- (peerDesc) -> {
- String peerId = peerDesc.getPeerId();
- ReplicationPeerConfig peerConfig = peerDesc.getPeerConfig();
- if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty())
- || (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) {
- peerConfig.setReplicateAllUserTables(false);
- try {
- admin.updateReplicationPeerConfig(peerId, peerConfig);
- } catch (Exception e) {
- LOG.error("Failed to upgrade replication peer config for peerId=" + peerId, e);
- }
+ admin.listReplicationPeers().forEach((peerDesc) -> {
+ String peerId = peerDesc.getPeerId();
+ ReplicationPeerConfig peerConfig = peerDesc.getPeerConfig();
+ if ((peerConfig.getNamespaces() != null && !peerConfig.getNamespaces().isEmpty())
+ || (peerConfig.getTableCFsMap() != null && !peerConfig.getTableCFsMap().isEmpty())) {
+ peerConfig.setReplicateAllUserTables(false);
+ try {
+ admin.updateReplicationPeerConfig(peerId, peerConfig);
+ } catch (Exception e) {
+ LOG.error("Failed to upgrade replication peer config for peerId=" + peerId, e);
}
- });
+ }
+ });
}
}
- public void copyTableCFs() {
- List<String> znodes = null;
- try {
- znodes = ZKUtil.listChildrenNoWatch(this.zookeeper, this.peersZNode);
- } catch (KeeperException e) {
- LOG.error("Failed to get peers znode", e);
- }
- if (znodes != null) {
- for (String peerId : znodes) {
- if (!copyTableCFs(peerId)) {
- LOG.error("upgrade tableCFs failed for peerId=" + peerId);
- }
+ public void copyTableCFs() throws ReplicationException {
+ for (String peerId : peerStorage.listPeerIds()) {
+ if (!copyTableCFs(peerId)) {
+ LOG.error("upgrade tableCFs failed for peerId=" + peerId);
}
}
}
- public boolean copyTableCFs(String peerId) {
+ @VisibleForTesting
+ protected String getTableCFsNode(String peerId) {
+ String replicationZNode = ZNodePaths.joinZNode(zookeeper.znodePaths.baseZNode,
+ conf.get(REPLICATION_ZNODE, REPLICATION_ZNODE_DEFAULT));
+ String peersZNode =
+ ZNodePaths.joinZNode(replicationZNode, conf.get(PEERS_ZNODE, PEERS_ZNODE_DEFAULT));
+ return ZNodePaths.joinZNode(peersZNode,
+ ZNodePaths.joinZNode(peerId, conf.get(TABLE_CFS_ZNODE, TABLE_CFS_ZNODE_DEFAULT)));
+ }
+
+ public boolean copyTableCFs(String peerId) throws ReplicationException {
String tableCFsNode = getTableCFsNode(peerId);
try {
if (ZKUtil.checkExists(zookeeper, tableCFsNode) != -1) {
- String peerNode = getPeerNode(peerId);
- ReplicationPeerConfig rpc = getReplicationPeerConig(peerNode);
+ ReplicationPeerConfig rpc = peerStorage.getPeerConfig(peerId);
// We only need to copy data from tableCFs node to rpc Node the first time hmaster start.
if (rpc.getTableCFsMap() == null || rpc.getTableCFsMap().isEmpty()) {
// we copy TableCFs node into PeerNode
LOG.info("copy tableCFs into peerNode:" + peerId);
ReplicationProtos.TableCF[] tableCFs =
- ReplicationPeerConfigUtil.parseTableCFs(
- ZKUtil.getData(this.zookeeper, tableCFsNode));
+ ReplicationPeerConfigUtil.parseTableCFs(ZKUtil.getData(this.zookeeper, tableCFsNode));
if (tableCFs != null && tableCFs.length > 0) {
rpc.setTableCFsMap(ReplicationPeerConfigUtil.convert2Map(tableCFs));
- ZKUtil.setData(this.zookeeper, peerNode,
- ReplicationPeerConfigUtil.toByteArray(rpc));
+ peerStorage.updatePeerConfig(peerId, rpc);
}
} else {
LOG.info("No tableCFs in peerNode:" + peerId);
@@ -126,23 +137,6 @@ public class ReplicationPeerConfigUpgrader extends ReplicationStateZKBase {
return true;
}
- private ReplicationPeerConfig getReplicationPeerConig(String peerNode)
- throws KeeperException, InterruptedException {
- byte[] data = null;
- data = ZKUtil.getData(this.zookeeper, peerNode);
- if (data == null) {
- LOG.error("Could not get configuration for " +
- "peer because it doesn't exist. peer=" + peerNode);
- return null;
- }
- try {
- return ReplicationPeerConfigUtil.parsePeerFrom(data);
- } catch (DeserializationException e) {
- LOG.warn("Failed to parse cluster key from peer=" + peerNode);
- return null;
- }
- }
-
private static void printUsageAndExit() {
System.err.printf(
"Usage: hbase org.apache.hadoop.hbase.replication.master.ReplicationPeerConfigUpgrader"
@@ -163,19 +157,17 @@ public class ReplicationPeerConfigUpgrader extends ReplicationStateZKBase {
printUsageAndExit();
} else if (args[0].equals("copyTableCFs")) {
Configuration conf = HBaseConfiguration.create();
- ZKWatcher zkw = new ZKWatcher(conf, "ReplicationPeerConfigUpgrader", null);
- try {
- ReplicationPeerConfigUpgrader tableCFsUpdater = new ReplicationPeerConfigUpgrader(zkw,
- conf, null);
+ try (ZKWatcher zkw = new ZKWatcher(conf, "ReplicationPeerConfigUpgrader", null)) {
+ ReplicationPeerConfigUpgrader tableCFsUpdater =
+ new ReplicationPeerConfigUpgrader(zkw, conf);
tableCFsUpdater.copyTableCFs();
- } finally {
- zkw.close();
}
} else if (args[0].equals("upgrade")) {
Configuration conf = HBaseConfiguration.create();
- ZKWatcher zkw = new ZKWatcher(conf, "ReplicationPeerConfigUpgrader", null);
- ReplicationPeerConfigUpgrader upgrader = new ReplicationPeerConfigUpgrader(zkw, conf, null);
- upgrader.upgrade();
+ try (ZKWatcher zkw = new ZKWatcher(conf, "ReplicationPeerConfigUpgrader", null)) {
+ ReplicationPeerConfigUpgrader upgrader = new ReplicationPeerConfigUpgrader(zkw, conf);
+ upgrader.upgrade();
+ }
} else {
printUsageAndExit();
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b8b84a9f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
index 27bda2d..22e8628 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
@@ -46,7 +46,6 @@ import org.apache.hadoop.hbase.procedure2.util.StringUtils;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
-import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
@@ -237,7 +236,7 @@ public class DumpReplicationQueues extends Configured implements Tool {
LOG.info("Found [--distributed], will poll each RegionServer.");
Set<String> peerIds = peers.stream().map((peer) -> peer.getPeerId())
.collect(Collectors.toSet());
- System.out.println(dumpQueues(connection, zkw, peerIds, opts.isHdfs()));
+ System.out.println(dumpQueues(zkw, peerIds, opts.isHdfs()));
System.out.println(dumpReplicationSummary());
} else {
// use ZK instead
@@ -301,18 +300,15 @@ public class DumpReplicationQueues extends Configured implements Tool {
return sb.toString();
}
- public String dumpQueues(ClusterConnection connection, ZKWatcher zkw, Set<String> peerIds,
+ public String dumpQueues(ZKWatcher zkw, Set<String> peerIds,
boolean hdfs) throws Exception {
ReplicationQueueStorage queueStorage;
- ReplicationPeers replicationPeers;
ReplicationTracker replicationTracker;
StringBuilder sb = new StringBuilder();
queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf());
- replicationPeers =
- ReplicationFactory.getReplicationPeers(zkw, getConf());
- replicationTracker = ReplicationFactory.getReplicationTracker(zkw, replicationPeers, getConf(),
- new WarnOnlyAbortable(), new WarnOnlyStoppable());
+ replicationTracker = ReplicationFactory.getReplicationTracker(zkw, new WarnOnlyAbortable(),
+ new WarnOnlyStoppable());
Set<String> liveRegionServers = new HashSet<>(replicationTracker.getListOfRegionServers());
// Loops each peer on each RS and dumps the queues
@@ -330,11 +326,9 @@ public class DumpReplicationQueues extends Configured implements Tool {
List<String> wals = queueStorage.getWALsInQueue(regionserver, queueId);
if (!peerIds.contains(queueInfo.getPeerId())) {
deletedQueues.add(regionserver + "/" + queueId);
- sb.append(
- formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, true, hdfs));
+ sb.append(formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, true, hdfs));
} else {
- sb.append(
- formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, false, hdfs));
+ sb.append(formatQueue(regionserver, queueStorage, queueInfo, queueId, wals, false, hdfs));
}
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b8b84a9f/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
index f985f90..dca2439 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/Replication.java
@@ -132,8 +132,7 @@ public class Replication implements
ReplicationFactory.getReplicationPeers(server.getZooKeeper(), this.conf);
this.replicationPeers.init();
this.replicationTracker =
- ReplicationFactory.getReplicationTracker(server.getZooKeeper(), this.replicationPeers,
- this.conf, this.server, this.server);
+ ReplicationFactory.getReplicationTracker(server.getZooKeeper(), this.server, this.server);
} catch (Exception e) {
throw new IOException("Failed replication handler create", e);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b8b84a9f/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
index 307fa30..22a4d4b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java
@@ -330,8 +330,7 @@ public class HBaseFsck extends Configured implements Closeable {
* @throws MasterNotRunningException if the master is not running
* @throws ZooKeeperConnectionException if unable to connect to ZooKeeper
*/
- public HBaseFsck(Configuration conf) throws MasterNotRunningException,
- ZooKeeperConnectionException, IOException, ClassNotFoundException {
+ public HBaseFsck(Configuration conf) throws IOException, ClassNotFoundException {
this(conf, createThreadPool(conf));
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b8b84a9f/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
index fdfa6b7..757d9a9 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationTrackerZKImpl.java
@@ -90,7 +90,8 @@ public class TestReplicationTrackerZKImpl {
ZKClusterId.setClusterId(zkw, new ClusterId());
rp = ReplicationFactory.getReplicationPeers(zkw, conf);
rp.init();
- rt = ReplicationFactory.getReplicationTracker(zkw, rp, conf, zkw, new DummyServer(fakeRs1));
+ rt = ReplicationFactory.getReplicationTracker(zkw, new DummyServer(fakeRs1),
+ new DummyServer(fakeRs1));
} catch (Exception e) {
fail("Exception during test setup: " + e);
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b8b84a9f/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestTableCFsUpdater.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestTableCFsUpdater.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestTableCFsUpdater.java
index 2993043..19acc75 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestTableCFsUpdater.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/master/TestTableCFsUpdater.java
@@ -25,14 +25,13 @@ import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
+import org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.apache.zookeeper.KeeperException;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
@@ -57,12 +56,19 @@ public class TestTableCFsUpdater extends ReplicationPeerConfigUpgrader {
private static ZKWatcher zkw = null;
private static Abortable abortable = null;
+ private static ZKStorageUtil zkStorageUtil = null;
+
+ private static class ZKStorageUtil extends ZKReplicationPeerStorage {
+ public ZKStorageUtil(ZKWatcher zookeeper, Configuration conf) {
+ super(zookeeper, conf);
+ }
+ }
@Rule
public TestName name = new TestName();
public TestTableCFsUpdater() {
- super(zkw, TEST_UTIL.getConfiguration(), abortable);
+ super(zkw, TEST_UTIL.getConfiguration());
}
@BeforeClass
@@ -81,6 +87,7 @@ public class TestTableCFsUpdater extends ReplicationPeerConfigUpgrader {
}
};
zkw = new ZKWatcher(conf, "TableCFs", abortable, true);
+ zkStorageUtil = new ZKStorageUtil(zkw, conf);
}
@AfterClass
@@ -89,8 +96,7 @@ public class TestTableCFsUpdater extends ReplicationPeerConfigUpgrader {
}
@Test
- public void testUpgrade() throws KeeperException, InterruptedException,
- DeserializationException {
+ public void testUpgrade() throws Exception {
String peerId = "1";
final TableName tableName1 = TableName.valueOf(name.getMethodName() + "1");
final TableName tableName2 = TableName.valueOf(name.getMethodName() + "2");
@@ -98,13 +104,13 @@ public class TestTableCFsUpdater extends ReplicationPeerConfigUpgrader {
ReplicationPeerConfig rpc = new ReplicationPeerConfig();
rpc.setClusterKey(zkw.getQuorum());
- String peerNode = getPeerNode(peerId);
+ String peerNode = zkStorageUtil.getPeerNode(peerId);
ZKUtil.createWithParents(zkw, peerNode, ReplicationPeerConfigUtil.toByteArray(rpc));
String tableCFs = tableName1 + ":cf1,cf2;" + tableName2 + ":cf3;" + tableName3;
String tableCFsNode = getTableCFsNode(peerId);
LOG.info("create tableCFs :" + tableCFsNode + " for peerId=" + peerId);
- ZKUtil.createWithParents(zkw, tableCFsNode , Bytes.toBytes(tableCFs));
+ ZKUtil.createWithParents(zkw, tableCFsNode, Bytes.toBytes(tableCFs));
ReplicationPeerConfig actualRpc =
ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode));
@@ -117,13 +123,13 @@ public class TestTableCFsUpdater extends ReplicationPeerConfigUpgrader {
peerId = "2";
rpc = new ReplicationPeerConfig();
rpc.setClusterKey(zkw.getQuorum());
- peerNode = getPeerNode(peerId);
+ peerNode = zkStorageUtil.getPeerNode(peerId);
ZKUtil.createWithParents(zkw, peerNode, ReplicationPeerConfigUtil.toByteArray(rpc));
tableCFs = tableName1 + ":cf1,cf3;" + tableName2 + ":cf2";
tableCFsNode = getTableCFsNode(peerId);
LOG.info("create tableCFs :" + tableCFsNode + " for peerId=" + peerId);
- ZKUtil.createWithParents(zkw, tableCFsNode , Bytes.toBytes(tableCFs));
+ ZKUtil.createWithParents(zkw, tableCFsNode, Bytes.toBytes(tableCFs));
actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode));
actualTableCfs = Bytes.toString(ZKUtil.getData(zkw, tableCFsNode));
@@ -135,13 +141,13 @@ public class TestTableCFsUpdater extends ReplicationPeerConfigUpgrader {
peerId = "3";
rpc = new ReplicationPeerConfig();
rpc.setClusterKey(zkw.getQuorum());
- peerNode = getPeerNode(peerId);
+ peerNode = zkStorageUtil.getPeerNode(peerId);
ZKUtil.createWithParents(zkw, peerNode, ReplicationPeerConfigUtil.toByteArray(rpc));
tableCFs = "";
tableCFsNode = getTableCFsNode(peerId);
LOG.info("create tableCFs :" + tableCFsNode + " for peerId=" + peerId);
- ZKUtil.createWithParents(zkw, tableCFsNode , Bytes.toBytes(tableCFs));
+ ZKUtil.createWithParents(zkw, tableCFsNode, Bytes.toBytes(tableCFs));
actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode));
actualTableCfs = Bytes.toString(ZKUtil.getData(zkw, tableCFsNode));
@@ -153,7 +159,7 @@ public class TestTableCFsUpdater extends ReplicationPeerConfigUpgrader {
peerId = "4";
rpc = new ReplicationPeerConfig();
rpc.setClusterKey(zkw.getQuorum());
- peerNode = getPeerNode(peerId);
+ peerNode = zkStorageUtil.getPeerNode(peerId);
ZKUtil.createWithParents(zkw, peerNode, ReplicationPeerConfigUtil.toByteArray(rpc));
tableCFsNode = getTableCFsNode(peerId);
@@ -167,7 +173,7 @@ public class TestTableCFsUpdater extends ReplicationPeerConfigUpgrader {
copyTableCFs();
peerId = "1";
- peerNode = getPeerNode(peerId);
+ peerNode = zkStorageUtil.getPeerNode(peerId);
actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode));
assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey());
Map<TableName, List<String>> tableNameListMap = actualRpc.getTableCFsMap();
@@ -182,9 +188,8 @@ public class TestTableCFsUpdater extends ReplicationPeerConfigUpgrader {
assertEquals("cf3", tableNameListMap.get(tableName2).get(0));
assertNull(tableNameListMap.get(tableName3));
-
peerId = "2";
- peerNode = getPeerNode(peerId);
+ peerNode = zkStorageUtil.getPeerNode(peerId);
actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode));
assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey());
tableNameListMap = actualRpc.getTableCFsMap();
@@ -198,19 +203,17 @@ public class TestTableCFsUpdater extends ReplicationPeerConfigUpgrader {
assertEquals("cf2", tableNameListMap.get(tableName2).get(0));
peerId = "3";
- peerNode = getPeerNode(peerId);
+ peerNode = zkStorageUtil.getPeerNode(peerId);
actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode));
assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey());
tableNameListMap = actualRpc.getTableCFsMap();
assertNull(tableNameListMap);
peerId = "4";
- peerNode = getPeerNode(peerId);
+ peerNode = zkStorageUtil.getPeerNode(peerId);
actualRpc = ReplicationPeerConfigUtil.parsePeerFrom(ZKUtil.getData(zkw, peerNode));
assertEquals(rpc.getClusterKey(), actualRpc.getClusterKey());
tableNameListMap = actualRpc.getTableCFsMap();
assertNull(tableNameListMap);
}
-
-
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/b8b84a9f/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 46b53b5..163a96e 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -68,8 +68,8 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
-import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
+import org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
@@ -163,9 +163,9 @@ public abstract class TestReplicationSourceManager {
+ conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":/1"));
ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state");
ZKUtil.setData(zkw, "/hbase/replication/peers/1/peer-state",
- ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
+ ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES);
ZKUtil.createWithParents(zkw, "/hbase/replication/state");
- ZKUtil.setData(zkw, "/hbase/replication/state", ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
+ ZKUtil.setData(zkw, "/hbase/replication/state", ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES);
ZKClusterId.setClusterId(zkw, new ClusterId());
FSUtils.setRootDir(utility.getConfiguration(), utility.getDataTestDir());