You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/03/17 12:31:37 UTC
[1/2] hbase git commit: Revert "HBASE-19665 Add table based
replication peers/queues storage back"
Repository: hbase
Updated Branches:
refs/heads/master 104f58701 -> 00095a2ef
http://git-wip-us.apache.org/repos/asf/hbase/blob/00095a2e/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 4a36e13..6d75fec 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -70,7 +70,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
-import org.apache.hadoop.hbase.replication.ReplicationUtils;
+import org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
@@ -170,9 +170,9 @@ public abstract class TestReplicationSourceManager {
+ conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":/1"));
ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state");
ZKUtil.setData(zkw, "/hbase/replication/peers/1/peer-state",
- ReplicationUtils.PEER_STATE_ENABLED_BYTES);
+ ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES);
ZKUtil.createWithParents(zkw, "/hbase/replication/state");
- ZKUtil.setData(zkw, "/hbase/replication/state", ReplicationUtils.PEER_STATE_ENABLED_BYTES);
+ ZKUtil.setData(zkw, "/hbase/replication/state", ZKReplicationPeerStorage.ENABLED_ZNODE_BYTES);
ZKClusterId.setClusterId(zkw, new ClusterId());
FSUtils.setRootDir(utility.getConfiguration(), utility.getDataTestDir());
http://git-wip-us.apache.org/repos/asf/hbase/blob/00095a2e/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestReplicationStateBasic.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestReplicationStateBasic.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestReplicationStateBasic.java
deleted file mode 100644
index 461420e..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestReplicationStateBasic.java
+++ /dev/null
@@ -1,370 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication.storage;
-
-import static org.hamcrest.CoreMatchers.hasItems;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.replication.ReplicationException;
-import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
-import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
-import org.apache.hadoop.hbase.replication.ReplicationPeerImpl;
-import org.apache.hadoop.hbase.replication.ReplicationPeers;
-import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
-import org.apache.hadoop.hbase.replication.ReplicationUtils;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.zookeeper.ZKConfig;
-import org.apache.zookeeper.KeeperException;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
-
-/**
- * White box testing for replication state interfaces. Implementations should extend this class, and
- * initialize the interfaces properly.
- */
-public abstract class TestReplicationStateBasic {
-
- private static final Logger LOG = LoggerFactory.getLogger(TestReplicationStateBasic.class);
-
- protected ReplicationQueueStorage rqs;
- protected ServerName server1 = ServerName.valueOf("hostname1.example.org", 1234, 12345);
- protected ServerName server2 = ServerName.valueOf("hostname2.example.org", 1234, 12345);
- protected ServerName server3 = ServerName.valueOf("hostname3.example.org", 1234, 12345);
- protected ReplicationPeers rp;
- protected static final String ID_ONE = "1";
- protected static final String ID_TWO = "2";
- protected static String KEY_ONE;
- protected static String KEY_TWO;
-
- // For testing when we try to replicate to ourself
- protected String OUR_KEY;
-
- protected static int zkTimeoutCount;
- protected static final int ZK_MAX_COUNT = 300;
- protected static final int ZK_SLEEP_INTERVAL = 100; // millis
-
- @Test
- public void testReplicationQueueStorage() throws ReplicationException {
- // Test methods with empty state
- assertEquals(0, rqs.getListOfReplicators().size());
- assertTrue(rqs.getWALsInQueue(server1, "qId1").isEmpty());
- assertTrue(rqs.getAllQueues(server1).isEmpty());
-
- /*
- * Set up data Two replicators: -- server1: three queues with 0, 1 and 2 log files each --
- * server2: zero queues
- */
- rqs.addWAL(server1, "qId1", "trash");
- rqs.removeWAL(server1, "qId1", "trash");
- rqs.addWAL(server1,"qId2", "filename1");
- rqs.addWAL(server1,"qId3", "filename2");
- rqs.addWAL(server1,"qId3", "filename3");
- rqs.addWAL(server2,"trash", "trash");
- rqs.removeQueue(server2,"trash");
-
- List<ServerName> reps = rqs.getListOfReplicators();
- assertEquals(2, reps.size());
- assertTrue(server1.getServerName(), reps.contains(server1));
- assertTrue(server2.getServerName(), reps.contains(server2));
-
- assertTrue(rqs.getWALsInQueue(ServerName.valueOf("bogus", 12345, 12345), "bogus").isEmpty());
- assertTrue(rqs.getWALsInQueue(server1, "bogus").isEmpty());
- assertEquals(0, rqs.getWALsInQueue(server1, "qId1").size());
- assertEquals(1, rqs.getWALsInQueue(server1, "qId2").size());
- assertEquals("filename1", rqs.getWALsInQueue(server1, "qId2").get(0));
-
- assertTrue(rqs.getAllQueues(ServerName.valueOf("bogus", 12345, -1L)).isEmpty());
- assertEquals(0, rqs.getAllQueues(server2).size());
- List<String> list = rqs.getAllQueues(server1);
- assertEquals(3, list.size());
- assertTrue(list.contains("qId2"));
- assertTrue(list.contains("qId3"));
- }
-
- private void removeAllQueues(ServerName serverName) throws ReplicationException {
- for (String queue: rqs.getAllQueues(serverName)) {
- rqs.removeQueue(serverName, queue);
- }
- }
- @Test
- public void testReplicationQueues() throws ReplicationException {
- // Initialize ReplicationPeer so we can add peers (we don't transfer lone queues)
- rp.init();
-
- rqs.removeQueue(server1, "bogus");
- rqs.removeWAL(server1, "bogus", "bogus");
- removeAllQueues(server1);
- assertEquals(0, rqs.getAllQueues(server1).size());
- assertEquals(0, rqs.getWALPosition(server1, "bogus", "bogus"));
- assertTrue(rqs.getWALsInQueue(server1, "bogus").isEmpty());
- assertTrue(rqs.getAllQueues(ServerName.valueOf("bogus", 1234, 12345)).isEmpty());
-
- populateQueues();
-
- assertEquals(3, rqs.getListOfReplicators().size());
- assertEquals(0, rqs.getWALsInQueue(server2, "qId1").size());
- assertEquals(5, rqs.getWALsInQueue(server3, "qId5").size());
- assertEquals(0, rqs.getWALPosition(server3, "qId1", "filename0"));
- rqs.setWALPosition(server3, "qId5", "filename4", 354L, null);
- assertEquals(354L, rqs.getWALPosition(server3, "qId5", "filename4"));
-
- assertEquals(5, rqs.getWALsInQueue(server3, "qId5").size());
- assertEquals(0, rqs.getWALsInQueue(server2, "qId1").size());
- assertEquals(0, rqs.getAllQueues(server1).size());
- assertEquals(1, rqs.getAllQueues(server2).size());
- assertEquals(5, rqs.getAllQueues(server3).size());
-
- assertEquals(0, rqs.getAllQueues(server1).size());
- rqs.removeReplicatorIfQueueIsEmpty(server1);
- assertEquals(2, rqs.getListOfReplicators().size());
-
- List<String> queues = rqs.getAllQueues(server3);
- assertEquals(5, queues.size());
- for (String queue : queues) {
- rqs.claimQueue(server3, queue, server2);
- }
- rqs.removeReplicatorIfQueueIsEmpty(server3);
- assertEquals(1, rqs.getListOfReplicators().size());
-
- assertEquals(6, rqs.getAllQueues(server2).size());
- removeAllQueues(server2);
- rqs.removeReplicatorIfQueueIsEmpty(server2);
- assertEquals(0, rqs.getListOfReplicators().size());
- }
-
- @Test
- public void testHfileRefsReplicationQueues() throws ReplicationException, KeeperException {
- rp.init();
-
- List<Pair<Path, Path>> files1 = new ArrayList<>(3);
- files1.add(new Pair<>(null, new Path("file_1")));
- files1.add(new Pair<>(null, new Path("file_2")));
- files1.add(new Pair<>(null, new Path("file_3")));
- assertTrue(rqs.getReplicableHFiles(ID_ONE).isEmpty());
- assertEquals(0, rqs.getAllPeersFromHFileRefsQueue().size());
- rp.getPeerStorage().addPeer(ID_ONE,
- ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true);
- rqs.addPeerToHFileRefs(ID_ONE);
- rqs.addHFileRefs(ID_ONE, files1);
- assertEquals(1, rqs.getAllPeersFromHFileRefsQueue().size());
- assertEquals(3, rqs.getReplicableHFiles(ID_ONE).size());
- List<String> hfiles2 = new ArrayList<>(files1.size());
- for (Pair<Path, Path> p : files1) {
- hfiles2.add(p.getSecond().getName());
- }
- String removedString = hfiles2.remove(0);
- rqs.removeHFileRefs(ID_ONE, hfiles2);
- assertEquals(1, rqs.getReplicableHFiles(ID_ONE).size());
- hfiles2 = new ArrayList<>(1);
- hfiles2.add(removedString);
- rqs.removeHFileRefs(ID_ONE, hfiles2);
- assertEquals(0, rqs.getReplicableHFiles(ID_ONE).size());
- rp.getPeerStorage().removePeer(ID_ONE);
- }
-
- @Test
- public void testRemovePeerForHFileRefs() throws ReplicationException, KeeperException {
- rp.init();
- rp.getPeerStorage().addPeer(ID_ONE,
- ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true);
- rqs.addPeerToHFileRefs(ID_ONE);
- rp.getPeerStorage().addPeer(ID_TWO,
- ReplicationPeerConfig.newBuilder().setClusterKey(KEY_TWO).build(), true);
- rqs.addPeerToHFileRefs(ID_TWO);
-
- List<Pair<Path, Path>> files1 = new ArrayList<>(3);
- files1.add(new Pair<>(null, new Path("file_1")));
- files1.add(new Pair<>(null, new Path("file_2")));
- files1.add(new Pair<>(null, new Path("file_3")));
- rqs.addHFileRefs(ID_ONE, files1);
- rqs.addHFileRefs(ID_TWO, files1);
- assertEquals(2, rqs.getAllPeersFromHFileRefsQueue().size());
- assertEquals(3, rqs.getReplicableHFiles(ID_ONE).size());
- assertEquals(3, rqs.getReplicableHFiles(ID_TWO).size());
-
- rp.getPeerStorage().removePeer(ID_ONE);
- rqs.removePeerFromHFileRefs(ID_ONE);
- assertEquals(1, rqs.getAllPeersFromHFileRefsQueue().size());
- assertTrue(rqs.getReplicableHFiles(ID_ONE).isEmpty());
- assertEquals(3, rqs.getReplicableHFiles(ID_TWO).size());
-
- rp.getPeerStorage().removePeer(ID_TWO);
- rqs.removePeerFromHFileRefs(ID_TWO);
- assertEquals(0, rqs.getAllPeersFromHFileRefsQueue().size());
- assertTrue(rqs.getReplicableHFiles(ID_TWO).isEmpty());
- }
-
- @Test
- public void testReplicationPeers() throws Exception {
- rp.init();
-
- try {
- rp.getPeerStorage().setPeerState("bogus", true);
- fail("Should have thrown an ReplicationException when passed a non-exist bogus peerId");
- } catch (ReplicationException e) {
- }
- try {
- rp.getPeerStorage().setPeerState("bogus", false);
- fail("Should have thrown an ReplicationException when passed a non-exist bogus peerId");
- } catch (ReplicationException e) {
- }
-
- try {
- assertFalse(rp.addPeer("bogus"));
- fail("Should have thrown an ReplicationException when creating a bogus peerId "
- + "with null peer config");
- } catch (ReplicationException e) {
- }
-
- assertNumberOfPeers(0);
-
- // Add some peers
- rp.getPeerStorage().addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), true);
- assertNumberOfPeers(1);
- rp.getPeerStorage().addPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO), true);
- assertNumberOfPeers(2);
-
- assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(ReplicationUtils
- .getPeerClusterConfiguration(rp.getPeerStorage().getPeerConfig(ID_ONE), rp.getConf())));
- rp.getPeerStorage().removePeer(ID_ONE);
- rp.removePeer(ID_ONE);
- assertNumberOfPeers(1);
-
- // Add one peer
- rp.getPeerStorage().addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), true);
- rp.addPeer(ID_ONE);
- assertNumberOfPeers(2);
- assertTrue(rp.getPeer(ID_ONE).isPeerEnabled());
- rp.getPeerStorage().setPeerState(ID_ONE, false);
- // now we do not rely on zk watcher to trigger the state change so we need to trigger it
- // manually...
- ReplicationPeerImpl peer = rp.getPeer(ID_ONE);
- rp.refreshPeerState(peer.getId());
- assertEquals(PeerState.DISABLED, peer.getPeerState());
- assertConnectedPeerStatus(false, ID_ONE);
- rp.getPeerStorage().setPeerState(ID_ONE, true);
- // now we do not rely on zk watcher to trigger the state change so we need to trigger it
- // manually...
- rp.refreshPeerState(peer.getId());
- assertEquals(PeerState.ENABLED, peer.getPeerState());
- assertConnectedPeerStatus(true, ID_ONE);
-
- // Disconnect peer
- rp.removePeer(ID_ONE);
- assertNumberOfPeers(2);
- }
-
- private String getFileName(String base, int i) {
- return String.format(base + "-%04d", i);
- }
-
- @Test
- public void testPersistLogPositionAndSeqIdAtomically() throws Exception {
- ServerName serverName1 = ServerName.valueOf("127.0.0.1", 8000, 10000);
- assertTrue(rqs.getAllQueues(serverName1).isEmpty());
- String queue1 = "1";
- String region0 = "region0", region1 = "region1";
- for (int i = 0; i < 10; i++) {
- rqs.addWAL(serverName1, queue1, getFileName("file1", i));
- }
- List<String> queueIds = rqs.getAllQueues(serverName1);
- assertEquals(1, queueIds.size());
- assertThat(queueIds, hasItems("1"));
-
- List<String> wals1 = rqs.getWALsInQueue(serverName1, queue1);
- assertEquals(10, wals1.size());
- for (int i = 0; i < 10; i++) {
- assertThat(wals1, hasItems(getFileName("file1", i)));
- }
-
- for (int i = 0; i < 10; i++) {
- assertEquals(0, rqs.getWALPosition(serverName1, queue1, getFileName("file1", i)));
- }
- assertEquals(HConstants.NO_SEQNUM, rqs.getLastSequenceId(region0, queue1));
- assertEquals(HConstants.NO_SEQNUM, rqs.getLastSequenceId(region1, queue1));
-
- for (int i = 0; i < 10; i++) {
- rqs.setWALPosition(serverName1, queue1, getFileName("file1", i), (i + 1) * 100,
- ImmutableMap.of(region0, i * 100L, region1, (i + 1) * 100L));
- }
-
- for (int i = 0; i < 10; i++) {
- assertEquals((i + 1) * 100, rqs.getWALPosition(serverName1, queue1, getFileName("file1", i)));
- }
- assertEquals(900L, rqs.getLastSequenceId(region0, queue1));
- assertEquals(1000L, rqs.getLastSequenceId(region1, queue1));
- }
-
- protected void assertConnectedPeerStatus(boolean status, String peerId) throws Exception {
- // we can first check if the value was changed in the store, if it wasn't then fail right away
- if (status != rp.getPeerStorage().isPeerEnabled(peerId)) {
- fail("ConnectedPeerStatus was " + !status + " but expected " + status + " in ZK");
- }
- while (true) {
- if (status == rp.getPeer(peerId).isPeerEnabled()) {
- return;
- }
- if (zkTimeoutCount < ZK_MAX_COUNT) {
- LOG.debug("ConnectedPeerStatus was " + !status + " but expected " + status
- + ", sleeping and trying again.");
- Thread.sleep(ZK_SLEEP_INTERVAL);
- } else {
- fail("Timed out waiting for ConnectedPeerStatus to be " + status);
- }
- }
- }
-
- protected void assertNumberOfPeers(int total) throws ReplicationException {
- assertEquals(total, rp.getPeerStorage().listPeerIds().size());
- }
-
- /*
- * three replicators: rq1 has 0 queues, rq2 has 1 queue with no logs, rq3 has 5 queues with 1, 2,
- * 3, 4, 5 log files respectively
- */
- protected void populateQueues() throws ReplicationException {
- rqs.addWAL(server1, "trash", "trash");
- rqs.removeQueue(server1, "trash");
-
- rqs.addWAL(server2, "qId1", "trash");
- rqs.removeWAL(server2, "qId1", "trash");
-
- for (int i = 1; i < 6; i++) {
- for (int j = 0; j < i; j++) {
- rqs.addWAL(server3, "qId" + i, "filename" + j);
- }
- // Add peers for the corresponding queues so they are not orphans
- rp.getPeerStorage().addPeer("qId" + i,
- ReplicationPeerConfig.newBuilder().setClusterKey("localhost:2818:/bogus" + i).build(),
- true);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/00095a2e/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestReplicationStateTableImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestReplicationStateTableImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestReplicationStateTableImpl.java
deleted file mode 100644
index d073669..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestReplicationStateTableImpl.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.replication.storage;
-
-import java.io.IOException;
-
-import org.apache.commons.io.IOUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.ClusterId;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.client.Admin;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.TableDescriptor;
-import org.apache.hadoop.hbase.replication.ReplicationFactory;
-import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
-import org.apache.hadoop.hbase.replication.TableReplicationPeerStorage;
-import org.apache.hadoop.hbase.replication.TableReplicationQueueStorage;
-import org.apache.hadoop.hbase.replication.TableReplicationStorageBase;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.testclassification.ReplicationTests;
-import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
-import org.apache.hadoop.hbase.zookeeper.ZKConfig;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
-import org.apache.zookeeper.KeeperException;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.experimental.categories.Category;
-
-@Category({ ReplicationTests.class, MediumTests.class })
-public class TestReplicationStateTableImpl extends TestReplicationStateBasic {
-
- @ClassRule
- public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestReplicationStateTableImpl.class);
-
- private static Configuration conf;
- private static HBaseTestingUtility utility = new HBaseTestingUtility();
- private static ZKWatcher zkw;
- private static Connection connection;
-
- @BeforeClass
- public static void setUpBeforeClass() throws Exception {
- conf = utility.getConfiguration();
- conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
- conf.set(HConstants.REPLICATION_CLUSTER_ID, "12345");
- utility.startMiniCluster();
-
- // After the HBase Mini cluster startup, we set the storage implementation to table based
- // implementation. Otherwise, we cannot setup the HBase Mini Cluster because the master will
- // list peers before finish its initialization, and if master cannot finish initialization, the
- // meta cannot be online, in other hand, if meta cannot be online, the list peers never success
- // when using table based replication. a dead loop happen.
- // Our UTs are written for testing storage layer, so no problem here.
- conf.set(ReplicationStorageFactory.REPLICATION_PEER_STORAGE_IMPL,
- TableReplicationPeerStorage.class.getName());
- conf.set(ReplicationStorageFactory.REPLICATION_QUEUE_STORAGE_IMPL,
- TableReplicationQueueStorage.class.getName());
-
- zkw = utility.getZooKeeperWatcher();
- connection = ConnectionFactory.createConnection(conf);
-
- KEY_ONE = initPeerClusterState("/hbase1");
- KEY_TWO = initPeerClusterState("/hbase2");
- }
-
- private static String initPeerClusterState(String baseZKNode)
- throws IOException, KeeperException {
- // Add a dummy region server and set up the cluster id
- Configuration testConf = new Configuration(conf);
- testConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, baseZKNode);
- ZKWatcher zkw1 = new ZKWatcher(testConf, "test1", null);
- String fakeRs = ZNodePaths.joinZNode(zkw1.znodePaths.rsZNode, "hostname1.example.org:1234");
- ZKUtil.createWithParents(zkw1, fakeRs);
- ZKClusterId.setClusterId(zkw1, new ClusterId());
- return ZKConfig.getZooKeeperClusterKey(testConf);
- }
-
- @Before
- public void setUp() throws IOException {
- rqs = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf);
- rp = ReplicationFactory.getReplicationPeers(zkw, conf);
- OUR_KEY = ZKConfig.getZooKeeperClusterKey(conf);
-
- // Create hbase:replication meta table.
- try (Admin admin = connection.getAdmin()) {
- TableDescriptor table =
- TableReplicationStorageBase.createReplicationTableDescBuilder(conf).build();
- admin.createTable(table);
- }
- }
-
- @After
- public void tearDown() throws KeeperException, IOException {
- // Drop the hbase:replication meta table.
- utility.deleteTable(TableReplicationStorageBase.REPLICATION_TABLE);
- }
-
- @AfterClass
- public static void tearDownAfterClass() throws Exception {
- if (connection != null) {
- IOUtils.closeQuietly(connection);
- }
- utility.shutdownMiniZKCluster();
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/00095a2e/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestReplicationStateZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestReplicationStateZKImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestReplicationStateZKImpl.java
deleted file mode 100644
index 993f2fb..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestReplicationStateZKImpl.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication.storage;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.ClusterId;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseZKTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.replication.ReplicationFactory;
-import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.testclassification.ReplicationTests;
-import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
-import org.apache.hadoop.hbase.zookeeper.ZKConfig;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
-import org.apache.zookeeper.KeeperException;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.experimental.categories.Category;
-
-@Category({ ReplicationTests.class, MediumTests.class })
-public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
-
- @ClassRule
- public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestReplicationStateZKImpl.class);
-
- private static Configuration conf;
- private static HBaseZKTestingUtility utility;
- private static ZKWatcher zkw;
- private static String replicationZNode;
-
- @BeforeClass
- public static void setUpBeforeClass() throws Exception {
- utility = new HBaseZKTestingUtility();
- utility.startMiniZKCluster();
- conf = utility.getConfiguration();
- conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
- zkw = utility.getZooKeeperWatcher();
- String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
- replicationZNode = ZNodePaths.joinZNode(zkw.znodePaths.baseZNode, replicationZNodeName);
- KEY_ONE = initPeerClusterState("/hbase1");
- KEY_TWO = initPeerClusterState("/hbase2");
- }
-
- private static String initPeerClusterState(String baseZKNode)
- throws IOException, KeeperException {
- // Add a dummy region server and set up the cluster id
- Configuration testConf = new Configuration(conf);
- testConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, baseZKNode);
- ZKWatcher zkw1 = new ZKWatcher(testConf, "test1", null);
- String fakeRs = ZNodePaths.joinZNode(zkw1.znodePaths.rsZNode, "hostname1.example.org:1234");
- ZKUtil.createWithParents(zkw1, fakeRs);
- ZKClusterId.setClusterId(zkw1, new ClusterId());
- return ZKConfig.getZooKeeperClusterKey(testConf);
- }
-
- @Before
- public void setUp() {
- zkTimeoutCount = 0;
- rqs = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf);
- rp = ReplicationFactory.getReplicationPeers(zkw, conf);
- OUR_KEY = ZKConfig.getZooKeeperClusterKey(conf);
- }
-
- @After
- public void tearDown() throws KeeperException, IOException {
- ZKUtil.deleteNodeRecursively(zkw, replicationZNode);
- }
-
- @AfterClass
- public static void tearDownAfterClass() throws Exception {
- utility.shutdownMiniZKCluster();
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/00095a2e/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestZKReplicationPeerStorage.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestZKReplicationPeerStorage.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestZKReplicationPeerStorage.java
deleted file mode 100644
index 190eef4..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestZKReplicationPeerStorage.java
+++ /dev/null
@@ -1,182 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication.storage;
-
-import static java.util.stream.Collectors.toList;
-import static java.util.stream.Collectors.toSet;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-import java.util.stream.Stream;
-
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseZKTestingUtility;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.replication.ReplicationException;
-import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
-import org.apache.hadoop.hbase.replication.ZKReplicationPeerStorage;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.testclassification.ReplicationTests;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-@Category({ ReplicationTests.class, MediumTests.class })
-public class TestZKReplicationPeerStorage {
-
- @ClassRule
- public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestZKReplicationPeerStorage.class);
-
- private static final HBaseZKTestingUtility UTIL = new HBaseZKTestingUtility();
-
- private static ZKReplicationPeerStorage STORAGE;
-
- @BeforeClass
- public static void setUp() throws Exception {
- UTIL.startMiniZKCluster();
- STORAGE = new ZKReplicationPeerStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration());
- }
-
- @AfterClass
- public static void tearDown() throws IOException {
- UTIL.shutdownMiniZKCluster();
- }
-
- private Set<String> randNamespaces(Random rand) {
- return Stream.generate(() -> Long.toHexString(rand.nextLong())).limit(rand.nextInt(5))
- .collect(toSet());
- }
-
- private Map<TableName, List<String>> randTableCFs(Random rand) {
- int size = rand.nextInt(5);
- Map<TableName, List<String>> map = new HashMap<>();
- for (int i = 0; i < size; i++) {
- TableName tn = TableName.valueOf(Long.toHexString(rand.nextLong()));
- List<String> cfs = Stream.generate(() -> Long.toHexString(rand.nextLong()))
- .limit(rand.nextInt(5)).collect(toList());
- map.put(tn, cfs);
- }
- return map;
- }
-
- private ReplicationPeerConfig getConfig(int seed) {
- Random rand = new Random(seed);
- return ReplicationPeerConfig.newBuilder().setClusterKey(Long.toHexString(rand.nextLong()))
- .setReplicationEndpointImpl(Long.toHexString(rand.nextLong()))
- .setNamespaces(randNamespaces(rand)).setExcludeNamespaces(randNamespaces(rand))
- .setTableCFsMap(randTableCFs(rand)).setReplicateAllUserTables(rand.nextBoolean())
- .setBandwidth(rand.nextInt(1000)).build();
- }
-
- private void assertSetEquals(Set<String> expected, Set<String> actual) {
- if (expected == null || expected.size() == 0) {
- assertTrue(actual == null || actual.size() == 0);
- return;
- }
- assertEquals(expected.size(), actual.size());
- expected.forEach(s -> assertTrue(actual.contains(s)));
- }
-
- private void assertMapEquals(Map<TableName, List<String>> expected,
- Map<TableName, List<String>> actual) {
- if (expected == null || expected.size() == 0) {
- assertTrue(actual == null || actual.size() == 0);
- return;
- }
- assertEquals(expected.size(), actual.size());
- expected.forEach((expectedTn, expectedCFs) -> {
- List<String> actualCFs = actual.get(expectedTn);
- if (expectedCFs == null || expectedCFs.size() == 0) {
- assertTrue(actual.containsKey(expectedTn));
- assertTrue(actualCFs == null || actualCFs.size() == 0);
- } else {
- assertNotNull(actualCFs);
- assertEquals(expectedCFs.size(), actualCFs.size());
- for (Iterator<String> expectedIt = expectedCFs.iterator(), actualIt = actualCFs.iterator();
- expectedIt.hasNext();) {
- assertEquals(expectedIt.next(), actualIt.next());
- }
- }
- });
- }
-
- private void assertConfigEquals(ReplicationPeerConfig expected, ReplicationPeerConfig actual) {
- assertEquals(expected.getClusterKey(), actual.getClusterKey());
- assertEquals(expected.getReplicationEndpointImpl(), actual.getReplicationEndpointImpl());
- assertSetEquals(expected.getNamespaces(), actual.getNamespaces());
- assertSetEquals(expected.getExcludeNamespaces(), actual.getExcludeNamespaces());
- assertMapEquals(expected.getTableCFsMap(), actual.getTableCFsMap());
- assertMapEquals(expected.getExcludeTableCFsMap(), actual.getExcludeTableCFsMap());
- assertEquals(expected.replicateAllUserTables(), actual.replicateAllUserTables());
- assertEquals(expected.getBandwidth(), actual.getBandwidth());
- }
-
- @Test
- public void test() throws ReplicationException {
- int peerCount = 10;
- for (int i = 0; i < peerCount; i++) {
- STORAGE.addPeer(Integer.toString(i), getConfig(i), i % 2 == 0);
- }
- List<String> peerIds = STORAGE.listPeerIds();
- assertEquals(peerCount, peerIds.size());
- for (String peerId : peerIds) {
- int seed = Integer.parseInt(peerId);
- assertConfigEquals(getConfig(seed), STORAGE.getPeerConfig(peerId));
- }
- for (int i = 0; i < peerCount; i++) {
- STORAGE.updatePeerConfig(Integer.toString(i), getConfig(i + 1));
- }
- for (String peerId : peerIds) {
- int seed = Integer.parseInt(peerId);
- assertConfigEquals(getConfig(seed + 1), STORAGE.getPeerConfig(peerId));
- }
- for (int i = 0; i < peerCount; i++) {
- assertEquals(i % 2 == 0, STORAGE.isPeerEnabled(Integer.toString(i)));
- }
- for (int i = 0; i < peerCount; i++) {
- STORAGE.setPeerState(Integer.toString(i), i % 2 != 0);
- }
- for (int i = 0; i < peerCount; i++) {
- assertEquals(i % 2 != 0, STORAGE.isPeerEnabled(Integer.toString(i)));
- }
- String toRemove = Integer.toString(peerCount / 2);
- STORAGE.removePeer(toRemove);
- peerIds = STORAGE.listPeerIds();
- assertEquals(peerCount - 1, peerIds.size());
- assertFalse(peerIds.contains(toRemove));
-
- try {
- STORAGE.getPeerConfig(toRemove);
- fail("Should throw a ReplicationException when get peer config of a peerId");
- } catch (ReplicationException e) {
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/00095a2e/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestZKReplicationQueueStorage.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestZKReplicationQueueStorage.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestZKReplicationQueueStorage.java
deleted file mode 100644
index 780ff2a..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/storage/TestZKReplicationQueueStorage.java
+++ /dev/null
@@ -1,255 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication.storage;
-
-import static org.hamcrest.CoreMatchers.hasItems;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Set;
-import java.util.SortedSet;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.HBaseClassTestRule;
-import org.apache.hadoop.hbase.HBaseZKTestingUtility;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.replication.ReplicationException;
-import org.apache.hadoop.hbase.replication.ZKReplicationQueueStorage;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.testclassification.ReplicationTests;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.zookeeper.KeeperException;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-@Category({ ReplicationTests.class, MediumTests.class })
-public class TestZKReplicationQueueStorage {
-
- @ClassRule
- public static final HBaseClassTestRule CLASS_RULE =
- HBaseClassTestRule.forClass(TestZKReplicationQueueStorage.class);
-
- private static final HBaseZKTestingUtility UTIL = new HBaseZKTestingUtility();
-
- private static ZKReplicationQueueStorage STORAGE;
-
- @BeforeClass
- public static void setUp() throws Exception {
- UTIL.startMiniZKCluster();
- STORAGE = new ZKReplicationQueueStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration());
- }
-
- @AfterClass
- public static void tearDown() throws IOException {
- UTIL.shutdownMiniZKCluster();
- }
-
- @After
- public void tearDownAfterTest() throws ReplicationException {
- for (ServerName serverName : STORAGE.getListOfReplicators()) {
- for (String queue : STORAGE.getAllQueues(serverName)) {
- STORAGE.removeQueue(serverName, queue);
- }
- STORAGE.removeReplicatorIfQueueIsEmpty(serverName);
- }
- for (String peerId : STORAGE.getAllPeersFromHFileRefsQueue()) {
- STORAGE.removePeerFromHFileRefs(peerId);
- }
- }
-
- private ServerName getServerName(int i) {
- return ServerName.valueOf("127.0.0.1", 8000 + i, 10000 + i);
- }
-
- @Test
- public void testReplicator() throws ReplicationException {
- assertTrue(STORAGE.getListOfReplicators().isEmpty());
- String queueId = "1";
- for (int i = 0; i < 10; i++) {
- STORAGE.addWAL(getServerName(i), queueId, "file" + i);
- }
- List<ServerName> replicators = STORAGE.getListOfReplicators();
- assertEquals(10, replicators.size());
- for (int i = 0; i < 10; i++) {
- assertThat(replicators, hasItems(getServerName(i)));
- }
- for (int i = 0; i < 5; i++) {
- STORAGE.removeQueue(getServerName(i), queueId);
- }
- for (int i = 0; i < 10; i++) {
- STORAGE.removeReplicatorIfQueueIsEmpty(getServerName(i));
- }
- replicators = STORAGE.getListOfReplicators();
- assertEquals(5, replicators.size());
- for (int i = 5; i < 10; i++) {
- assertThat(replicators, hasItems(getServerName(i)));
- }
- }
-
- private String getFileName(String base, int i) {
- return String.format(base + "-%04d", i);
- }
-
- @Test
- public void testAddRemoveLog() throws ReplicationException {
- ServerName serverName1 = ServerName.valueOf("127.0.0.1", 8000, 10000);
- assertTrue(STORAGE.getAllQueues(serverName1).isEmpty());
- String queue1 = "1";
- String queue2 = "2";
- for (int i = 0; i < 10; i++) {
- STORAGE.addWAL(serverName1, queue1, getFileName("file1", i));
- STORAGE.addWAL(serverName1, queue2, getFileName("file2", i));
- }
- List<String> queueIds = STORAGE.getAllQueues(serverName1);
- assertEquals(2, queueIds.size());
- assertThat(queueIds, hasItems("1", "2"));
-
- List<String> wals1 = STORAGE.getWALsInQueue(serverName1, queue1);
- List<String> wals2 = STORAGE.getWALsInQueue(serverName1, queue2);
- assertEquals(10, wals1.size());
- assertEquals(10, wals2.size());
- for (int i = 0; i < 10; i++) {
- assertThat(wals1, hasItems(getFileName("file1", i)));
- assertThat(wals2, hasItems(getFileName("file2", i)));
- }
-
- for (int i = 0; i < 10; i++) {
- assertEquals(0, STORAGE.getWALPosition(serverName1, queue1, getFileName("file1", i)));
- assertEquals(0, STORAGE.getWALPosition(serverName1, queue2, getFileName("file2", i)));
- STORAGE.setWALPosition(serverName1, queue1, getFileName("file1", i), (i + 1) * 100, null);
- STORAGE.setWALPosition(serverName1, queue2, getFileName("file2", i), (i + 1) * 100 + 10,
- null);
- }
-
- for (int i = 0; i < 10; i++) {
- assertEquals((i + 1) * 100,
- STORAGE.getWALPosition(serverName1, queue1, getFileName("file1", i)));
- assertEquals((i + 1) * 100 + 10,
- STORAGE.getWALPosition(serverName1, queue2, getFileName("file2", i)));
- }
-
- for (int i = 0; i < 10; i++) {
- if (i % 2 == 0) {
- STORAGE.removeWAL(serverName1, queue1, getFileName("file1", i));
- } else {
- STORAGE.removeWAL(serverName1, queue2, getFileName("file2", i));
- }
- }
-
- queueIds = STORAGE.getAllQueues(serverName1);
- assertEquals(2, queueIds.size());
- assertThat(queueIds, hasItems("1", "2"));
-
- ServerName serverName2 = ServerName.valueOf("127.0.0.1", 8001, 10001);
- Pair<String, SortedSet<String>> peer1 = STORAGE.claimQueue(serverName1, "1", serverName2);
-
- assertEquals("1-" + serverName1.getServerName(), peer1.getFirst());
- assertEquals(5, peer1.getSecond().size());
- int i = 1;
- for (String wal : peer1.getSecond()) {
- assertEquals(getFileName("file1", i), wal);
- assertEquals((i + 1) * 100,
- STORAGE.getWALPosition(serverName2, peer1.getFirst(), getFileName("file1", i)));
- i += 2;
- }
-
- queueIds = STORAGE.getAllQueues(serverName1);
- assertEquals(1, queueIds.size());
- assertThat(queueIds, hasItems("2"));
- wals2 = STORAGE.getWALsInQueue(serverName1, queue2);
- assertEquals(5, wals2.size());
- for (i = 0; i < 10; i += 2) {
- assertThat(wals2, hasItems(getFileName("file2", i)));
- }
-
- queueIds = STORAGE.getAllQueues(serverName2);
- assertEquals(1, queueIds.size());
- assertThat(queueIds, hasItems(peer1.getFirst()));
- wals1 = STORAGE.getWALsInQueue(serverName2, peer1.getFirst());
- assertEquals(5, wals1.size());
- for (i = 1; i < 10; i += 2) {
- assertThat(wals1, hasItems(getFileName("file1", i)));
- }
-
- Set<String> allWals = STORAGE.getAllWALs();
- assertEquals(10, allWals.size());
- for (i = 0; i < 10; i++) {
- assertThat(allWals, hasItems(i % 2 == 0 ? getFileName("file2", i) : getFileName("file1", i)));
- }
- }
-
- // For HBASE-12865
- @Test
- public void testClaimQueueChangeCversion() throws ReplicationException, KeeperException {
- ServerName serverName1 = ServerName.valueOf("127.0.0.1", 8000, 10000);
- STORAGE.addWAL(serverName1, "1", "file");
-
- int v0 = STORAGE.getQueuesZNodeCversion();
- ServerName serverName2 = ServerName.valueOf("127.0.0.1", 8001, 10001);
- STORAGE.claimQueue(serverName1, "1", serverName2);
- int v1 = STORAGE.getQueuesZNodeCversion();
- // cversion should increase by 1 since a child node is deleted
- assertEquals(1, v1 - v0);
- }
-
- private ZKReplicationQueueStorage createWithUnstableCversion() throws IOException {
- return new ZKReplicationQueueStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration()) {
-
- private int called = 0;
-
- @Override
- public int getQueuesZNodeCversion() throws KeeperException {
- if (called < 4) {
- called++;
- }
- return called;
- }
- };
- }
-
- @Test
- public void testGetAllWALsCversionChange() throws IOException, ReplicationException {
- ZKReplicationQueueStorage storage = createWithUnstableCversion();
- storage.addWAL(getServerName(0), "1", "file");
- // This should return eventually when cversion stabilizes
- Set<String> allWals = storage.getAllWALs();
- assertEquals(1, allWals.size());
- assertThat(allWals, hasItems("file"));
- }
-
- // For HBASE-14621
- @Test
- public void testGetAllHFileRefsCversionChange() throws IOException, ReplicationException {
- ZKReplicationQueueStorage storage = createWithUnstableCversion();
- storage.addPeerToHFileRefs("1");
- Path p = new Path("/test");
- storage.addHFileRefs("1", Arrays.asList(Pair.newPair(p, p)));
- // This should return eventually when cversion stabilizes
- Set<String> allHFileRefs = storage.getAllHFileRefs();
- assertEquals(1, allHFileRefs.size());
- assertThat(allHFileRefs, hasItems("test"));
- }
-}
[2/2] hbase git commit: Revert "HBASE-19665 Add table based
replication peers/queues storage back"
Posted by zh...@apache.org.
Revert "HBASE-19665 Add table based replication peers/queues storage back"
This reverts commit 31978c31bbf363d98c50cc6b293105a085888471.
Conflicts:
hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationStorageBase.java
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/00095a2e
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/00095a2e
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/00095a2e
Branch: refs/heads/master
Commit: 00095a2ef9442e3fd86c04876c9d91f2f8b23ad8
Parents: 104f587
Author: zhangduo <zh...@apache.org>
Authored: Sat Mar 17 20:25:27 2018 +0800
Committer: zhangduo <zh...@apache.org>
Committed: Sat Mar 17 20:25:27 2018 +0800
----------------------------------------------------------------------
.../replication/ReplicationPeerStorage.java | 3 +-
.../replication/ReplicationStorageFactory.java | 20 +-
.../hbase/replication/ReplicationUtils.java | 13 -
.../TableReplicationPeerStorage.java | 171 ------
.../TableReplicationQueueStorage.java | 522 -------------------
.../TableReplicationStorageBase.java | 127 -----
.../replication/ZKReplicationPeerStorage.java | 16 +-
.../replication/ZKReplicationQueueStorage.java | 6 +-
.../replication/TestReplicationStateBasic.java | 363 +++++++++++++
.../replication/TestReplicationStateZKImpl.java | 95 ++++
.../TestZKReplicationPeerStorage.java | 178 +++++++
.../TestZKReplicationQueueStorage.java | 252 +++++++++
.../TestReplicationSourceManager.java | 6 +-
.../storage/TestReplicationStateBasic.java | 370 -------------
.../storage/TestReplicationStateTableImpl.java | 129 -----
.../storage/TestReplicationStateZKImpl.java | 98 ----
.../storage/TestZKReplicationPeerStorage.java | 182 -------
.../storage/TestZKReplicationQueueStorage.java | 255 ---------
18 files changed, 907 insertions(+), 1899 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/00095a2e/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java
index 4684f08..1adda02 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationPeerStorage.java
@@ -42,8 +42,7 @@ public interface ReplicationPeerStorage {
/**
* Set the state of peer, {@code true} to {@code ENABLED}, otherwise to {@code DISABLED}.
- * @throws ReplicationException if there are errors accessing the storage service or peer does not
- * exist.
+ * @throws ReplicationException if there are errors accessing the storage service.
*/
void setPeerState(String peerId, boolean enabled) throws ReplicationException;
http://git-wip-us.apache.org/repos/asf/hbase/blob/00095a2e/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java
index cbfec3b..462cfed 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationStorageFactory.java
@@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.replication;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
@@ -30,15 +29,6 @@ import org.apache.yetus.audience.InterfaceAudience;
@InterfaceAudience.Private
public final class ReplicationStorageFactory {
- public static final String REPLICATION_PEER_STORAGE_IMPL = "hbase.replication.peer.storage.impl";
- public static final String DEFAULT_REPLICATION_PEER_STORAGE_IMPL =
- ZKReplicationPeerStorage.class.getName();
-
- public static final String REPLICATION_QUEUE_STORAGE_IMPL =
- "hbase.replication.queue.storage.impl";
- public static final String DEFAULT_REPLICATION_QUEUE_STORAGE_IMPL =
- ZKReplicationQueueStorage.class.getName();
-
private ReplicationStorageFactory() {
}
@@ -46,10 +36,7 @@ public final class ReplicationStorageFactory {
* Create a new {@link ReplicationPeerStorage}.
*/
public static ReplicationPeerStorage getReplicationPeerStorage(ZKWatcher zk, Configuration conf) {
- String peerStorageClass =
- conf.get(REPLICATION_PEER_STORAGE_IMPL, DEFAULT_REPLICATION_PEER_STORAGE_IMPL);
- return ReflectionUtils.instantiateWithCustomCtor(peerStorageClass,
- new Class[] { ZKWatcher.class, Configuration.class }, new Object[] { zk, conf });
+ return new ZKReplicationPeerStorage(zk, conf);
}
/**
@@ -57,9 +44,6 @@ public final class ReplicationStorageFactory {
*/
public static ReplicationQueueStorage getReplicationQueueStorage(ZKWatcher zk,
Configuration conf) {
- String queueStorageClass =
- conf.get(REPLICATION_QUEUE_STORAGE_IMPL, DEFAULT_REPLICATION_QUEUE_STORAGE_IMPL);
- return ReflectionUtils.instantiateWithCustomCtor(queueStorageClass,
- new Class[] { ZKWatcher.class, Configuration.class }, new Object[] { zk, conf });
+ return new ZKReplicationQueueStorage(zk, conf);
}
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/00095a2e/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
index 2e86c17..e2479e0 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ReplicationUtils.java
@@ -17,8 +17,6 @@
*/
package org.apache.hadoop.hbase.replication;
-import static org.apache.hadoop.hbase.replication.ZKReplicationStorageBase.toByteArray;
-
import java.io.IOException;
import java.util.Collection;
import java.util.List;
@@ -32,19 +30,12 @@ import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.yetus.audience.InterfaceAudience;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
-
/**
* Helper class for replication.
*/
@InterfaceAudience.Private
public final class ReplicationUtils {
- public static final byte[] PEER_STATE_ENABLED_BYTES =
- toByteArray(ReplicationProtos.ReplicationState.State.ENABLED);
- public static final byte[] PEER_STATE_DISABLED_BYTES =
- toByteArray(ReplicationProtos.ReplicationState.State.DISABLED);
-
private ReplicationUtils() {
}
@@ -182,8 +173,4 @@ public final class ReplicationUtils {
return tableCFs != null && tableCFs.containsKey(tableName);
}
}
-
- public static String parsePeerIdFromQueueId(String queueId) {
- return new ReplicationQueueInfo(queueId).getPeerId();
- }
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/00095a2e/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationPeerStorage.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationPeerStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationPeerStorage.java
deleted file mode 100644
index ee7969b..0000000
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationPeerStorage.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import static org.apache.hadoop.hbase.replication.ReplicationUtils.PEER_STATE_DISABLED_BYTES;
-import static org.apache.hadoop.hbase.replication.ReplicationUtils.PEER_STATE_ENABLED_BYTES;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.replication.ReplicationPeerConfigUtil;
-import org.apache.hadoop.hbase.exceptions.DeserializationException;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.apache.yetus.audience.InterfaceAudience;
-
-/**
- * Table based replication peer storage.
- */
-@InterfaceAudience.Private
-public class TableReplicationPeerStorage extends TableReplicationStorageBase
- implements ReplicationPeerStorage {
-
- public TableReplicationPeerStorage(ZKWatcher zookeeper, Configuration conf) throws IOException {
- super(zookeeper, conf);
- }
-
- @Override
- public void addPeer(String peerId, ReplicationPeerConfig peerConfig, boolean enabled)
- throws ReplicationException {
- try (Table table = getReplicationMetaTable()) {
- Put put = new Put(Bytes.toBytes(peerId));
- put.addColumn(FAMILY_PEER, QUALIFIER_PEER_CONFIG,
- ReplicationPeerConfigUtil.toByteArray(peerConfig));
- put.addColumn(FAMILY_PEER, QUALIFIER_PEER_STATE,
- enabled ? PEER_STATE_ENABLED_BYTES : PEER_STATE_DISABLED_BYTES);
- table.put(put);
- } catch (IOException e) {
- throw new ReplicationException("Failed to add peer " + peerId, e);
- }
- }
-
- @Override
- public void removePeer(String peerId) throws ReplicationException {
- try (Table table = getReplicationMetaTable()) {
- Delete delete = new Delete(Bytes.toBytes(peerId));
- table.delete(delete);
- } catch (IOException e) {
- throw new ReplicationException("Failed to remove peer " + peerId, e);
- }
- }
-
- // TODO make it to be a checkExistAndMutate operation.
- private boolean peerExist(String peerId, Table table) throws IOException {
- Get get = new Get(Bytes.toBytes(peerId));
- get.addColumn(FAMILY_PEER, QUALIFIER_PEER_STATE);
- return table.exists(get);
- }
-
- @Override
- public void setPeerState(String peerId, boolean enabled) throws ReplicationException {
- try (Table table = getReplicationMetaTable()) {
- if (!peerExist(peerId, table)) {
- throw new ReplicationException("Peer " + peerId + " does not exist.");
- }
- Put put = new Put(Bytes.toBytes(peerId));
- put.addColumn(FAMILY_PEER, QUALIFIER_PEER_STATE,
- enabled ? PEER_STATE_ENABLED_BYTES : PEER_STATE_DISABLED_BYTES);
- table.put(put);
- } catch (IOException e) {
- throw new ReplicationException(
- "Failed to set peer state, peerId=" + peerId + ", state=" + enabled, e);
- }
- }
-
- @Override
- public void updatePeerConfig(String peerId, ReplicationPeerConfig peerConfig)
- throws ReplicationException {
- try (Table table = getReplicationMetaTable()) {
- if (!peerExist(peerId, table)) {
- throw new ReplicationException("Peer " + peerId + " does not exist.");
- }
- Put put = new Put(Bytes.toBytes(peerId));
- put.addColumn(FAMILY_PEER, QUALIFIER_PEER_CONFIG,
- ReplicationPeerConfigUtil.toByteArray(peerConfig));
- table.put(put);
- } catch (IOException e) {
- throw new ReplicationException("Failed to update peer configuration, peerId=" + peerId, e);
- }
- }
-
- @Override
- public List<String> listPeerIds() throws ReplicationException {
- try (Table table = getReplicationMetaTable()) {
- Scan scan = new Scan().addColumn(FAMILY_PEER, QUALIFIER_PEER_STATE);
- try (ResultScanner scanner = table.getScanner(scan)) {
- List<String> peerIds = new ArrayList<>();
- for (Result r : scanner) {
- peerIds.add(Bytes.toString(r.getRow()));
- }
- return peerIds;
- }
- } catch (IOException e) {
- throw new ReplicationException("Failed to list peers", e);
- }
- }
-
- @Override
- public boolean isPeerEnabled(String peerId) throws ReplicationException {
- try (Table table = getReplicationMetaTable()) {
- Get get = new Get(Bytes.toBytes(peerId)).addColumn(FAMILY_PEER, QUALIFIER_PEER_STATE);
- Result r = table.get(get);
- if (r == null) {
- throw new ReplicationException("Peer " + peerId + " does not found");
- }
- return Arrays.equals(PEER_STATE_ENABLED_BYTES, r.getValue(FAMILY_PEER, QUALIFIER_PEER_STATE));
- } catch (IOException e) {
- throw new ReplicationException("Failed to read the peer state, peerId=" + peerId, e);
- }
- }
-
- @Override
- public ReplicationPeerConfig getPeerConfig(String peerId) throws ReplicationException {
- try (Table table = getReplicationMetaTable()) {
- Get get = new Get(Bytes.toBytes(peerId)).addColumn(FAMILY_PEER, QUALIFIER_PEER_CONFIG);
- Result r = table.get(get);
- if (r == null) {
- throw new ReplicationException("Peer " + peerId + " does not found");
- }
- byte[] data = r.getValue(FAMILY_PEER, QUALIFIER_PEER_CONFIG);
- if (data == null || data.length == 0) {
- throw new ReplicationException(
- "Replication peer config data shouldn't be empty, peerId=" + peerId);
- }
- try {
- return ReplicationPeerConfigUtil.parsePeerFrom(data);
- } catch (DeserializationException e) {
- throw new ReplicationException(
- "Failed to parse replication peer config for peer with id=" + peerId, e);
- }
- } catch (IOException e) {
- throw new ReplicationException(
- "Failed to read the peer configuration in hbase:replication, peerId=" + peerId, e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/00095a2e/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationQueueStorage.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationQueueStorage.java
deleted file mode 100644
index abb279d..0000000
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationQueueStorage.java
+++ /dev/null
@@ -1,522 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.Cell;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.util.CollectionUtils;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.apache.yetus.audience.InterfaceAudience;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Table based replication queue storage.
- */
-@InterfaceAudience.Private
-public class TableReplicationQueueStorage extends TableReplicationStorageBase
- implements ReplicationQueueStorage {
-
- private static final Logger LOG = LoggerFactory.getLogger(TableReplicationQueueStorage.class);
-
- public TableReplicationQueueStorage(ZKWatcher zookeeper, Configuration conf) throws IOException {
- super(zookeeper, conf);
- }
-
- /**
- * Serialize the {fileName, position} pair into a byte array.
- */
- private static byte[] makeByteArray(String fileName, long position) {
- byte[] data = new byte[Bytes.SIZEOF_INT + fileName.length() + Bytes.SIZEOF_LONG];
- int pos = 0;
- pos = Bytes.putInt(data, pos, fileName.length());
- pos = Bytes.putBytes(data, pos, Bytes.toBytes(fileName), 0, fileName.length());
- pos = Bytes.putLong(data, pos, position);
- assert pos == data.length;
- return data;
- }
-
- /**
- * Deserialize the byte array into a {filename, position} pair.
- */
- private static Pair<String, Long> parseFileNameAndPosition(byte[] data, int offset)
- throws IOException {
- if (data == null) {
- throw new IOException("The byte array shouldn't be null");
- }
- int pos = offset;
- int len = Bytes.toInt(data, pos, Bytes.SIZEOF_INT);
- pos += Bytes.SIZEOF_INT;
- if (pos + len > data.length) {
- throw new IllegalArgumentException("offset (" + pos + ") + length (" + len + ") exceed the"
- + " capacity of the array: " + data.length);
- }
- String fileName = Bytes.toString(Bytes.copy(data, pos, len));
- pos += len;
- long position = Bytes.toLong(data, pos, Bytes.SIZEOF_LONG);
- return new Pair<>(fileName, position);
- }
-
- @Override
- public void removeQueue(ServerName serverName, String queueId) throws ReplicationException {
- try (Table table = getReplicationMetaTable()) {
- Delete delete = new Delete(getServerNameRowKey(serverName));
- delete.addColumn(FAMILY_QUEUE, Bytes.toBytes(queueId));
- // Delete all <fileName, position> pairs.
- delete.addColumns(FAMILY_WAL, Bytes.toBytes(queueId), HConstants.LATEST_TIMESTAMP);
- table.delete(delete);
- } catch (IOException e) {
- throw new ReplicationException(
- "Failed to remove wal from queue, serverName=" + serverName + ", queueId=" + queueId, e);
- }
- }
-
- @Override
- public void addWAL(ServerName serverName, String queueId, String fileName)
- throws ReplicationException {
- try (Table table = getReplicationMetaTable()) {
- Put put = new Put(getServerNameRowKey(serverName));
- put.addColumn(FAMILY_RS_STATE, QUALIFIER_STATE_ENABLED, HConstants.EMPTY_BYTE_ARRAY);
- put.addColumn(FAMILY_QUEUE, Bytes.toBytes(queueId), HConstants.EMPTY_BYTE_ARRAY);
- put.addColumn(FAMILY_WAL, Bytes.toBytes(queueId), makeByteArray(fileName, 0L));
- table.put(put);
- } catch (IOException e) {
- throw new ReplicationException("Failed to add wal to queue, serverName=" + serverName
- + ", queueId=" + queueId + ", fileName=" + fileName, e);
- }
- }
-
- @Override
- public void removeWAL(ServerName serverName, String queueId, String fileName)
- throws ReplicationException {
- try (Table table = getReplicationMetaTable()) {
- Optional<WALCell> walCell = getWALsInQueue0(table, serverName, queueId).stream()
- .filter(w -> w.fileNameMatch(fileName)).findFirst();
- if (walCell.isPresent()) {
- Delete delete = new Delete(getServerNameRowKey(walCell.get().serverName))
- .addColumn(FAMILY_WAL, Bytes.toBytes(queueId), walCell.get().cellTimestamp);
- table.delete(delete);
- } else {
- LOG.warn(fileName + " has already been deleted when removing log");
- }
- } catch (IOException e) {
- throw new ReplicationException("Failed to remove wal from queue, serverName=" + serverName
- + ", queueId=" + queueId + ", fileName=" + fileName, e);
- }
- }
-
- @Override
- public void setWALPosition(ServerName serverName, String queueId, String fileName, long position,
- Map<String, Long> lastSeqIds) throws ReplicationException {
- try (Table table = getReplicationMetaTable()) {
- Optional<WALCell> walCell = getWALsInQueue0(table, serverName, queueId).stream()
- .filter(w -> w.fileNameMatch(fileName)).findFirst();
- if (walCell.isPresent()) {
- List<Put> puts = new ArrayList<>();
- Put put = new Put(getServerNameRowKey(serverName)).addColumn(FAMILY_WAL,
- Bytes.toBytes(walCell.get().queueId), walCell.get().cellTimestamp,
- makeByteArray(fileName, position));
- puts.add(put);
- // Update the last pushed sequence id for each region in a batch.
- String peerId = ReplicationUtils.parsePeerIdFromQueueId(queueId);
- if (lastSeqIds != null && lastSeqIds.size() > 0) {
- for (Map.Entry<String, Long> e : lastSeqIds.entrySet()) {
- Put regionPut = new Put(Bytes.toBytes(peerId)).addColumn(FAMILY_REGIONS,
- getRegionQualifier(e.getKey()), Bytes.toBytes(e.getValue()));
- puts.add(regionPut);
- }
- }
- table.put(puts);
- } else {
- throw new ReplicationException("WAL file " + fileName + " does not found under queue "
- + queueId + " for server " + serverName);
- }
- } catch (IOException e) {
- throw new ReplicationException(
- "Failed to set wal position and last sequence ids, serverName=" + serverName
- + ", queueId=" + queueId + ", fileName=" + fileName + ", position=" + position,
- e);
- }
- }
-
- @Override
- public long getLastSequenceId(String encodedRegionName, String peerId)
- throws ReplicationException {
- try (Table table = getReplicationMetaTable()) {
- Get get = new Get(Bytes.toBytes(peerId));
- get.addColumn(FAMILY_REGIONS, getRegionQualifier(encodedRegionName));
- Result r = table.get(get);
- if (r == null || r.listCells() == null) {
- return HConstants.NO_SEQNUM;
- }
- return Bytes.toLong(r.getValue(FAMILY_REGIONS, getRegionQualifier(encodedRegionName)));
- } catch (IOException e) {
- throw new ReplicationException(
- "Failed to get last sequence id, region=" + encodedRegionName + ", peerId=" + peerId, e);
- }
- }
-
- @Override
- public long getWALPosition(ServerName serverName, String queueId, String fileName)
- throws ReplicationException {
- try (Table table = getReplicationMetaTable()) {
- Optional<WALCell> walCell = getWALsInQueue0(table, serverName, queueId).stream()
- .filter(w -> w.fileNameMatch(fileName)).findFirst();
- if (walCell.isPresent()) {
- return walCell.get().position;
- } else {
- LOG.warn("WAL " + fileName + " does not found under queue " + queueId + " for server "
- + serverName);
- return 0;
- }
- } catch (IOException e) {
- throw new ReplicationException("Failed to get wal position. serverName=" + serverName
- + ", queueId=" + queueId + ", fileName=" + fileName, e);
- }
- }
-
- /**
- * Each cell in column wal:{queueId} will be parsed to a WALCell. The WALCell will be more
- * friendly to upper layer.
- */
- private static final class WALCell {
- ServerName serverName;
- String queueId;
- String wal;
- long position;
- long cellTimestamp;// Timestamp of the cell
-
- private WALCell(ServerName serverName, String queueId, String wal, long position,
- long cellTimestamp) {
- this.serverName = serverName;
- this.queueId = queueId;
- this.wal = wal;
- this.position = position;
- this.cellTimestamp = cellTimestamp;
- }
-
- public static WALCell create(Cell cell) throws IOException {
- ServerName serverName = ServerName.parseServerName(
- Bytes.toString(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()));
- String queueId = Bytes.toString(cell.getQualifierArray(), cell.getQualifierOffset(),
- cell.getQualifierLength());
- Pair<String, Long> fileAndPos =
- parseFileNameAndPosition(cell.getValueArray(), cell.getValueOffset());
- return new WALCell(serverName, queueId, fileAndPos.getFirst(), fileAndPos.getSecond(),
- cell.getTimestamp());
- }
-
- public boolean fileNameMatch(String fileName) {
- return StringUtils.equals(wal, fileName);
- }
- }
-
- /**
- * Parse the WALCell list from a HBase result.
- */
- private List<WALCell> result2WALCells(Result r) throws IOException {
- List<WALCell> wals = new ArrayList<>();
- if (r != null && r.listCells() != null && r.listCells().size() > 0) {
- for (Cell cell : r.listCells()) {
- wals.add(WALCell.create(cell));
- }
- }
- return wals;
- }
-
- /**
- * List all WALs for the specific region server and queueId.
- */
- private List<WALCell> getWALsInQueue0(Table table, ServerName serverName, String queueId)
- throws IOException {
- Get get = new Get(getServerNameRowKey(serverName)).addColumn(FAMILY_WAL, Bytes.toBytes(queueId))
- .readAllVersions();
- return result2WALCells(table.get(get));
- }
-
- @Override
- public List<String> getWALsInQueue(ServerName serverName, String queueId)
- throws ReplicationException {
- try (Table table = getReplicationMetaTable()) {
- return getWALsInQueue0(table, serverName, queueId).stream().map(p -> p.wal)
- .collect(Collectors.toList());
- } catch (IOException e) {
- throw new ReplicationException(
- "Failed to get wals in queue. serverName=" + serverName + ", queueId=" + queueId, e);
- }
- }
-
- @Override
- public List<String> getAllQueues(ServerName serverName) throws ReplicationException {
- try (Table table = getReplicationMetaTable()) {
- List<String> queues = new ArrayList<>();
- Get get = new Get(getServerNameRowKey(serverName)).addFamily(FAMILY_QUEUE);
- Result r = table.get(get);
- if (r != null && r.listCells() != null && r.listCells().size() > 0) {
- for (Cell c : r.listCells()) {
- String queue =
- Bytes.toString(c.getQualifierArray(), c.getQualifierOffset(), c.getQualifierLength());
- queues.add(queue);
- }
- }
- return queues;
- } catch (IOException e) {
- throw new ReplicationException("Failed to get all queues. serverName=" + serverName, e);
- }
- }
-
- @Override
- public Pair<String, SortedSet<String>> claimQueue(ServerName sourceServerName, String queueId,
- ServerName destServerName) throws ReplicationException {
- LOG.info(
- "Atomically moving " + sourceServerName + "/" + queueId + "'s WALs to " + destServerName);
- try (Table table = getReplicationMetaTable()) {
- // Create an enabled region server for destination.
- byte[] destServerNameRowKey = getServerNameRowKey(destServerName);
- byte[] srcServerNameRowKey = getServerNameRowKey(sourceServerName);
- Put put = new Put(destServerNameRowKey).addColumn(FAMILY_RS_STATE, QUALIFIER_STATE_ENABLED,
- HConstants.EMPTY_BYTE_ARRAY);
- table.put(put);
- List<WALCell> wals = getWALsInQueue0(table, sourceServerName, queueId);
- String newQueueId = queueId + "-" + sourceServerName;
- // Remove the queue in source region server if wal set of the queue is empty.
- if (CollectionUtils.isEmpty(wals)) {
- Delete delete =
- new Delete(srcServerNameRowKey).addColumn(FAMILY_QUEUE, Bytes.toBytes(queueId))
- .addColumns(FAMILY_WAL, Bytes.toBytes(queueId), HConstants.LATEST_TIMESTAMP);
- table.delete(delete);
- LOG.info("Removed " + sourceServerName + "/" + queueId + " since it's empty");
- return new Pair<>(newQueueId, Collections.emptySortedSet());
- }
- // Transfer all wals from source region server to destination region server in a batch.
- List<Mutation> mutations = new ArrayList<>();
- // a. Create queue for destination server.
- mutations.add(new Put(destServerNameRowKey).addColumn(FAMILY_QUEUE, Bytes.toBytes(newQueueId),
- HConstants.EMPTY_BYTE_ARRAY));
- SortedSet<String> logQueue = new TreeSet<>();
- for (WALCell wal : wals) {
- byte[] data = makeByteArray(wal.wal, wal.cellTimestamp);
- // b. Add wal to destination server.
- mutations.add(
- new Put(destServerNameRowKey).addColumn(FAMILY_WAL, Bytes.toBytes(newQueueId), data));
- // c. Remove wal from source server.
- mutations.add(new Delete(srcServerNameRowKey).addColumn(FAMILY_WAL, Bytes.toBytes(queueId),
- wal.cellTimestamp));
- logQueue.add(wal.wal);
- }
- // d. Remove the queue of source server.
- mutations
- .add(new Delete(srcServerNameRowKey).addColumn(FAMILY_QUEUE, Bytes.toBytes(queueId)));
- Object[] results = new Object[mutations.size()];
- table.batch(mutations, results);
- boolean allSuccess = Stream.of(results).allMatch(r -> r != null);
- if (!allSuccess) {
- throw new ReplicationException("Claim queue queueId=" + queueId + " from "
- + sourceServerName + " to " + destServerName + " failed, not all mutations success.");
- }
- LOG.info(
- "Atomically moved " + sourceServerName + "/" + queueId + "'s WALs to " + destServerName);
- return new Pair<>(newQueueId, logQueue);
- } catch (IOException | InterruptedException e) {
- throw new ReplicationException("Claim queue queueId=" + queueId + " from " + sourceServerName
- + " to " + destServerName + " failed", e);
- }
- }
-
- @Override
- public void removeReplicatorIfQueueIsEmpty(ServerName serverName) throws ReplicationException {
- // TODO Make this to be a checkAndDelete, and provide a UT for it.
- try (Table table = getReplicationMetaTable()) {
- Get get = new Get(getServerNameRowKey(serverName)).addFamily(FAMILY_WAL).readAllVersions();
- Result r = table.get(get);
- if (r == null || r.listCells() == null || r.listCells().size() == 0) {
- Delete delete = new Delete(getServerNameRowKey(serverName));
- table.delete(delete);
- }
- } catch (IOException e) {
- throw new ReplicationException(
- "Failed to remove replicator when queue is empty, serverName=" + serverName, e);
- }
- }
-
- @Override
- public List<ServerName> getListOfReplicators() throws ReplicationException {
- try (Table table = getReplicationMetaTable()) {
- Scan scan = new Scan().addColumn(FAMILY_RS_STATE, QUALIFIER_STATE_ENABLED).readVersions(1);
- Set<ServerName> serverNames = new HashSet<>();
- try (ResultScanner scanner = table.getScanner(scan)) {
- for (Result r : scanner) {
- if (r.listCells().size() > 0) {
- Cell firstCell = r.listCells().get(0);
- String serverName = Bytes.toString(firstCell.getRowArray(), firstCell.getRowOffset(),
- firstCell.getRowLength());
- serverNames.add(ServerName.parseServerName(serverName));
- }
- }
- }
- return new ArrayList<>(serverNames);
- } catch (IOException e) {
- throw new ReplicationException("Failed to get list of replicators", e);
- }
- }
-
- @Override
- public Set<String> getAllWALs() throws ReplicationException {
- Set<String> walSet = new HashSet<>();
- try (Table table = getReplicationMetaTable()) {
- Scan scan = new Scan().addFamily(FAMILY_WAL).readAllVersions();
- try (ResultScanner scanner = table.getScanner(scan)) {
- for (Result r : scanner) {
- result2WALCells(r).forEach(w -> walSet.add(w.wal));
- }
- }
- return walSet;
- } catch (IOException e) {
- throw new ReplicationException("Failed to get all wals", e);
- }
- }
-
- @Override
- public void addPeerToHFileRefs(String peerId) throws ReplicationException {
- // Need to do nothing.
- }
-
- @Override
- public void removePeerFromHFileRefs(String peerId) throws ReplicationException {
- // Need to do nothing.
- }
-
- @Override
- public void addHFileRefs(String peerId, List<Pair<Path, Path>> pairs)
- throws ReplicationException {
- try (Table table = getReplicationMetaTable()) {
- List<Put> puts = new ArrayList<>();
- for (Pair<Path, Path> p : pairs) {
- Put put = new Put(Bytes.toBytes(peerId));
- put.addColumn(FAMILY_HFILE_REFS, Bytes.toBytes(p.getSecond().getName()),
- HConstants.EMPTY_BYTE_ARRAY);
- puts.add(put);
- }
- table.put(puts);
- } catch (IOException e) {
- throw new ReplicationException("Failed to add hfile refs, peerId=" + peerId, e);
- }
- }
-
- @Override
- public void removeHFileRefs(String peerId, List<String> files) throws ReplicationException {
- try (Table table = getReplicationMetaTable()) {
- List<Delete> deletes = new ArrayList<>();
- for (String file : files) {
- Delete delete = new Delete(Bytes.toBytes(peerId));
- delete.addColumns(FAMILY_HFILE_REFS, Bytes.toBytes(file));
- deletes.add(delete);
- }
- table.delete(deletes);
- } catch (IOException e) {
- throw new ReplicationException("Failed to remove hfile refs, peerId=" + peerId, e);
- }
- }
-
- @Override
- public List<String> getAllPeersFromHFileRefsQueue() throws ReplicationException {
- try (Table table = getReplicationMetaTable()) {
- Set<String> peers = new HashSet<>();
- Scan scan = new Scan().addFamily(FAMILY_HFILE_REFS);
- try (ResultScanner scanner = table.getScanner(scan)) {
- for (Result r : scanner) {
- if (r.listCells().size() > 0) {
- Cell firstCell = r.listCells().get(0);
- String peerId = Bytes.toString(firstCell.getRowArray(), firstCell.getRowOffset(),
- firstCell.getRowLength());
- peers.add(peerId);
- }
- }
- }
- return new ArrayList<>(peers);
- } catch (IOException e) {
- throw new ReplicationException("Faield to get all peers by reading hbase:replication meta",
- e);
- }
- }
-
- @Override
- public List<String> getReplicableHFiles(String peerId) throws ReplicationException {
- try (Table table = getReplicationMetaTable()) {
- Get get = new Get(Bytes.toBytes(peerId)).addFamily(FAMILY_HFILE_REFS);
- Result r = table.get(get);
- List<String> hfiles = new ArrayList<>();
- if (r != null && r.listCells() != null) {
- for (Cell c : r.listCells()) {
- hfiles.add(
- Bytes.toString(c.getQualifierArray(), c.getQualifierOffset(), c.getQualifierLength()));
- }
- }
- return hfiles;
- } catch (IOException e) {
- throw new ReplicationException("Failed to get replicable hfiles, peerId=" + peerId, e);
- }
- }
-
- @Override
- public Set<String> getAllHFileRefs() throws ReplicationException {
- try (Table table = getReplicationMetaTable()) {
- Scan scan = new Scan().addFamily(FAMILY_HFILE_REFS);
- try (ResultScanner scanner = table.getScanner(scan)) {
- Set<String> hfileSet = new HashSet<>();
- for (Result r : scanner) {
- for (Cell c : r.listCells()) {
- String hfile = Bytes.toString(c.getQualifierArray(), c.getQualifierOffset(),
- c.getQualifierLength());
- hfileSet.add(hfile);
- }
- }
- return hfileSet;
- }
- } catch (IOException e) {
- throw new ReplicationException("Failed to get all hfile refs", e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/00095a2e/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationStorageBase.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationStorageBase.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationStorageBase.java
deleted file mode 100644
index 40c10d5..0000000
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/TableReplicationStorageBase.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.NamespaceDescriptor;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Table;
-import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
-import org.apache.hadoop.hbase.regionserver.BloomType;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.apache.yetus.audience.InterfaceAudience;
-
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-
-@InterfaceAudience.Private
-public class TableReplicationStorageBase {
- protected final ZKWatcher zookeeper;
- protected final Configuration conf;
-
- public static final TableName REPLICATION_TABLE =
- TableName.valueOf(NamespaceDescriptor.SYSTEM_NAMESPACE_NAME_STR, "replication");
-
- // Peer family, the row key would be peer id.
- public static final byte[] FAMILY_PEER = Bytes.toBytes("peer");
- public static final byte[] QUALIFIER_PEER_CONFIG = Bytes.toBytes("config");
- public static final byte[] QUALIFIER_PEER_STATE = Bytes.toBytes("state");
-
- // Region server state family, the row key would be name of region server.
- public static final byte[] FAMILY_RS_STATE = Bytes.toBytes("rs_state");
- public static final byte[] QUALIFIER_STATE_ENABLED = Bytes.toBytes("enabled");
-
- // Queue and wal family, the row key would be name of region server.
- public static final byte[] FAMILY_QUEUE = Bytes.toBytes("queue");
- public static final byte[] FAMILY_WAL = Bytes.toBytes("wal");
-
- // HFile-Refs family, the row key would be peer id.
- public static final byte[] FAMILY_HFILE_REFS = Bytes.toBytes("hfile-refs");
-
- // Region family, the row key would be peer id.
- public static final byte[] FAMILY_REGIONS = Bytes.toBytes("regions");
-
- private Connection connection;
-
- protected static byte[] getServerNameRowKey(ServerName serverName) {
- return Bytes.toBytes(serverName.toString());
- }
-
- protected static byte[] getRegionQualifier(String encodedRegionName) {
- return Bytes.toBytes(encodedRegionName);
- }
-
- @VisibleForTesting
- public static TableDescriptorBuilder createReplicationTableDescBuilder(final Configuration conf)
- throws IOException {
- int metaMaxVersion =
- conf.getInt(HConstants.HBASE_META_VERSIONS, HConstants.DEFAULT_HBASE_META_VERSIONS);
- int metaBlockSize =
- conf.getInt(HConstants.HBASE_META_BLOCK_SIZE, HConstants.DEFAULT_HBASE_META_BLOCK_SIZE);
- return TableDescriptorBuilder
- .newBuilder(REPLICATION_TABLE)
- .setColumnFamily(
- ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_PEER).setMaxVersions(metaMaxVersion)
- .setInMemory(true).setBlocksize(metaBlockSize)
- .setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBloomFilterType(BloomType.NONE)
- .build())
- .setColumnFamily(
- ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_RS_STATE).setMaxVersions(metaMaxVersion)
- .setInMemory(true).setBlocksize(metaBlockSize)
- .setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBloomFilterType(BloomType.NONE)
- .build())
- .setColumnFamily(
- ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_QUEUE).setMaxVersions(metaMaxVersion)
- .setInMemory(true).setBlocksize(metaBlockSize)
- .setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBloomFilterType(BloomType.NONE)
- .build())
- .setColumnFamily(
- ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_WAL)
- .setMaxVersions(HConstants.ALL_VERSIONS).setInMemory(true)
- .setBlocksize(metaBlockSize).setScope(HConstants.REPLICATION_SCOPE_LOCAL)
- .setBloomFilterType(BloomType.NONE).build())
- .setColumnFamily(
- ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_REGIONS).setMaxVersions(metaMaxVersion)
- .setInMemory(true).setBlocksize(metaBlockSize)
- .setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBloomFilterType(BloomType.NONE)
- .build())
- .setColumnFamily(
- ColumnFamilyDescriptorBuilder.newBuilder(FAMILY_HFILE_REFS)
- .setMaxVersions(metaMaxVersion).setInMemory(true).setBlocksize(metaBlockSize)
- .setScope(HConstants.REPLICATION_SCOPE_LOCAL).setBloomFilterType(BloomType.NONE)
- .build());
- }
-
- protected TableReplicationStorageBase(ZKWatcher zookeeper, Configuration conf)
- throws IOException {
- this.zookeeper = zookeeper;
- this.conf = conf;
- this.connection = ConnectionFactory.createConnection(conf);
- }
-
- protected Table getReplicationMetaTable() throws IOException {
- return this.connection.getTable(REPLICATION_TABLE);
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/00095a2e/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java
index 138f14a..a53500a 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationPeerStorage.java
@@ -17,9 +17,6 @@
*/
package org.apache.hadoop.hbase.replication;
-import static org.apache.hadoop.hbase.replication.ReplicationUtils.PEER_STATE_DISABLED_BYTES;
-import static org.apache.hadoop.hbase.replication.ReplicationUtils.PEER_STATE_ENABLED_BYTES;
-
import java.util.Arrays;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
@@ -34,6 +31,8 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ReplicationProtos;
+
/**
* ZK based replication peer storage.
*/
@@ -47,6 +46,11 @@ public class ZKReplicationPeerStorage extends ZKReplicationStorageBase
public static final String PEERS_STATE_ZNODE = "zookeeper.znode.replication.peers.state";
public static final String PEERS_STATE_ZNODE_DEFAULT = "peer-state";
+ public static final byte[] ENABLED_ZNODE_BYTES =
+ toByteArray(ReplicationProtos.ReplicationState.State.ENABLED);
+ public static final byte[] DISABLED_ZNODE_BYTES =
+ toByteArray(ReplicationProtos.ReplicationState.State.DISABLED);
+
/**
* The name of the znode that contains the replication status of a remote slave (i.e. peer)
* cluster.
@@ -85,7 +89,7 @@ public class ZKReplicationPeerStorage extends ZKReplicationStorageBase
ZKUtilOp.createAndFailSilent(getPeerNode(peerId),
ReplicationPeerConfigUtil.toByteArray(peerConfig)),
ZKUtilOp.createAndFailSilent(getPeerStateNode(peerId),
- enabled ? PEER_STATE_ENABLED_BYTES : PEER_STATE_DISABLED_BYTES)),
+ enabled ? ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES)),
false);
} catch (KeeperException e) {
throw new ReplicationException("Could not add peer with id=" + peerId + ", peerConfif=>"
@@ -104,7 +108,7 @@ public class ZKReplicationPeerStorage extends ZKReplicationStorageBase
@Override
public void setPeerState(String peerId, boolean enabled) throws ReplicationException {
- byte[] stateBytes = enabled ? PEER_STATE_ENABLED_BYTES : PEER_STATE_DISABLED_BYTES;
+ byte[] stateBytes = enabled ? ENABLED_ZNODE_BYTES : DISABLED_ZNODE_BYTES;
try {
ZKUtil.setData(zookeeper, getPeerStateNode(peerId), stateBytes);
} catch (KeeperException e) {
@@ -136,7 +140,7 @@ public class ZKReplicationPeerStorage extends ZKReplicationStorageBase
@Override
public boolean isPeerEnabled(String peerId) throws ReplicationException {
try {
- return Arrays.equals(PEER_STATE_ENABLED_BYTES,
+ return Arrays.equals(ENABLED_ZNODE_BYTES,
ZKUtil.getData(zookeeper, getPeerStateNode(peerId)));
} catch (KeeperException | InterruptedException e) {
throw new ReplicationException("Unable to get status of the peer with id=" + peerId, e);
http://git-wip-us.apache.org/repos/asf/hbase/blob/00095a2e/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
index 4a1e780..72caf82 100644
--- a/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
+++ b/hbase-replication/src/main/java/org/apache/hadoop/hbase/replication/ZKReplicationQueueStorage.java
@@ -79,7 +79,7 @@ import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesti
* </pre>
*/
@InterfaceAudience.Private
-public class ZKReplicationQueueStorage extends ZKReplicationStorageBase
+class ZKReplicationQueueStorage extends ZKReplicationStorageBase
implements ReplicationQueueStorage {
private static final Logger LOG = LoggerFactory.getLogger(ZKReplicationQueueStorage.class);
@@ -199,7 +199,7 @@ public class ZKReplicationQueueStorage extends ZKReplicationStorageBase
// Persist the max sequence id(s) of regions for serial replication atomically.
if (lastSeqIds != null && lastSeqIds.size() > 0) {
for (Entry<String, Long> lastSeqEntry : lastSeqIds.entrySet()) {
- String peerId = ReplicationUtils.parsePeerIdFromQueueId(queueId);
+ String peerId = new ReplicationQueueInfo(queueId).getPeerId();
String path = getSerialReplicationRegionPeerNode(lastSeqEntry.getKey(), peerId);
/*
* Make sure the existence of path
@@ -375,7 +375,7 @@ public class ZKReplicationQueueStorage extends ZKReplicationStorageBase
// will be overridden in UTs
@VisibleForTesting
- public int getQueuesZNodeCversion() throws KeeperException {
+ protected int getQueuesZNodeCversion() throws KeeperException {
Stat stat = new Stat();
ZKUtil.getDataNoWatch(this.zookeeper, this.queuesZNode, stat);
return stat.getCversion();
http://git-wip-us.apache.org/repos/asf/hbase/blob/00095a2e/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
new file mode 100644
index 0000000..5999c1f
--- /dev/null
+++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
@@ -0,0 +1,363 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.hamcrest.CoreMatchers.hasItems;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.replication.ReplicationPeer.PeerState;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.zookeeper.ZKConfig;
+import org.apache.zookeeper.KeeperException;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableMap;
+
+/**
+ * White box testing for replication state interfaces. Implementations should extend this class, and
+ * initialize the interfaces properly.
+ */
+public abstract class TestReplicationStateBasic {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TestReplicationStateBasic.class);
+
+ protected ReplicationQueueStorage rqs;
+ protected ServerName server1 = ServerName.valueOf("hostname1.example.org", 1234, 12345);
+ protected ServerName server2 = ServerName.valueOf("hostname2.example.org", 1234, 12345);
+ protected ServerName server3 = ServerName.valueOf("hostname3.example.org", 1234, 12345);
+ protected ReplicationPeers rp;
+ protected static final String ID_ONE = "1";
+ protected static final String ID_TWO = "2";
+ protected static String KEY_ONE;
+ protected static String KEY_TWO;
+
+ // For testing when we try to replicate to ourself
+ protected String OUR_KEY;
+
+ protected static int zkTimeoutCount;
+ protected static final int ZK_MAX_COUNT = 300;
+ protected static final int ZK_SLEEP_INTERVAL = 100; // millis
+
+ @Test
+ public void testReplicationQueueStorage() throws ReplicationException {
+ // Test methods with empty state
+ assertEquals(0, rqs.getListOfReplicators().size());
+ assertTrue(rqs.getWALsInQueue(server1, "qId1").isEmpty());
+ assertTrue(rqs.getAllQueues(server1).isEmpty());
+
+ /*
+ * Set up data Two replicators: -- server1: three queues with 0, 1 and 2 log files each --
+ * server2: zero queues
+ */
+ rqs.addWAL(server1, "qId1", "trash");
+ rqs.removeWAL(server1, "qId1", "trash");
+ rqs.addWAL(server1,"qId2", "filename1");
+ rqs.addWAL(server1,"qId3", "filename2");
+ rqs.addWAL(server1,"qId3", "filename3");
+ rqs.addWAL(server2,"trash", "trash");
+ rqs.removeQueue(server2,"trash");
+
+ List<ServerName> reps = rqs.getListOfReplicators();
+ assertEquals(2, reps.size());
+ assertTrue(server1.getServerName(), reps.contains(server1));
+ assertTrue(server2.getServerName(), reps.contains(server2));
+
+ assertTrue(rqs.getWALsInQueue(ServerName.valueOf("bogus", 12345, 12345), "bogus").isEmpty());
+ assertTrue(rqs.getWALsInQueue(server1, "bogus").isEmpty());
+ assertEquals(0, rqs.getWALsInQueue(server1, "qId1").size());
+ assertEquals(1, rqs.getWALsInQueue(server1, "qId2").size());
+ assertEquals("filename1", rqs.getWALsInQueue(server1, "qId2").get(0));
+
+ assertTrue(rqs.getAllQueues(ServerName.valueOf("bogus", 12345, -1L)).isEmpty());
+ assertEquals(0, rqs.getAllQueues(server2).size());
+ List<String> list = rqs.getAllQueues(server1);
+ assertEquals(3, list.size());
+ assertTrue(list.contains("qId2"));
+ assertTrue(list.contains("qId3"));
+ }
+
+ private void removeAllQueues(ServerName serverName) throws ReplicationException {
+ for (String queue: rqs.getAllQueues(serverName)) {
+ rqs.removeQueue(serverName, queue);
+ }
+ }
+ @Test
+ public void testReplicationQueues() throws ReplicationException {
+ // Initialize ReplicationPeer so we can add peers (we don't transfer lone queues)
+ rp.init();
+
+ rqs.removeQueue(server1, "bogus");
+ rqs.removeWAL(server1, "bogus", "bogus");
+ removeAllQueues(server1);
+ assertEquals(0, rqs.getAllQueues(server1).size());
+ assertEquals(0, rqs.getWALPosition(server1, "bogus", "bogus"));
+ assertTrue(rqs.getWALsInQueue(server1, "bogus").isEmpty());
+ assertTrue(rqs.getAllQueues(ServerName.valueOf("bogus", 1234, 12345)).isEmpty());
+
+ populateQueues();
+
+ assertEquals(3, rqs.getListOfReplicators().size());
+ assertEquals(0, rqs.getWALsInQueue(server2, "qId1").size());
+ assertEquals(5, rqs.getWALsInQueue(server3, "qId5").size());
+ assertEquals(0, rqs.getWALPosition(server3, "qId1", "filename0"));
+ rqs.setWALPosition(server3, "qId5", "filename4", 354L, null);
+ assertEquals(354L, rqs.getWALPosition(server3, "qId5", "filename4"));
+
+ assertEquals(5, rqs.getWALsInQueue(server3, "qId5").size());
+ assertEquals(0, rqs.getWALsInQueue(server2, "qId1").size());
+ assertEquals(0, rqs.getAllQueues(server1).size());
+ assertEquals(1, rqs.getAllQueues(server2).size());
+ assertEquals(5, rqs.getAllQueues(server3).size());
+
+ assertEquals(0, rqs.getAllQueues(server1).size());
+ rqs.removeReplicatorIfQueueIsEmpty(server1);
+ assertEquals(2, rqs.getListOfReplicators().size());
+
+ List<String> queues = rqs.getAllQueues(server3);
+ assertEquals(5, queues.size());
+ for (String queue : queues) {
+ rqs.claimQueue(server3, queue, server2);
+ }
+ rqs.removeReplicatorIfQueueIsEmpty(server3);
+ assertEquals(1, rqs.getListOfReplicators().size());
+
+ assertEquals(6, rqs.getAllQueues(server2).size());
+ removeAllQueues(server2);
+ rqs.removeReplicatorIfQueueIsEmpty(server2);
+ assertEquals(0, rqs.getListOfReplicators().size());
+ }
+
+ @Test
+ public void testHfileRefsReplicationQueues() throws ReplicationException, KeeperException {
+ rp.init();
+
+ List<Pair<Path, Path>> files1 = new ArrayList<>(3);
+ files1.add(new Pair<>(null, new Path("file_1")));
+ files1.add(new Pair<>(null, new Path("file_2")));
+ files1.add(new Pair<>(null, new Path("file_3")));
+ assertTrue(rqs.getReplicableHFiles(ID_ONE).isEmpty());
+ assertEquals(0, rqs.getAllPeersFromHFileRefsQueue().size());
+ rp.getPeerStorage().addPeer(ID_ONE,
+ ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true);
+ rqs.addPeerToHFileRefs(ID_ONE);
+ rqs.addHFileRefs(ID_ONE, files1);
+ assertEquals(1, rqs.getAllPeersFromHFileRefsQueue().size());
+ assertEquals(3, rqs.getReplicableHFiles(ID_ONE).size());
+ List<String> hfiles2 = new ArrayList<>(files1.size());
+ for (Pair<Path, Path> p : files1) {
+ hfiles2.add(p.getSecond().getName());
+ }
+ String removedString = hfiles2.remove(0);
+ rqs.removeHFileRefs(ID_ONE, hfiles2);
+ assertEquals(1, rqs.getReplicableHFiles(ID_ONE).size());
+ hfiles2 = new ArrayList<>(1);
+ hfiles2.add(removedString);
+ rqs.removeHFileRefs(ID_ONE, hfiles2);
+ assertEquals(0, rqs.getReplicableHFiles(ID_ONE).size());
+ rp.getPeerStorage().removePeer(ID_ONE);
+ }
+
+ @Test
+ public void testRemovePeerForHFileRefs() throws ReplicationException, KeeperException {
+ rp.init();
+ rp.getPeerStorage().addPeer(ID_ONE,
+ ReplicationPeerConfig.newBuilder().setClusterKey(KEY_ONE).build(), true);
+ rqs.addPeerToHFileRefs(ID_ONE);
+ rp.getPeerStorage().addPeer(ID_TWO,
+ ReplicationPeerConfig.newBuilder().setClusterKey(KEY_TWO).build(), true);
+ rqs.addPeerToHFileRefs(ID_TWO);
+
+ List<Pair<Path, Path>> files1 = new ArrayList<>(3);
+ files1.add(new Pair<>(null, new Path("file_1")));
+ files1.add(new Pair<>(null, new Path("file_2")));
+ files1.add(new Pair<>(null, new Path("file_3")));
+ rqs.addHFileRefs(ID_ONE, files1);
+ rqs.addHFileRefs(ID_TWO, files1);
+ assertEquals(2, rqs.getAllPeersFromHFileRefsQueue().size());
+ assertEquals(3, rqs.getReplicableHFiles(ID_ONE).size());
+ assertEquals(3, rqs.getReplicableHFiles(ID_TWO).size());
+
+ rp.getPeerStorage().removePeer(ID_ONE);
+ rqs.removePeerFromHFileRefs(ID_ONE);
+ assertEquals(1, rqs.getAllPeersFromHFileRefsQueue().size());
+ assertTrue(rqs.getReplicableHFiles(ID_ONE).isEmpty());
+ assertEquals(3, rqs.getReplicableHFiles(ID_TWO).size());
+
+ rp.getPeerStorage().removePeer(ID_TWO);
+ rqs.removePeerFromHFileRefs(ID_TWO);
+ assertEquals(0, rqs.getAllPeersFromHFileRefsQueue().size());
+ assertTrue(rqs.getReplicableHFiles(ID_TWO).isEmpty());
+ }
+
+ @Test
+ public void testReplicationPeers() throws Exception {
+ rp.init();
+
+ try {
+ rp.getPeerStorage().setPeerState("bogus", true);
+ fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
+ } catch (ReplicationException e) {
+ }
+ try {
+ rp.getPeerStorage().setPeerState("bogus", false);
+ fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
+ } catch (ReplicationException e) {
+ }
+
+ try {
+ assertFalse(rp.addPeer("bogus"));
+ fail("Should have thrown an ReplicationException when passed a bogus peerId");
+ } catch (ReplicationException e) {
+ }
+
+ assertNumberOfPeers(0);
+
+ // Add some peers
+ rp.getPeerStorage().addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), true);
+ assertNumberOfPeers(1);
+ rp.getPeerStorage().addPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO), true);
+ assertNumberOfPeers(2);
+
+ assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(ReplicationUtils
+ .getPeerClusterConfiguration(rp.getPeerStorage().getPeerConfig(ID_ONE), rp.getConf())));
+ rp.getPeerStorage().removePeer(ID_ONE);
+ rp.removePeer(ID_ONE);
+ assertNumberOfPeers(1);
+
+ // Add one peer
+ rp.getPeerStorage().addPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE), true);
+ rp.addPeer(ID_ONE);
+ assertNumberOfPeers(2);
+ assertTrue(rp.getPeer(ID_ONE).isPeerEnabled());
+ rp.getPeerStorage().setPeerState(ID_ONE, false);
+ // now we do not rely on zk watcher to trigger the state change so we need to trigger it
+ // manually...
+ ReplicationPeerImpl peer = rp.getPeer(ID_ONE);
+ rp.refreshPeerState(peer.getId());
+ assertEquals(PeerState.DISABLED, peer.getPeerState());
+ assertConnectedPeerStatus(false, ID_ONE);
+ rp.getPeerStorage().setPeerState(ID_ONE, true);
+ // now we do not rely on zk watcher to trigger the state change so we need to trigger it
+ // manually...
+ rp.refreshPeerState(peer.getId());
+ assertEquals(PeerState.ENABLED, peer.getPeerState());
+ assertConnectedPeerStatus(true, ID_ONE);
+
+ // Disconnect peer
+ rp.removePeer(ID_ONE);
+ assertNumberOfPeers(2);
+ }
+
+ private String getFileName(String base, int i) {
+ return String.format(base + "-%04d", i);
+ }
+
+ @Test
+ public void testPersistLogPositionAndSeqIdAtomically() throws Exception {
+ ServerName serverName1 = ServerName.valueOf("127.0.0.1", 8000, 10000);
+ assertTrue(rqs.getAllQueues(serverName1).isEmpty());
+ String queue1 = "1";
+ String region0 = "region0", region1 = "region1";
+ for (int i = 0; i < 10; i++) {
+ rqs.addWAL(serverName1, queue1, getFileName("file1", i));
+ }
+ List<String> queueIds = rqs.getAllQueues(serverName1);
+ assertEquals(1, queueIds.size());
+ assertThat(queueIds, hasItems("1"));
+
+ List<String> wals1 = rqs.getWALsInQueue(serverName1, queue1);
+ assertEquals(10, wals1.size());
+ for (int i = 0; i < 10; i++) {
+ assertThat(wals1, hasItems(getFileName("file1", i)));
+ }
+
+ for (int i = 0; i < 10; i++) {
+ assertEquals(0, rqs.getWALPosition(serverName1, queue1, getFileName("file1", i)));
+ }
+ assertEquals(HConstants.NO_SEQNUM, rqs.getLastSequenceId(region0, queue1));
+ assertEquals(HConstants.NO_SEQNUM, rqs.getLastSequenceId(region1, queue1));
+
+ for (int i = 0; i < 10; i++) {
+ rqs.setWALPosition(serverName1, queue1, getFileName("file1", i), (i + 1) * 100,
+ ImmutableMap.of(region0, i * 100L, region1, (i + 1) * 100L));
+ }
+
+ for (int i = 0; i < 10; i++) {
+ assertEquals((i + 1) * 100, rqs.getWALPosition(serverName1, queue1, getFileName("file1", i)));
+ }
+ assertEquals(900L, rqs.getLastSequenceId(region0, queue1));
+ assertEquals(1000L, rqs.getLastSequenceId(region1, queue1));
+ }
+
+ protected void assertConnectedPeerStatus(boolean status, String peerId) throws Exception {
+ // we can first check if the value was changed in the store, if it wasn't then fail right away
+ if (status != rp.getPeerStorage().isPeerEnabled(peerId)) {
+ fail("ConnectedPeerStatus was " + !status + " but expected " + status + " in ZK");
+ }
+ while (true) {
+ if (status == rp.getPeer(peerId).isPeerEnabled()) {
+ return;
+ }
+ if (zkTimeoutCount < ZK_MAX_COUNT) {
+ LOG.debug("ConnectedPeerStatus was " + !status + " but expected " + status
+ + ", sleeping and trying again.");
+ Thread.sleep(ZK_SLEEP_INTERVAL);
+ } else {
+ fail("Timed out waiting for ConnectedPeerStatus to be " + status);
+ }
+ }
+ }
+
+ protected void assertNumberOfPeers(int total) throws ReplicationException {
+ assertEquals(total, rp.getPeerStorage().listPeerIds().size());
+ }
+
+ /*
+ * three replicators: rq1 has 0 queues, rq2 has 1 queue with no logs, rq3 has 5 queues with 1, 2,
+ * 3, 4, 5 log files respectively
+ */
+ protected void populateQueues() throws ReplicationException {
+ rqs.addWAL(server1, "trash", "trash");
+ rqs.removeQueue(server1, "trash");
+
+ rqs.addWAL(server2, "qId1", "trash");
+ rqs.removeWAL(server2, "qId1", "trash");
+
+ for (int i = 1; i < 6; i++) {
+ for (int j = 0; j < i; j++) {
+ rqs.addWAL(server3, "qId" + i, "filename" + j);
+ }
+ // Add peers for the corresponding queues so they are not orphans
+ rp.getPeerStorage().addPeer("qId" + i,
+ ReplicationPeerConfig.newBuilder().setClusterKey("localhost:2818:/bogus" + i).build(),
+ true);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/00095a2e/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
new file mode 100644
index 0000000..08178f4
--- /dev/null
+++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ClusterId;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseZKTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
+import org.apache.hadoop.hbase.zookeeper.ZKConfig;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
+import org.apache.zookeeper.KeeperException;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.experimental.categories.Category;
+
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestReplicationStateZKImpl.class);
+
+ private static Configuration conf;
+ private static HBaseZKTestingUtility utility;
+ private static ZKWatcher zkw;
+ private static String replicationZNode;
+
+ @BeforeClass
+ public static void setUpBeforeClass() throws Exception {
+ utility = new HBaseZKTestingUtility();
+ utility.startMiniZKCluster();
+ conf = utility.getConfiguration();
+ conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
+ zkw = utility.getZooKeeperWatcher();
+ String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
+ replicationZNode = ZNodePaths.joinZNode(zkw.znodePaths.baseZNode, replicationZNodeName);
+ KEY_ONE = initPeerClusterState("/hbase1");
+ KEY_TWO = initPeerClusterState("/hbase2");
+ }
+
+ private static String initPeerClusterState(String baseZKNode)
+ throws IOException, KeeperException {
+ // Add a dummy region server and set up the cluster id
+ Configuration testConf = new Configuration(conf);
+ testConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, baseZKNode);
+ ZKWatcher zkw1 = new ZKWatcher(testConf, "test1", null);
+ String fakeRs = ZNodePaths.joinZNode(zkw1.znodePaths.rsZNode, "hostname1.example.org:1234");
+ ZKUtil.createWithParents(zkw1, fakeRs);
+ ZKClusterId.setClusterId(zkw1, new ClusterId());
+ return ZKConfig.getZooKeeperClusterKey(testConf);
+ }
+
+ @Before
+ public void setUp() {
+ zkTimeoutCount = 0;
+ rqs = ReplicationStorageFactory.getReplicationQueueStorage(zkw, conf);
+ rp = ReplicationFactory.getReplicationPeers(zkw, conf);
+ OUR_KEY = ZKConfig.getZooKeeperClusterKey(conf);
+ }
+
+ @After
+ public void tearDown() throws KeeperException, IOException {
+ ZKUtil.deleteNodeRecursively(zkw, replicationZNode);
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ utility.shutdownMiniZKCluster();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/00095a2e/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java
new file mode 100644
index 0000000..3290fb0
--- /dev/null
+++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationPeerStorage.java
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static java.util.stream.Collectors.toList;
+import static java.util.stream.Collectors.toSet;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.stream.Stream;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseZKTestingUtility;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestZKReplicationPeerStorage {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestZKReplicationPeerStorage.class);
+
+ private static final HBaseZKTestingUtility UTIL = new HBaseZKTestingUtility();
+
+ private static ZKReplicationPeerStorage STORAGE;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ UTIL.startMiniZKCluster();
+ STORAGE = new ZKReplicationPeerStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration());
+ }
+
+ @AfterClass
+ public static void tearDown() throws IOException {
+ UTIL.shutdownMiniZKCluster();
+ }
+
+ private Set<String> randNamespaces(Random rand) {
+ return Stream.generate(() -> Long.toHexString(rand.nextLong())).limit(rand.nextInt(5))
+ .collect(toSet());
+ }
+
+ private Map<TableName, List<String>> randTableCFs(Random rand) {
+ int size = rand.nextInt(5);
+ Map<TableName, List<String>> map = new HashMap<>();
+ for (int i = 0; i < size; i++) {
+ TableName tn = TableName.valueOf(Long.toHexString(rand.nextLong()));
+ List<String> cfs = Stream.generate(() -> Long.toHexString(rand.nextLong()))
+ .limit(rand.nextInt(5)).collect(toList());
+ map.put(tn, cfs);
+ }
+ return map;
+ }
+
+ private ReplicationPeerConfig getConfig(int seed) {
+ Random rand = new Random(seed);
+ return ReplicationPeerConfig.newBuilder().setClusterKey(Long.toHexString(rand.nextLong()))
+ .setReplicationEndpointImpl(Long.toHexString(rand.nextLong()))
+ .setNamespaces(randNamespaces(rand)).setExcludeNamespaces(randNamespaces(rand))
+ .setTableCFsMap(randTableCFs(rand)).setReplicateAllUserTables(rand.nextBoolean())
+ .setBandwidth(rand.nextInt(1000)).build();
+ }
+
+ private void assertSetEquals(Set<String> expected, Set<String> actual) {
+ if (expected == null || expected.size() == 0) {
+ assertTrue(actual == null || actual.size() == 0);
+ return;
+ }
+ assertEquals(expected.size(), actual.size());
+ expected.forEach(s -> assertTrue(actual.contains(s)));
+ }
+
+ private void assertMapEquals(Map<TableName, List<String>> expected,
+ Map<TableName, List<String>> actual) {
+ if (expected == null || expected.size() == 0) {
+ assertTrue(actual == null || actual.size() == 0);
+ return;
+ }
+ assertEquals(expected.size(), actual.size());
+ expected.forEach((expectedTn, expectedCFs) -> {
+ List<String> actualCFs = actual.get(expectedTn);
+ if (expectedCFs == null || expectedCFs.size() == 0) {
+ assertTrue(actual.containsKey(expectedTn));
+ assertTrue(actualCFs == null || actualCFs.size() == 0);
+ } else {
+ assertNotNull(actualCFs);
+ assertEquals(expectedCFs.size(), actualCFs.size());
+ for (Iterator<String> expectedIt = expectedCFs.iterator(), actualIt = actualCFs.iterator();
+ expectedIt.hasNext();) {
+ assertEquals(expectedIt.next(), actualIt.next());
+ }
+ }
+ });
+ }
+
+ private void assertConfigEquals(ReplicationPeerConfig expected, ReplicationPeerConfig actual) {
+ assertEquals(expected.getClusterKey(), actual.getClusterKey());
+ assertEquals(expected.getReplicationEndpointImpl(), actual.getReplicationEndpointImpl());
+ assertSetEquals(expected.getNamespaces(), actual.getNamespaces());
+ assertSetEquals(expected.getExcludeNamespaces(), actual.getExcludeNamespaces());
+ assertMapEquals(expected.getTableCFsMap(), actual.getTableCFsMap());
+ assertMapEquals(expected.getExcludeTableCFsMap(), actual.getExcludeTableCFsMap());
+ assertEquals(expected.replicateAllUserTables(), actual.replicateAllUserTables());
+ assertEquals(expected.getBandwidth(), actual.getBandwidth());
+ }
+
+ @Test
+ public void test() throws ReplicationException {
+ int peerCount = 10;
+ for (int i = 0; i < peerCount; i++) {
+ STORAGE.addPeer(Integer.toString(i), getConfig(i), i % 2 == 0);
+ }
+ List<String> peerIds = STORAGE.listPeerIds();
+ assertEquals(peerCount, peerIds.size());
+ for (String peerId : peerIds) {
+ int seed = Integer.parseInt(peerId);
+ assertConfigEquals(getConfig(seed), STORAGE.getPeerConfig(peerId));
+ }
+ for (int i = 0; i < peerCount; i++) {
+ STORAGE.updatePeerConfig(Integer.toString(i), getConfig(i + 1));
+ }
+ for (String peerId : peerIds) {
+ int seed = Integer.parseInt(peerId);
+ assertConfigEquals(getConfig(seed + 1), STORAGE.getPeerConfig(peerId));
+ }
+ for (int i = 0; i < peerCount; i++) {
+ assertEquals(i % 2 == 0, STORAGE.isPeerEnabled(Integer.toString(i)));
+ }
+ for (int i = 0; i < peerCount; i++) {
+ STORAGE.setPeerState(Integer.toString(i), i % 2 != 0);
+ }
+ for (int i = 0; i < peerCount; i++) {
+ assertEquals(i % 2 != 0, STORAGE.isPeerEnabled(Integer.toString(i)));
+ }
+ String toRemove = Integer.toString(peerCount / 2);
+ STORAGE.removePeer(toRemove);
+ peerIds = STORAGE.listPeerIds();
+ assertEquals(peerCount - 1, peerIds.size());
+ assertFalse(peerIds.contains(toRemove));
+
+ try {
+ STORAGE.getPeerConfig(toRemove);
+ fail("Should throw a ReplicationException when get peer config of a peerId");
+ } catch (ReplicationException e) {
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hbase/blob/00095a2e/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java
----------------------------------------------------------------------
diff --git a/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java
new file mode 100644
index 0000000..8ff52f3
--- /dev/null
+++ b/hbase-replication/src/test/java/org/apache/hadoop/hbase/replication/TestZKReplicationQueueStorage.java
@@ -0,0 +1,252 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import static org.hamcrest.CoreMatchers.hasItems;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Set;
+import java.util.SortedSet;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseZKTestingUtility;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.ReplicationTests;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.zookeeper.KeeperException;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ ReplicationTests.class, MediumTests.class })
+public class TestZKReplicationQueueStorage {
+
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestZKReplicationQueueStorage.class);
+
+ private static final HBaseZKTestingUtility UTIL = new HBaseZKTestingUtility();
+
+ private static ZKReplicationQueueStorage STORAGE;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ UTIL.startMiniZKCluster();
+ STORAGE = new ZKReplicationQueueStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration());
+ }
+
+ @AfterClass
+ public static void tearDown() throws IOException {
+ UTIL.shutdownMiniZKCluster();
+ }
+
+ @After
+ public void tearDownAfterTest() throws ReplicationException {
+ for (ServerName serverName : STORAGE.getListOfReplicators()) {
+ for (String queue : STORAGE.getAllQueues(serverName)) {
+ STORAGE.removeQueue(serverName, queue);
+ }
+ STORAGE.removeReplicatorIfQueueIsEmpty(serverName);
+ }
+ for (String peerId : STORAGE.getAllPeersFromHFileRefsQueue()) {
+ STORAGE.removePeerFromHFileRefs(peerId);
+ }
+ }
+
+ private ServerName getServerName(int i) {
+ return ServerName.valueOf("127.0.0.1", 8000 + i, 10000 + i);
+ }
+
+ @Test
+ public void testReplicator() throws ReplicationException {
+ assertTrue(STORAGE.getListOfReplicators().isEmpty());
+ String queueId = "1";
+ for (int i = 0; i < 10; i++) {
+ STORAGE.addWAL(getServerName(i), queueId, "file" + i);
+ }
+ List<ServerName> replicators = STORAGE.getListOfReplicators();
+ assertEquals(10, replicators.size());
+ for (int i = 0; i < 10; i++) {
+ assertThat(replicators, hasItems(getServerName(i)));
+ }
+ for (int i = 0; i < 5; i++) {
+ STORAGE.removeQueue(getServerName(i), queueId);
+ }
+ for (int i = 0; i < 10; i++) {
+ STORAGE.removeReplicatorIfQueueIsEmpty(getServerName(i));
+ }
+ replicators = STORAGE.getListOfReplicators();
+ assertEquals(5, replicators.size());
+ for (int i = 5; i < 10; i++) {
+ assertThat(replicators, hasItems(getServerName(i)));
+ }
+ }
+
+ private String getFileName(String base, int i) {
+ return String.format(base + "-%04d", i);
+ }
+
+ @Test
+ public void testAddRemoveLog() throws ReplicationException {
+ ServerName serverName1 = ServerName.valueOf("127.0.0.1", 8000, 10000);
+ assertTrue(STORAGE.getAllQueues(serverName1).isEmpty());
+ String queue1 = "1";
+ String queue2 = "2";
+ for (int i = 0; i < 10; i++) {
+ STORAGE.addWAL(serverName1, queue1, getFileName("file1", i));
+ STORAGE.addWAL(serverName1, queue2, getFileName("file2", i));
+ }
+ List<String> queueIds = STORAGE.getAllQueues(serverName1);
+ assertEquals(2, queueIds.size());
+ assertThat(queueIds, hasItems("1", "2"));
+
+ List<String> wals1 = STORAGE.getWALsInQueue(serverName1, queue1);
+ List<String> wals2 = STORAGE.getWALsInQueue(serverName1, queue2);
+ assertEquals(10, wals1.size());
+ assertEquals(10, wals2.size());
+ for (int i = 0; i < 10; i++) {
+ assertThat(wals1, hasItems(getFileName("file1", i)));
+ assertThat(wals2, hasItems(getFileName("file2", i)));
+ }
+
+ for (int i = 0; i < 10; i++) {
+ assertEquals(0, STORAGE.getWALPosition(serverName1, queue1, getFileName("file1", i)));
+ assertEquals(0, STORAGE.getWALPosition(serverName1, queue2, getFileName("file2", i)));
+ STORAGE.setWALPosition(serverName1, queue1, getFileName("file1", i), (i + 1) * 100, null);
+ STORAGE.setWALPosition(serverName1, queue2, getFileName("file2", i), (i + 1) * 100 + 10,
+ null);
+ }
+
+ for (int i = 0; i < 10; i++) {
+ assertEquals((i + 1) * 100,
+ STORAGE.getWALPosition(serverName1, queue1, getFileName("file1", i)));
+ assertEquals((i + 1) * 100 + 10,
+ STORAGE.getWALPosition(serverName1, queue2, getFileName("file2", i)));
+ }
+
+ for (int i = 0; i < 10; i++) {
+ if (i % 2 == 0) {
+ STORAGE.removeWAL(serverName1, queue1, getFileName("file1", i));
+ } else {
+ STORAGE.removeWAL(serverName1, queue2, getFileName("file2", i));
+ }
+ }
+
+ queueIds = STORAGE.getAllQueues(serverName1);
+ assertEquals(2, queueIds.size());
+ assertThat(queueIds, hasItems("1", "2"));
+
+ ServerName serverName2 = ServerName.valueOf("127.0.0.1", 8001, 10001);
+ Pair<String, SortedSet<String>> peer1 = STORAGE.claimQueue(serverName1, "1", serverName2);
+
+ assertEquals("1-" + serverName1.getServerName(), peer1.getFirst());
+ assertEquals(5, peer1.getSecond().size());
+ int i = 1;
+ for (String wal : peer1.getSecond()) {
+ assertEquals(getFileName("file1", i), wal);
+ assertEquals((i + 1) * 100,
+ STORAGE.getWALPosition(serverName2, peer1.getFirst(), getFileName("file1", i)));
+ i += 2;
+ }
+
+ queueIds = STORAGE.getAllQueues(serverName1);
+ assertEquals(1, queueIds.size());
+ assertThat(queueIds, hasItems("2"));
+ wals2 = STORAGE.getWALsInQueue(serverName1, queue2);
+ assertEquals(5, wals2.size());
+ for (i = 0; i < 10; i += 2) {
+ assertThat(wals2, hasItems(getFileName("file2", i)));
+ }
+
+ queueIds = STORAGE.getAllQueues(serverName2);
+ assertEquals(1, queueIds.size());
+ assertThat(queueIds, hasItems(peer1.getFirst()));
+ wals1 = STORAGE.getWALsInQueue(serverName2, peer1.getFirst());
+ assertEquals(5, wals1.size());
+ for (i = 1; i < 10; i += 2) {
+ assertThat(wals1, hasItems(getFileName("file1", i)));
+ }
+
+ Set<String> allWals = STORAGE.getAllWALs();
+ assertEquals(10, allWals.size());
+ for (i = 0; i < 10; i++) {
+ assertThat(allWals, hasItems(i % 2 == 0 ? getFileName("file2", i) : getFileName("file1", i)));
+ }
+ }
+
+ // For HBASE-12865
+ @Test
+ public void testClaimQueueChangeCversion() throws ReplicationException, KeeperException {
+ ServerName serverName1 = ServerName.valueOf("127.0.0.1", 8000, 10000);
+ STORAGE.addWAL(serverName1, "1", "file");
+
+ int v0 = STORAGE.getQueuesZNodeCversion();
+ ServerName serverName2 = ServerName.valueOf("127.0.0.1", 8001, 10001);
+ STORAGE.claimQueue(serverName1, "1", serverName2);
+ int v1 = STORAGE.getQueuesZNodeCversion();
+ // cversion should increase by 1 since a child node is deleted
+ assertEquals(1, v1 - v0);
+ }
+
+ private ZKReplicationQueueStorage createWithUnstableCversion() throws IOException {
+ return new ZKReplicationQueueStorage(UTIL.getZooKeeperWatcher(), UTIL.getConfiguration()) {
+
+ private int called = 0;
+
+ @Override
+ protected int getQueuesZNodeCversion() throws KeeperException {
+ if (called < 4) {
+ called++;
+ }
+ return called;
+ }
+ };
+ }
+
+ @Test
+ public void testGetAllWALsCversionChange() throws IOException, ReplicationException {
+ ZKReplicationQueueStorage storage = createWithUnstableCversion();
+ storage.addWAL(getServerName(0), "1", "file");
+ // This should return eventually when cversion stabilizes
+ Set<String> allWals = storage.getAllWALs();
+ assertEquals(1, allWals.size());
+ assertThat(allWals, hasItems("file"));
+ }
+
+ // For HBASE-14621
+ @Test
+ public void testGetAllHFileRefsCversionChange() throws IOException, ReplicationException {
+ ZKReplicationQueueStorage storage = createWithUnstableCversion();
+ storage.addPeerToHFileRefs("1");
+ Path p = new Path("/test");
+ storage.addHFileRefs("1", Arrays.asList(Pair.newPair(p, p)));
+ // This should return eventually when cversion stabilizes
+ Set<String> allHFileRefs = storage.getAllHFileRefs();
+ assertEquals(1, allHFileRefs.size());
+ assertThat(allHFileRefs, hasItems("test"));
+ }
+}