You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by zh...@apache.org on 2018/01/04 08:19:59 UTC
[25/38] hbase git commit: HBASE-19599 Remove ReplicationQueuesClient,
use ReplicationQueueStorage directly
http://git-wip-us.apache.org/repos/asf/hbase/blob/a7c2428b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
index 6e27a21..d8f9625 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/DumpReplicationQueues.java
@@ -21,13 +21,13 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;
import java.util.stream.Collectors;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileStatus;
@@ -48,17 +48,18 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
-import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
-import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
+import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
-import org.apache.zookeeper.KeeperException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.AtomicLongMap;
/**
@@ -303,57 +304,53 @@ public class DumpReplicationQueues extends Configured implements Tool {
}
public String dumpQueues(ClusterConnection connection, ZKWatcher zkw, Set<String> peerIds,
- boolean hdfs) throws Exception {
- ReplicationQueuesClient queuesClient;
+ boolean hdfs) throws Exception {
+ ReplicationQueueStorage queueStorage;
ReplicationPeers replicationPeers;
ReplicationQueues replicationQueues;
ReplicationTracker replicationTracker;
- ReplicationQueuesClientArguments replicationArgs =
- new ReplicationQueuesClientArguments(getConf(), new WarnOnlyAbortable(), zkw);
+ ReplicationQueuesArguments replicationArgs =
+ new ReplicationQueuesArguments(getConf(), new WarnOnlyAbortable(), zkw);
StringBuilder sb = new StringBuilder();
- queuesClient = ReplicationFactory.getReplicationQueuesClient(replicationArgs);
- queuesClient.init();
+ queueStorage = ReplicationStorageFactory.getReplicationQueueStorage(zkw, getConf());
replicationQueues = ReplicationFactory.getReplicationQueues(replicationArgs);
- replicationPeers = ReplicationFactory.getReplicationPeers(zkw, getConf(), queuesClient, connection);
+ replicationPeers =
+ ReplicationFactory.getReplicationPeers(zkw, getConf(), queueStorage, connection);
replicationTracker = ReplicationFactory.getReplicationTracker(zkw, replicationPeers, getConf(),
new WarnOnlyAbortable(), new WarnOnlyStoppable());
- List<String> liveRegionServers = replicationTracker.getListOfRegionServers();
+ Set<String> liveRegionServers = new HashSet<>(replicationTracker.getListOfRegionServers());
// Loops each peer on each RS and dumps the queues
- try {
- List<String> regionservers = queuesClient.getListOfReplicators();
- if (regionservers == null || regionservers.isEmpty()) {
- return sb.toString();
+ List<ServerName> regionservers = queueStorage.getListOfReplicators();
+ if (regionservers == null || regionservers.isEmpty()) {
+ return sb.toString();
+ }
+ for (ServerName regionserver : regionservers) {
+ List<String> queueIds = queueStorage.getAllQueues(regionserver);
+ replicationQueues.init(regionserver.getServerName());
+ if (!liveRegionServers.contains(regionserver.getServerName())) {
+ deadRegionServers.add(regionserver.getServerName());
}
- for (String regionserver : regionservers) {
- List<String> queueIds = queuesClient.getAllQueues(regionserver);
- replicationQueues.init(regionserver);
- if (!liveRegionServers.contains(regionserver)) {
- deadRegionServers.add(regionserver);
- }
- for (String queueId : queueIds) {
- ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
- List<String> wals = queuesClient.getLogsInQueue(regionserver, queueId);
- if (!peerIds.contains(queueInfo.getPeerId())) {
- deletedQueues.add(regionserver + "/" + queueId);
- sb.append(formatQueue(regionserver, replicationQueues, queueInfo, queueId, wals, true,
- hdfs));
- } else {
- sb.append(formatQueue(regionserver, replicationQueues, queueInfo, queueId, wals, false,
- hdfs));
- }
+ for (String queueId : queueIds) {
+ ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
+ List<String> wals = queueStorage.getWALsInQueue(regionserver, queueId);
+ if (!peerIds.contains(queueInfo.getPeerId())) {
+ deletedQueues.add(regionserver + "/" + queueId);
+ sb.append(
+ formatQueue(regionserver, replicationQueues, queueInfo, queueId, wals, true, hdfs));
+ } else {
+ sb.append(
+ formatQueue(regionserver, replicationQueues, queueInfo, queueId, wals, false, hdfs));
}
}
- } catch (KeeperException ke) {
- throw new IOException(ke);
}
return sb.toString();
}
- private String formatQueue(String regionserver, ReplicationQueues replicationQueues, ReplicationQueueInfo queueInfo,
- String queueId, List<String> wals, boolean isDeleted, boolean hdfs) throws Exception {
-
+ private String formatQueue(ServerName regionserver, ReplicationQueues replicationQueues,
+ ReplicationQueueInfo queueInfo, String queueId, List<String> wals, boolean isDeleted,
+ boolean hdfs) throws Exception {
StringBuilder sb = new StringBuilder();
List<ServerName> deadServers;
@@ -389,13 +386,14 @@ public class DumpReplicationQueues extends Configured implements Tool {
/**
* return total size in bytes from a list of WALs
*/
- private long getTotalWALSize(FileSystem fs, List<String> wals, String server) throws IOException {
+ private long getTotalWALSize(FileSystem fs, List<String> wals, ServerName server)
+ throws IOException {
long size = 0;
FileStatus fileStatus;
for (String wal : wals) {
try {
- fileStatus = (new WALLink(getConf(), server, wal)).getFileStatus(fs);
+ fileStatus = (new WALLink(getConf(), server.getServerName(), wal)).getFileStatus(fs);
} catch (IOException e) {
if (e instanceof FileNotFoundException) {
numWalsNotFound++;
http://git-wip-us.apache.org/repos/asf/hbase/blob/a7c2428b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
index 839b5ad..85fa729 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/hbck/ReplicationChecker.java
@@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hadoop.hbase.util.hbck;
import java.io.IOException;
@@ -27,22 +26,23 @@ import java.util.Map.Entry;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.master.cleaner.ReplicationZKNodeCleaner;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.util.HBaseFsck;
import org.apache.hadoop.hbase.util.HBaseFsck.ErrorReporter;
+import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
+import org.apache.yetus.audience.InterfaceAudience;
-/*
+/**
* Check and fix undeleted replication queues for removed peerId.
*/
@InterfaceAudience.Private
public class ReplicationChecker {
private final ErrorReporter errorReporter;
// replicator with its queueIds for removed peers
- private Map<String, List<String>> undeletedQueueIds = new HashMap<>();
+ private Map<ServerName, List<String>> undeletedQueueIds = new HashMap<>();
// replicator with its undeleted queueIds for removed peers in hfile-refs queue
private Set<String> undeletedHFileRefsQueueIds = new HashSet<>();
private final ReplicationZKNodeCleaner cleaner;
@@ -60,8 +60,8 @@ public class ReplicationChecker {
public void checkUnDeletedQueues() throws IOException {
undeletedQueueIds = cleaner.getUnDeletedQueues();
- for (Entry<String, List<String>> replicatorAndQueueIds : undeletedQueueIds.entrySet()) {
- String replicator = replicatorAndQueueIds.getKey();
+ for (Entry<ServerName, List<String>> replicatorAndQueueIds : undeletedQueueIds.entrySet()) {
+ ServerName replicator = replicatorAndQueueIds.getKey();
for (String queueId : replicatorAndQueueIds.getValue()) {
ReplicationQueueInfo queueInfo = new ReplicationQueueInfo(queueId);
String msg = "Undeleted replication queue for removed peer found: "
http://git-wip-us.apache.org/repos/asf/hbase/blob/a7c2428b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java
index 28a7562..b28eaaf 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/TestAsyncReplicationAdminApi.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -34,12 +35,16 @@ import java.util.Set;
import java.util.concurrent.CompletionException;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.replication.ReplicationException;
import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeerDescription;
+import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
+import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.zookeeper.ReadOnlyZKClient;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.Test;
@@ -56,8 +61,8 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
private final String ID_ONE = "1";
private final String KEY_ONE = "127.0.0.1:2181:/hbase";
- private final String ID_SECOND = "2";
- private final String KEY_SECOND = "127.0.0.1:2181:/hbase2";
+ private final String ID_TWO = "2";
+ private final String KEY_TWO = "127.0.0.1:2181:/hbase2";
@BeforeClass
public static void setUpBeforeClass() throws Exception {
@@ -65,21 +70,27 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000);
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
TEST_UTIL.getConfiguration().setInt(START_LOG_ERRORS_AFTER_COUNT_KEY, 0);
+ TEST_UTIL.getConfiguration().setInt(ReadOnlyZKClient.RECOVERY_RETRY, 1);
TEST_UTIL.startMiniCluster();
ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
}
@After
- public void cleanupPeer() {
+ public void clearPeerAndQueues() throws IOException, ReplicationException {
try {
admin.removeReplicationPeer(ID_ONE).join();
} catch (Exception e) {
- LOG.debug("Replication peer " + ID_ONE + " may already be removed");
}
try {
- admin.removeReplicationPeer(ID_SECOND).join();
+ admin.removeReplicationPeer(ID_TWO).join();
} catch (Exception e) {
- LOG.debug("Replication peer " + ID_SECOND + " may already be removed");
+ }
+ ReplicationQueueStorage queueStorage = ReplicationStorageFactory
+ .getReplicationQueueStorage(TEST_UTIL.getZooKeeperWatcher(), TEST_UTIL.getConfiguration());
+ for (ServerName serverName : queueStorage.getListOfReplicators()) {
+ for (String queue : queueStorage.getAllQueues(serverName)) {
+ queueStorage.removeQueue(serverName, queue);
+ }
}
}
@@ -88,7 +99,7 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
ReplicationPeerConfig rpc1 = new ReplicationPeerConfig();
rpc1.setClusterKey(KEY_ONE);
ReplicationPeerConfig rpc2 = new ReplicationPeerConfig();
- rpc2.setClusterKey(KEY_SECOND);
+ rpc2.setClusterKey(KEY_TWO);
// Add a valid peer
admin.addReplicationPeer(ID_ONE, rpc1).join();
// try adding the same (fails)
@@ -101,19 +112,19 @@ public class TestAsyncReplicationAdminApi extends TestAsyncAdminBase {
assertEquals(1, admin.listReplicationPeers().get().size());
// Try to remove an inexisting peer
try {
- admin.removeReplicationPeer(ID_SECOND).join();
+ admin.removeReplicationPeer(ID_TWO).join();
fail("Test case should fail as removing a inexisting peer.");
} catch (CompletionException e) {
// OK!
}
assertEquals(1, admin.listReplicationPeers().get().size());
// Add a second since multi-slave is supported
- admin.addReplicationPeer(ID_SECOND, rpc2).join();
+ admin.addReplicationPeer(ID_TWO, rpc2).join();
assertEquals(2, admin.listReplicationPeers().get().size());
// Remove the first peer we added
admin.removeReplicationPeer(ID_ONE).join();
assertEquals(1, admin.listReplicationPeers().get().size());
- admin.removeReplicationPeer(ID_SECOND).join();
+ admin.removeReplicationPeer(ID_TWO).join();
assertEquals(0, admin.listReplicationPeers().get().size());
}
http://git-wip-us.apache.org/repos/asf/hbase/blob/a7c2428b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
index c2fcd8c..8bb3230 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/client/replication/TestReplicationAdmin.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueueStorage;
import org.apache.hadoop.hbase.replication.ReplicationStorageFactory;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.zookeeper.ReadOnlyZKClient;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@@ -82,6 +83,7 @@ public class TestReplicationAdmin {
@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
+ TEST_UTIL.getConfiguration().setInt(ReadOnlyZKClient.RECOVERY_RETRY, 1);
TEST_UTIL.startMiniCluster();
admin = new ReplicationAdmin(TEST_UTIL.getConfiguration());
hbaseAdmin = TEST_UTIL.getAdmin();
http://git-wip-us.apache.org/repos/asf/hbase/blob/a7c2428b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
index 08b27ec..1e75959 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestLogsCleaner.java
@@ -1,4 +1,4 @@
-/*
+/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
@@ -24,16 +24,12 @@ import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;
import java.io.IOException;
-import java.lang.reflect.Field;
import java.net.URLEncoder;
import java.util.Iterator;
-import java.util.LinkedList;
import java.util.List;
import java.util.Random;
-
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -51,7 +47,6 @@ import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
-import org.apache.hadoop.hbase.replication.ReplicationQueuesClientZKImpl;
import org.apache.hadoop.hbase.replication.master.ReplicationLogCleaner;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
import org.apache.hadoop.hbase.testclassification.MasterTests;
@@ -65,10 +60,11 @@ import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+
@Category({MasterTests.class, MediumTests.class})
public class TestLogsCleaner {
@@ -195,24 +191,6 @@ public class TestLogsCleaner {
}
}
- @Test(timeout=5000)
- public void testZnodeCversionChange() throws Exception {
- Configuration conf = TEST_UTIL.getConfiguration();
- ReplicationLogCleaner cleaner = new ReplicationLogCleaner();
- cleaner.setConf(conf);
-
- ReplicationQueuesClientZKImpl rqcMock = Mockito.mock(ReplicationQueuesClientZKImpl.class);
- Mockito.when(rqcMock.getQueuesZNodeCversion()).thenReturn(1, 2, 3, 4);
-
- Field rqc = ReplicationLogCleaner.class.getDeclaredField("replicationQueues");
- rqc.setAccessible(true);
-
- rqc.set(cleaner, rqcMock);
-
- // This should return eventually when cversion stabilizes
- cleaner.getDeletableFiles(new LinkedList<>());
- }
-
/**
* ReplicationLogCleaner should be able to ride over ZooKeeper errors without aborting.
*/
http://git-wip-us.apache.org/repos/asf/hbase/blob/a7c2428b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
index 2948701..f83695f 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationHFileCleaner.java
@@ -1,12 +1,19 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable
- * law or agreed to in writing, software distributed under the License is distributed on an "AS IS"
- * BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License
- * for the specific language governing permissions and limitations under the License.
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
*/
package org.apache.hadoop.hbase.master.cleaner;
@@ -17,14 +24,10 @@ import static org.junit.Assert.fail;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;
-import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
-
import java.io.IOException;
-import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -45,7 +48,6 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
-import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl;
import org.apache.hadoop.hbase.replication.master.ReplicationHFileCleaner;
import org.apache.hadoop.hbase.replication.regionserver.Replication;
@@ -63,10 +65,11 @@ import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import org.mockito.Mockito;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+
@Category({ MasterTests.class, SmallTests.class })
public class TestReplicationHFileCleaner {
private static final Logger LOG = LoggerFactory.getLogger(ReplicationQueuesZKImpl.class);
@@ -188,32 +191,6 @@ public class TestReplicationHFileCleaner {
assertTrue(deletableFilesIterator.next().getPath().equals(deletablefile));
}
- /*
- * Test for HBASE-14621. This test will not assert directly anything. Without the fix the test
- * will end up in a infinite loop, so it will timeout.
- */
- @Test(timeout = 15000)
- public void testForDifferntHFileRefsZnodeVersion() throws Exception {
- // 1. Create a file
- Path file = new Path(root, "testForDifferntHFileRefsZnodeVersion");
- fs.createNewFile(file);
- // 2. Assert file is successfully created
- assertTrue("Test file not created!", fs.exists(file));
- ReplicationHFileCleaner cleaner = new ReplicationHFileCleaner();
- cleaner.setConf(conf);
-
- ReplicationQueuesClient replicationQueuesClient = Mockito.mock(ReplicationQueuesClient.class);
- //Return different znode version for each call
- Mockito.when(replicationQueuesClient.getHFileRefsNodeChangeVersion()).thenReturn(1, 2);
-
- Class<? extends ReplicationHFileCleaner> cleanerClass = cleaner.getClass();
- Field rqc = cleanerClass.getDeclaredField("rqc");
- rqc.setAccessible(true);
- rqc.set(cleaner, replicationQueuesClient);
-
- cleaner.isFileDeletable(fs.getFileStatus(file));
- }
-
/**
* ReplicationHFileCleaner should be able to ride over ZooKeeper errors without aborting.
*/
http://git-wip-us.apache.org/repos/asf/hbase/blob/a7c2428b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java
index 6aa59cb..8178266 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/cleaner/TestReplicationZKNodeCleaner.java
@@ -15,7 +15,6 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.hadoop.hbase.master.cleaner;
import static org.junit.Assert.assertEquals;
@@ -26,6 +25,7 @@ import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
@@ -43,9 +43,9 @@ public class TestReplicationZKNodeCleaner {
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private final String ID_ONE = "1";
- private final String SERVER_ONE = "server1";
+ private final ServerName SERVER_ONE = ServerName.valueOf("server1", 8000, 1234);
private final String ID_TWO = "2";
- private final String SERVER_TWO = "server2";
+ private final ServerName SERVER_TWO = ServerName.valueOf("server2", 8000, 1234);
private final Configuration conf;
private final ZKWatcher zkw;
@@ -72,12 +72,12 @@ public class TestReplicationZKNodeCleaner {
@Test
public void testReplicationZKNodeCleaner() throws Exception {
- repQueues.init(SERVER_ONE);
+ repQueues.init(SERVER_ONE.getServerName());
// add queue for ID_ONE which isn't exist
repQueues.addLog(ID_ONE, "file1");
ReplicationZKNodeCleaner cleaner = new ReplicationZKNodeCleaner(conf, zkw, null);
- Map<String, List<String>> undeletedQueues = cleaner.getUnDeletedQueues();
+ Map<ServerName, List<String>> undeletedQueues = cleaner.getUnDeletedQueues();
assertEquals(1, undeletedQueues.size());
assertTrue(undeletedQueues.containsKey(SERVER_ONE));
assertEquals(1, undeletedQueues.get(SERVER_ONE).size());
@@ -100,7 +100,7 @@ public class TestReplicationZKNodeCleaner {
@Test
public void testReplicationZKNodeCleanerChore() throws Exception {
- repQueues.init(SERVER_ONE);
+ repQueues.init(SERVER_ONE.getServerName());
// add queue for ID_ONE which isn't exist
repQueues.addLog(ID_ONE, "file1");
// add a recovery queue for ID_TWO which isn't exist
http://git-wip-us.apache.org/repos/asf/hbase/blob/a7c2428b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
deleted file mode 100644
index 29c0930..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
+++ /dev/null
@@ -1,378 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.replication;
-
-import static org.junit.Assert.*;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.util.Pair;
-import org.apache.hadoop.hbase.zookeeper.ZKConfig;
-import org.apache.zookeeper.KeeperException;
-import org.junit.Before;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * White box testing for replication state interfaces. Implementations should extend this class, and
- * initialize the interfaces properly.
- */
-public abstract class TestReplicationStateBasic {
-
- protected ReplicationQueues rq1;
- protected ReplicationQueues rq2;
- protected ReplicationQueues rq3;
- protected ReplicationQueuesClient rqc;
- protected String server1 = ServerName.valueOf("hostname1.example.org", 1234, -1L).toString();
- protected String server2 = ServerName.valueOf("hostname2.example.org", 1234, -1L).toString();
- protected String server3 = ServerName.valueOf("hostname3.example.org", 1234, -1L).toString();
- protected ReplicationPeers rp;
- protected static final String ID_ONE = "1";
- protected static final String ID_TWO = "2";
- protected static String KEY_ONE;
- protected static String KEY_TWO;
-
- // For testing when we try to replicate to ourself
- protected String OUR_ID = "3";
- protected String OUR_KEY;
-
- protected static int zkTimeoutCount;
- protected static final int ZK_MAX_COUNT = 300;
- protected static final int ZK_SLEEP_INTERVAL = 100; // millis
-
- private static final Logger LOG = LoggerFactory.getLogger(TestReplicationStateBasic.class);
-
- @Before
- public void setUp() {
- zkTimeoutCount = 0;
- }
-
- @Test
- public void testReplicationQueuesClient() throws ReplicationException, KeeperException {
- rqc.init();
- // Test methods with empty state
- assertEquals(0, rqc.getListOfReplicators().size());
- assertNull(rqc.getLogsInQueue(server1, "qId1"));
- assertNull(rqc.getAllQueues(server1));
-
- /*
- * Set up data Two replicators: -- server1: three queues with 0, 1 and 2 log files each --
- * server2: zero queues
- */
- rq1.init(server1);
- rq2.init(server2);
- rq1.addLog("qId1", "trash");
- rq1.removeLog("qId1", "trash");
- rq1.addLog("qId2", "filename1");
- rq1.addLog("qId3", "filename2");
- rq1.addLog("qId3", "filename3");
- rq2.addLog("trash", "trash");
- rq2.removeQueue("trash");
-
- List<String> reps = rqc.getListOfReplicators();
- assertEquals(2, reps.size());
- assertTrue(server1, reps.contains(server1));
- assertTrue(server2, reps.contains(server2));
-
- assertNull(rqc.getLogsInQueue("bogus", "bogus"));
- assertNull(rqc.getLogsInQueue(server1, "bogus"));
- assertEquals(0, rqc.getLogsInQueue(server1, "qId1").size());
- assertEquals(1, rqc.getLogsInQueue(server1, "qId2").size());
- assertEquals("filename1", rqc.getLogsInQueue(server1, "qId2").get(0));
-
- assertNull(rqc.getAllQueues("bogus"));
- assertEquals(0, rqc.getAllQueues(server2).size());
- List<String> list = rqc.getAllQueues(server1);
- assertEquals(3, list.size());
- assertTrue(list.contains("qId2"));
- assertTrue(list.contains("qId3"));
- }
-
- @Test
- public void testReplicationQueues() throws ReplicationException {
- rq1.init(server1);
- rq2.init(server2);
- rq3.init(server3);
- //Initialize ReplicationPeer so we can add peers (we don't transfer lone queues)
- rp.init();
-
- // 3 replicators should exist
- assertEquals(3, rq1.getListOfReplicators().size());
- rq1.removeQueue("bogus");
- rq1.removeLog("bogus", "bogus");
- rq1.removeAllQueues();
- assertEquals(0, rq1.getAllQueues().size());
- assertEquals(0, rq1.getLogPosition("bogus", "bogus"));
- assertNull(rq1.getLogsInQueue("bogus"));
- assertNull(rq1.getUnClaimedQueueIds(
- ServerName.valueOf("bogus", 1234, -1L).toString()));
-
- rq1.setLogPosition("bogus", "bogus", 5L);
-
- populateQueues();
-
- assertEquals(3, rq1.getListOfReplicators().size());
- assertEquals(0, rq2.getLogsInQueue("qId1").size());
- assertEquals(5, rq3.getLogsInQueue("qId5").size());
- assertEquals(0, rq3.getLogPosition("qId1", "filename0"));
- rq3.setLogPosition("qId5", "filename4", 354L);
- assertEquals(354L, rq3.getLogPosition("qId5", "filename4"));
-
- assertEquals(5, rq3.getLogsInQueue("qId5").size());
- assertEquals(0, rq2.getLogsInQueue("qId1").size());
- assertEquals(0, rq1.getAllQueues().size());
- assertEquals(1, rq2.getAllQueues().size());
- assertEquals(5, rq3.getAllQueues().size());
-
- assertEquals(0, rq3.getUnClaimedQueueIds(server1).size());
- rq3.removeReplicatorIfQueueIsEmpty(server1);
- assertEquals(2, rq3.getListOfReplicators().size());
-
- List<String> queues = rq2.getUnClaimedQueueIds(server3);
- assertEquals(5, queues.size());
- for(String queue: queues) {
- rq2.claimQueue(server3, queue);
- }
- rq2.removeReplicatorIfQueueIsEmpty(server3);
- assertEquals(1, rq2.getListOfReplicators().size());
-
- // Try to claim our own queues
- assertNull(rq2.getUnClaimedQueueIds(server2));
- rq2.removeReplicatorIfQueueIsEmpty(server2);
-
- assertEquals(6, rq2.getAllQueues().size());
-
- rq2.removeAllQueues();
-
- assertEquals(0, rq2.getListOfReplicators().size());
- }
-
- @Test
- public void testInvalidClusterKeys() throws ReplicationException, KeeperException {
- rp.init();
-
- try {
- rp.registerPeer(ID_ONE,
- new ReplicationPeerConfig().setClusterKey("hostname1.example.org:1234:hbase"));
- fail("Should throw an IllegalArgumentException because "
- + "zookeeper.znode.parent is missing leading '/'.");
- } catch (IllegalArgumentException e) {
- // Expected.
- }
-
- try {
- rp.registerPeer(ID_ONE,
- new ReplicationPeerConfig().setClusterKey("hostname1.example.org:1234:/"));
- fail("Should throw an IllegalArgumentException because zookeeper.znode.parent is missing.");
- } catch (IllegalArgumentException e) {
- // Expected.
- }
-
- try {
- rp.registerPeer(ID_ONE,
- new ReplicationPeerConfig().setClusterKey("hostname1.example.org::/hbase"));
- fail("Should throw an IllegalArgumentException because "
- + "hbase.zookeeper.property.clientPort is missing.");
- } catch (IllegalArgumentException e) {
- // Expected.
- }
- }
-
- @Test
- public void testHfileRefsReplicationQueues() throws ReplicationException, KeeperException {
- rp.init();
- rq1.init(server1);
- rqc.init();
-
- List<Pair<Path, Path>> files1 = new ArrayList<>(3);
- files1.add(new Pair<>(null, new Path("file_1")));
- files1.add(new Pair<>(null, new Path("file_2")));
- files1.add(new Pair<>(null, new Path("file_3")));
- assertNull(rqc.getReplicableHFiles(ID_ONE));
- assertEquals(0, rqc.getAllPeersFromHFileRefsQueue().size());
- rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE));
- rq1.addPeerToHFileRefs(ID_ONE);
- rq1.addHFileRefs(ID_ONE, files1);
- assertEquals(1, rqc.getAllPeersFromHFileRefsQueue().size());
- assertEquals(3, rqc.getReplicableHFiles(ID_ONE).size());
- List<String> hfiles2 = new ArrayList<>(files1.size());
- for (Pair<Path, Path> p : files1) {
- hfiles2.add(p.getSecond().getName());
- }
- String removedString = hfiles2.remove(0);
- rq1.removeHFileRefs(ID_ONE, hfiles2);
- assertEquals(1, rqc.getReplicableHFiles(ID_ONE).size());
- hfiles2 = new ArrayList<>(1);
- hfiles2.add(removedString);
- rq1.removeHFileRefs(ID_ONE, hfiles2);
- assertEquals(0, rqc.getReplicableHFiles(ID_ONE).size());
- rp.unregisterPeer(ID_ONE);
- }
-
- @Test
- public void testRemovePeerForHFileRefs() throws ReplicationException, KeeperException {
- rq1.init(server1);
- rqc.init();
-
- rp.init();
- rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE));
- rq1.addPeerToHFileRefs(ID_ONE);
- rp.registerPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO));
- rq1.addPeerToHFileRefs(ID_TWO);
-
- List<Pair<Path, Path>> files1 = new ArrayList<>(3);
- files1.add(new Pair<>(null, new Path("file_1")));
- files1.add(new Pair<>(null, new Path("file_2")));
- files1.add(new Pair<>(null, new Path("file_3")));
- rq1.addHFileRefs(ID_ONE, files1);
- rq1.addHFileRefs(ID_TWO, files1);
- assertEquals(2, rqc.getAllPeersFromHFileRefsQueue().size());
- assertEquals(3, rqc.getReplicableHFiles(ID_ONE).size());
- assertEquals(3, rqc.getReplicableHFiles(ID_TWO).size());
-
- rp.unregisterPeer(ID_ONE);
- rq1.removePeerFromHFileRefs(ID_ONE);
- assertEquals(1, rqc.getAllPeersFromHFileRefsQueue().size());
- assertNull(rqc.getReplicableHFiles(ID_ONE));
- assertEquals(3, rqc.getReplicableHFiles(ID_TWO).size());
-
- rp.unregisterPeer(ID_TWO);
- rq1.removePeerFromHFileRefs(ID_TWO);
- assertEquals(0, rqc.getAllPeersFromHFileRefsQueue().size());
- assertNull(rqc.getReplicableHFiles(ID_TWO));
- }
-
- @Test
- public void testReplicationPeers() throws Exception {
- rp.init();
-
- // Test methods with non-existent peer ids
- try {
- rp.unregisterPeer("bogus");
- fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
- } catch (IllegalArgumentException e) {
- }
- try {
- rp.enablePeer("bogus");
- fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
- } catch (IllegalArgumentException e) {
- }
- try {
- rp.disablePeer("bogus");
- fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
- } catch (IllegalArgumentException e) {
- }
- try {
- rp.getStatusOfPeer("bogus");
- fail("Should have thrown an IllegalArgumentException when passed a bogus peerId");
- } catch (IllegalArgumentException e) {
- }
- assertFalse(rp.peerConnected("bogus"));
- rp.peerDisconnected("bogus");
-
- assertNull(rp.getPeerConf("bogus"));
- assertNumberOfPeers(0);
-
- // Add some peers
- rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE));
- assertNumberOfPeers(1);
- rp.registerPeer(ID_TWO, new ReplicationPeerConfig().setClusterKey(KEY_TWO));
- assertNumberOfPeers(2);
-
- // Test methods with a peer that is added but not connected
- try {
- rp.getStatusOfPeer(ID_ONE);
- fail("There are no connected peers, should have thrown an IllegalArgumentException");
- } catch (IllegalArgumentException e) {
- }
- assertEquals(KEY_ONE, ZKConfig.getZooKeeperClusterKey(rp.getPeerConf(ID_ONE).getSecond()));
- rp.unregisterPeer(ID_ONE);
- rp.peerDisconnected(ID_ONE);
- assertNumberOfPeers(1);
-
- // Add one peer
- rp.registerPeer(ID_ONE, new ReplicationPeerConfig().setClusterKey(KEY_ONE));
- rp.peerConnected(ID_ONE);
- assertNumberOfPeers(2);
- assertTrue(rp.getStatusOfPeer(ID_ONE));
- rp.disablePeer(ID_ONE);
- assertConnectedPeerStatus(false, ID_ONE);
- rp.enablePeer(ID_ONE);
- assertConnectedPeerStatus(true, ID_ONE);
-
- // Disconnect peer
- rp.peerDisconnected(ID_ONE);
- assertNumberOfPeers(2);
- try {
- rp.getStatusOfPeer(ID_ONE);
- fail("There are no connected peers, should have thrown an IllegalArgumentException");
- } catch (IllegalArgumentException e) {
- }
- }
-
- protected void assertConnectedPeerStatus(boolean status, String peerId) throws Exception {
- // we can first check if the value was changed in the store, if it wasn't then fail right away
- if (status != rp.getStatusOfPeerFromBackingStore(peerId)) {
- fail("ConnectedPeerStatus was " + !status + " but expected " + status + " in ZK");
- }
- while (true) {
- if (status == rp.getStatusOfPeer(peerId)) {
- return;
- }
- if (zkTimeoutCount < ZK_MAX_COUNT) {
- LOG.debug("ConnectedPeerStatus was " + !status + " but expected " + status
- + ", sleeping and trying again.");
- Thread.sleep(ZK_SLEEP_INTERVAL);
- } else {
- fail("Timed out waiting for ConnectedPeerStatus to be " + status);
- }
- }
- }
-
- protected void assertNumberOfPeers(int total) {
- assertEquals(total, rp.getAllPeerConfigs().size());
- assertEquals(total, rp.getAllPeerIds().size());
- assertEquals(total, rp.getAllPeerIds().size());
- }
-
- /*
- * three replicators: rq1 has 0 queues, rq2 has 1 queue with no logs, rq3 has 5 queues with 1, 2,
- * 3, 4, 5 log files respectively
- */
- protected void populateQueues() throws ReplicationException {
- rq1.addLog("trash", "trash");
- rq1.removeQueue("trash");
-
- rq2.addLog("qId1", "trash");
- rq2.removeLog("qId1", "trash");
-
- for (int i = 1; i < 6; i++) {
- for (int j = 0; j < i; j++) {
- rq3.addLog("qId" + i, "filename" + j);
- }
- //Add peers for the corresponding queues so they are not orphans
- rp.registerPeer("qId" + i, new ReplicationPeerConfig().setClusterKey("localhost:2818:/bogus" + i));
- }
- }
-}
-
http://git-wip-us.apache.org/repos/asf/hbase/blob/a7c2428b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
deleted file mode 100644
index 231d655..0000000
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
+++ /dev/null
@@ -1,227 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.replication;
-
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.hbase.ChoreService;
-import org.apache.hadoop.hbase.ClusterId;
-import org.apache.hadoop.hbase.CoordinatedStateManager;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.Server;
-import org.apache.hadoop.hbase.ServerName;
-import org.apache.hadoop.hbase.client.ClusterConnection;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
-import org.apache.hadoop.hbase.testclassification.ReplicationTests;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
-import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
-import org.apache.hadoop.hbase.zookeeper.ZKConfig;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
-import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
-import org.apache.zookeeper.KeeperException;
-import org.junit.After;
-import org.junit.AfterClass;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Category({ReplicationTests.class, MediumTests.class})
-public class TestReplicationStateZKImpl extends TestReplicationStateBasic {
-
- private static final Logger LOG = LoggerFactory.getLogger(TestReplicationStateZKImpl.class);
-
- private static Configuration conf;
- private static HBaseTestingUtility utility;
- private static ZKWatcher zkw;
- private static String replicationZNode;
- private ReplicationQueuesZKImpl rqZK;
-
- @BeforeClass
- public static void setUpBeforeClass() throws Exception {
- utility = new HBaseTestingUtility();
- utility.startMiniZKCluster();
- conf = utility.getConfiguration();
- conf.setBoolean(HConstants.REPLICATION_BULKLOAD_ENABLE_KEY, true);
- zkw = HBaseTestingUtility.getZooKeeperWatcher(utility);
- String replicationZNodeName = conf.get("zookeeper.znode.replication", "replication");
- replicationZNode = ZNodePaths.joinZNode(zkw.znodePaths.baseZNode, replicationZNodeName);
- KEY_ONE = initPeerClusterState("/hbase1");
- KEY_TWO = initPeerClusterState("/hbase2");
- }
-
- private static String initPeerClusterState(String baseZKNode)
- throws IOException, KeeperException {
- // Add a dummy region server and set up the cluster id
- Configuration testConf = new Configuration(conf);
- testConf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, baseZKNode);
- ZKWatcher zkw1 = new ZKWatcher(testConf, "test1", null);
- String fakeRs = ZNodePaths.joinZNode(zkw1.znodePaths.rsZNode, "hostname1.example.org:1234");
- ZKUtil.createWithParents(zkw1, fakeRs);
- ZKClusterId.setClusterId(zkw1, new ClusterId());
- return ZKConfig.getZooKeeperClusterKey(testConf);
- }
-
- @Before
- @Override
- public void setUp() {
- super.setUp();
- DummyServer ds1 = new DummyServer(server1);
- DummyServer ds2 = new DummyServer(server2);
- DummyServer ds3 = new DummyServer(server3);
- try {
- rq1 = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, ds1, zkw));
- rq2 = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, ds2, zkw));
- rq3 = ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, ds3, zkw));
- rqc = ReplicationFactory.getReplicationQueuesClient(
- new ReplicationQueuesClientArguments(conf, ds1, zkw));
- } catch (Exception e) {
- // This should not occur, because getReplicationQueues() only throws for
- // TableBasedReplicationQueuesImpl
- fail("ReplicationFactory.getReplicationQueues() threw an IO Exception");
- }
- rp = ReplicationFactory.getReplicationPeers(zkw, conf, zkw);
- OUR_KEY = ZKConfig.getZooKeeperClusterKey(conf);
- rqZK = new ReplicationQueuesZKImpl(zkw, conf, ds1);
- }
-
- @After
- public void tearDown() throws KeeperException, IOException {
- ZKUtil.deleteNodeRecursively(zkw, replicationZNode);
- }
-
- @AfterClass
- public static void tearDownAfterClass() throws Exception {
- utility.shutdownMiniZKCluster();
- }
-
- @Test
- public void testIsPeerPath_PathToParentOfPeerNode() {
- assertFalse(rqZK.isPeerPath(rqZK.peersZNode));
- }
-
- @Test
- public void testIsPeerPath_PathToChildOfPeerNode() {
- String peerChild = ZNodePaths.joinZNode(ZNodePaths.joinZNode(rqZK.peersZNode, "1"), "child");
- assertFalse(rqZK.isPeerPath(peerChild));
- }
-
- @Test
- public void testIsPeerPath_ActualPeerPath() {
- String peerPath = ZNodePaths.joinZNode(rqZK.peersZNode, "1");
- assertTrue(rqZK.isPeerPath(peerPath));
- }
-
- static class DummyServer implements Server {
- private String serverName;
- private boolean isAborted = false;
- private boolean isStopped = false;
-
- public DummyServer(String serverName) {
- this.serverName = serverName;
- }
-
- @Override
- public Configuration getConfiguration() {
- return conf;
- }
-
- @Override
- public ZKWatcher getZooKeeper() {
- return zkw;
- }
-
- @Override
- public CoordinatedStateManager getCoordinatedStateManager() {
- return null;
- }
-
- @Override
- public ClusterConnection getConnection() {
- return null;
- }
-
- @Override
- public MetaTableLocator getMetaTableLocator() {
- return null;
- }
-
- @Override
- public ServerName getServerName() {
- return ServerName.valueOf(this.serverName);
- }
-
- @Override
- public void abort(String why, Throwable e) {
- LOG.info("Aborting " + serverName);
- this.isAborted = true;
- }
-
- @Override
- public boolean isAborted() {
- return this.isAborted;
- }
-
- @Override
- public void stop(String why) {
- this.isStopped = true;
- }
-
- @Override
- public boolean isStopped() {
- return this.isStopped;
- }
-
- @Override
- public ChoreService getChoreService() {
- return null;
- }
-
- @Override
- public ClusterConnection getClusterConnection() {
- // TODO Auto-generated method stub
- return null;
- }
-
- @Override
- public FileSystem getFileSystem() {
- return null;
- }
-
- @Override
- public boolean isStopping() {
- return false;
- }
-
- @Override
- public Connection createConnection(Configuration conf) throws IOException {
- return null;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/hbase/blob/a7c2428b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java
index b47a8d3..aeab8b0 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManagerZkImpl.java
@@ -1,34 +1,34 @@
-/*
-*
-* Licensed to the Apache Software Foundation (ASF) under one
-* or more contributor license agreements. See the NOTICE file
-* distributed with this work for additional information
-* regarding copyright ownership. The ASF licenses this file
-* to you under the Apache License, Version 2.0 (the
-* "License"); you may not use this file except in compliance
-* with the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.hadoop.hbase.replication.regionserver;
+import static org.junit.Assert.assertTrue;
+
+import java.util.List;
+
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.replication.ReplicationFactory;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
import org.apache.hadoop.hbase.replication.ReplicationQueuesArguments;
-import org.apache.hadoop.hbase.replication.ReplicationQueuesClientArguments;
-import org.apache.hadoop.hbase.replication.ReplicationQueuesClientZKImpl;
import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.testclassification.ReplicationTests;
@@ -36,11 +36,6 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
/**
* Tests the ReplicationSourceManager with ReplicationQueueZkImpl's and
* ReplicationQueuesClientZkImpl. Also includes extra tests outside of those in
@@ -114,41 +109,4 @@ public class TestReplicationSourceManagerZkImpl extends TestReplicationSourceMan
server.stop("");
}
-
- @Test
- public void testFailoverDeadServerCversionChange() throws Exception {
- final Server s0 = new DummyServer("cversion-change0.example.org");
- ReplicationQueues repQueues =
- ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(conf, s0,
- s0.getZooKeeper()));
- repQueues.init(s0.getServerName().toString());
- // populate some znodes in the peer znode
- files.add("log1");
- files.add("log2");
- for (String file : files) {
- repQueues.addLog("1", file);
- }
- // simulate queue transfer
- Server s1 = new DummyServer("cversion-change1.example.org");
- ReplicationQueues rq1 =
- ReplicationFactory.getReplicationQueues(new ReplicationQueuesArguments(s1.getConfiguration(), s1,
- s1.getZooKeeper()));
- rq1.init(s1.getServerName().toString());
-
- ReplicationQueuesClientZKImpl client =
- (ReplicationQueuesClientZKImpl)ReplicationFactory.getReplicationQueuesClient(
- new ReplicationQueuesClientArguments(s1.getConfiguration(), s1, s1.getZooKeeper()));
-
- int v0 = client.getQueuesZNodeCversion();
- List<String> queues = rq1.getUnClaimedQueueIds(s0.getServerName().getServerName());
- for(String queue : queues) {
- rq1.claimQueue(s0.getServerName().getServerName(), queue);
- }
- rq1.removeReplicatorIfQueueIsEmpty(s0.getServerName().getServerName());
- int v1 = client.getQueuesZNodeCversion();
- // cversion should increase by 1 since a child node is deleted
- assertEquals(v0 + 1, v1);
-
- s0.stop("");
- }
}