You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2021/02/19 18:16:09 UTC
[hbase] branch branch-1 updated: HBASE-25583: Handle the running
replication source gracefully with replication nodes deleted (#2960)
This is an automated email from the ASF dual-hosted git repository.
apurtell pushed a commit to branch branch-1
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-1 by this push:
new f9a9148 HBASE-25583: Handle the running replication source gracefully with replication nodes deleted (#2960)
f9a9148 is described below
commit f9a91488b2c39320bed502619bf7adb765c79de6
Author: Sandeep Pal <50...@users.noreply.github.com>
AuthorDate: Fri Feb 19 10:15:45 2021 -0800
HBASE-25583: Handle the running replication source gracefully with replication nodes deleted (#2960)
Signed-off-by: Wellington Chevreuil <wc...@apache.org>
Signed-off-by: Andrew Purtell <ap...@apache.org>
---
.../hbase/replication/ReplicationQueues.java | 2 +-
.../hbase/replication/ReplicationQueuesZKImpl.java | 50 ++++--
.../ReplicationSourceWithoutPeerException.java | 39 +++++
.../regionserver/ReplicationSource.java | 21 ++-
.../regionserver/ReplicationSourceManager.java | 38 +++--
.../hbase/replication/ReplicationSourceDummy.java | 1 +
.../ReplicationSourceDummyWithNoTermination.java | 26 +++
.../hbase/replication/TestReplicationSource.java | 80 ++++++++-
.../replication/TestReplicationStateBasic.java | 21 ++-
.../replication/TestReplicationStateZKImpl.java | 2 +-
.../regionserver/TestReplicationSourceBase.java | 150 +++++++++++++++++
.../regionserver/TestReplicationSourceManager.java | 180 ++++-----------------
...tReplicationSourceWithoutReplicationZnodes.java | 85 ++++++++++
.../regionserver/helper/DummyServer.java | 95 +++++++++++
14 files changed, 594 insertions(+), 196 deletions(-)
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
index ccc7172..05ae794 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueues.java
@@ -59,7 +59,7 @@ public interface ReplicationQueues {
* @param queueId a String that identifies the queue.
* @param filename name of the WAL
*/
- void removeLog(String queueId, String filename);
+ void removeLog(String queueId, String filename) throws ReplicationSourceWithoutPeerException;
/**
* Set the current position for a specific WAL in a given queue.
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
index 4121fa3..83e3680 100644
--- a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
@@ -25,19 +25,19 @@ import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.hbase.zookeeper.ZKUtil.ZKUtilOp;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
/**
@@ -132,17 +132,44 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
}
@Override
- public void removeLog(String queueId, String filename) {
+ public void removeLog(String queueId, String filename)
+ throws ReplicationSourceWithoutPeerException {
try {
- String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
- znode = ZKUtil.joinZNode(znode, filename);
- ZKUtil.deleteNode(this.zookeeper, znode);
- } catch (KeeperException e) {
- this.abortable.abort("Failed to remove wal from queue (queueId=" + queueId + ", filename="
- + filename + ")", e);
+ try {
+ String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
+ znode = ZKUtil.joinZNode(znode, filename);
+ ZKUtil.deleteNode(this.zookeeper, znode);
+ } catch (KeeperException.NoNodeException e) {
+ // in case of no node exception we should not crash the region server
+ // but instead check if the replication peer has been removed.
+ // If so, we can throw here so that the source can terminate itself.
+ // This situation can occur when the replication peer znodes have been
+ // removed but the sources were not terminated because the zk node-delete watcher missed it.
+ if (!doesPeerExist(queueId)) {
+ LOG.warn("Replication peer " + queueId + " has been removed", e);
+ throw new ReplicationSourceWithoutPeerException(
+ "Znodes for peer has been delete while a source is still active", e);
+ } else {
+ throw e;
+ }
+ }
+ } catch (KeeperException ke) {
+ this.abortable.abort(
+ "Failed to remove wal from queue (queueId=" + queueId + ", filename=" + filename + ")", ke);
}
}
+ private boolean doesPeerExist(String queueId) throws KeeperException {
+ String peerId = queueId;
+ if (peerId.contains("-")) {
+ // queueId will be in the form peerId + "-" + rsZNode.
+ // A peerId will not have "-" in its name, see HBASE-11394
+ peerId = queueId.split("-")[0];
+ }
+
+ return peerExists(peerId);
+ }
+
@Override
public void setLogPosition(String queueId, String filename, long position) {
try {
@@ -426,8 +453,9 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
// add delete op for peer
listOfOps.add(ZKUtilOp.deleteNodeFailSilent(oldClusterZnode));
- if (LOG.isTraceEnabled())
+ if (LOG.isTraceEnabled()) {
LOG.trace(" The multi list size is: " + listOfOps.size());
+ }
}
ZKUtil.multiOrSequential(this.zookeeper, listOfOps, false);
@@ -506,7 +534,7 @@ public class ReplicationQueuesZKImpl extends ReplicationStateZKBase implements R
}
/**
- * @param lockOwner
+ * @param lockOwner lock owner
* @return Serialized protobuf of <code>lockOwner</code> with pb magic prefix prepended suitable
* for use as content of an replication lock during region server fail over.
*/
diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSourceWithoutPeerException.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSourceWithoutPeerException.java
new file mode 100644
index 0000000..a07d0c0
--- /dev/null
+++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSourceWithoutPeerException.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * This exception is thrown when the replication source is running with no
+ * corresponding peer. This can be due to a race condition between the PeersWatcher
+ * zk listener and the source trying to remove the queues, or to the zk listener for
+ * node deletion never being invoked for some reason. See HBASE-25583
+ */
+@InterfaceAudience.Private
+public class ReplicationSourceWithoutPeerException extends ReplicationException {
+ private static final long serialVersionUID = 1L;
+
+ public ReplicationSourceWithoutPeerException(String m, Throwable t) {
+ super(m, t);
+ }
+
+ public ReplicationSourceWithoutPeerException(String m) {
+ super(m);
+ }
+}
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index f27c53d..7be880d 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -62,6 +62,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeer;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
+import org.apache.hadoop.hbase.replication.ReplicationSourceWithoutPeerException;
import org.apache.hadoop.hbase.replication.SystemTableWALEntryFilter;
import org.apache.hadoop.hbase.replication.WALEntryFilter;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceWALReaderThread.WALEntryBatch;
@@ -158,7 +159,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
* @param clusterId unique UUID for the cluster
* @param replicationEndpoint the replication endpoint implementation
* @param metrics metrics for replication source
- * @throws IOException
+ * @throws IOException IO Exception
*/
@Override
public void init(final Configuration conf, final FileSystem fs,
@@ -441,7 +442,9 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
@Override
public Path getCurrentPath() {
for (ReplicationSourceShipperThread worker : workerThreads.values()) {
- if (worker.getCurrentPath() != null) return worker.getCurrentPath();
+ if (worker.getCurrentPath() != null) {
+ return worker.getCurrentPath();
+ }
}
return null;
}
@@ -460,7 +463,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
return 0;
}
- private boolean isSourceActive() {
+ public boolean isSourceActive() {
return !this.stopper.isStopped() && this.sourceRunning;
}
@@ -792,9 +795,13 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
}
private void updateLogPosition(long lastReadPosition) {
- manager.logPositionAndCleanOldLogs(lastLoggedPath, peerClusterZnode, lastReadPosition,
- this.replicationQueueInfo.isQueueRecovered(), false);
- lastLoggedPosition = lastReadPosition;
+ try {
+ manager.logPositionAndCleanOldLogs(lastLoggedPath, peerClusterZnode, lastReadPosition,
+ this.replicationQueueInfo.isQueueRecovered(), false);
+ lastLoggedPosition = lastReadPosition;
+ } catch (ReplicationSourceWithoutPeerException re) {
+ source.terminate("Replication peer is removed and source should terminate", re);
+ }
}
public void startup() {
@@ -976,7 +983,7 @@ public class ReplicationSource extends Thread implements ReplicationSourceInterf
/**
* Set the worker state
- * @param state
+ * @param state the state of the wal reader
*/
public void setWorkerState(WorkerState state) {
this.state = state;
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 859a2e4..d9435a3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -64,6 +64,7 @@ import org.apache.hadoop.hbase.replication.ReplicationPeerConfig;
import org.apache.hadoop.hbase.replication.ReplicationPeers;
import org.apache.hadoop.hbase.replication.ReplicationQueueInfo;
import org.apache.hadoop.hbase.replication.ReplicationQueues;
+import org.apache.hadoop.hbase.replication.ReplicationSourceWithoutPeerException;
import org.apache.hadoop.hbase.replication.ReplicationTracker;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.DefaultWALProvider;
@@ -125,14 +126,14 @@ public class ReplicationSourceManager implements ReplicationListener {
/**
* Creates a replication manager and sets the watch on all the other registered region servers
* @param replicationQueues the interface for manipulating replication queues
- * @param replicationPeers
- * @param replicationTracker
+ * @param replicationPeers the replication peers maintenance class
+ * @param replicationTracker the replication tracker to track the states
* @param conf the configuration to use
* @param server the server for this region server
* @param fs the file system to use
* @param logDir the directory that contains all wal directories of live RSs
* @param oldLogDir the directory where old logs are archived
- * @param clusterId
+ * @param clusterId the cluster id of the source cluster
*/
public ReplicationSourceManager(final ReplicationQueues replicationQueues,
final ReplicationPeers replicationPeers, final ReplicationTracker replicationTracker,
@@ -181,14 +182,14 @@ public class ReplicationSourceManager implements ReplicationListener {
* wal it belongs to and will log, for this region server, the current
* position. It will also clean old logs from the queue.
* @param log Path to the log currently being replicated from
- * replication status in zookeeper. It will also delete older entries.
+ * replication status in zookeeper. It will also delete older entries.
* @param id id of the peer cluster
* @param position current location in the log
* @param queueRecovered indicates if this queue comes from another region server
* @param holdLogInZK if true then the log is retained in ZK
*/
public synchronized void logPositionAndCleanOldLogs(Path log, String id, long position,
- boolean queueRecovered, boolean holdLogInZK) {
+ boolean queueRecovered, boolean holdLogInZK) throws ReplicationSourceWithoutPeerException {
String fileName = log.getName();
this.replicationQueues.setLogPosition(id, fileName, position);
if (holdLogInZK) {
@@ -204,7 +205,8 @@ public class ReplicationSourceManager implements ReplicationListener {
* @param id id of the peer cluster
* @param queueRecovered Whether this is a recovered queue
*/
- public void cleanOldLogs(String key, String id, boolean queueRecovered) {
+ public void cleanOldLogs(String key, String id, boolean queueRecovered)
+ throws ReplicationSourceWithoutPeerException {
String logPrefix = DefaultWALProvider.getWALPrefixFromWALName(key);
if (queueRecovered) {
Map<String, SortedSet<String>> walsForPeer = walsByIdRecoveredQueues.get(id);
@@ -222,9 +224,10 @@ public class ReplicationSourceManager implements ReplicationListener {
}
}
}
- }
+ }
- private void cleanOldLogs(SortedSet<String> wals, String key, String id) {
+ private void cleanOldLogs(SortedSet<String> wals, String key, String id)
+ throws ReplicationSourceWithoutPeerException {
SortedSet<String> walSet = wals.headSet(key);
LOG.debug("Removing " + walSet.size() + " logs in the list: " + walSet);
for (String wal : walSet) {
@@ -267,7 +270,7 @@ public class ReplicationSourceManager implements ReplicationListener {
* need to enqueue the latest log of each wal group and do replication
* @param id the id of the peer cluster
* @return the source that was created
- * @throws IOException
+ * @throws IOException IO Exception
*/
protected ReplicationSourceInterface addSource(String id) throws IOException,
ReplicationException {
@@ -365,7 +368,7 @@ public class ReplicationSourceManager implements ReplicationListener {
/**
* Get the normal source for a given peer
- * @param peerId
+ * @param peerId the replication peer Id
* @return the normal source for the give peer if it exists, otherwise null.
*/
public ReplicationSourceInterface getSource(String peerId) {
@@ -402,7 +405,7 @@ public class ReplicationSourceManager implements ReplicationListener {
* Check and enqueue the given log to the correct source. If there's still no source for the
* group to which the given log belongs, create one
* @param logPath the log path to check and enqueue
- * @throws IOException
+ * @throws IOException IO Exception
*/
private void recordLog(Path logPath) throws IOException {
String logName = logPath.getName();
@@ -467,7 +470,7 @@ public class ReplicationSourceManager implements ReplicationListener {
* @param server the server object for this region server
* @param peerId the id of the peer cluster
* @return the created source
- * @throws IOException
+ * @throws IOException IO Exception
*/
protected ReplicationSourceInterface getReplicationSource(final Configuration conf,
final FileSystem fs, final ReplicationSourceManager manager,
@@ -523,7 +526,8 @@ public class ReplicationSourceManager implements ReplicationListener {
clusterId, replicationEndpoint, metrics);
// init replication endpoint
- replicationEndpoint.init(new ReplicationEndpoint.Context(conf, replicationPeer.getConfiguration(),
+ replicationEndpoint.init(new ReplicationEndpoint.Context(
+ conf, replicationPeer.getConfiguration(),
fs, peerId, clusterId, replicationPeer, metrics, tableDescriptors, server));
return src;
@@ -535,7 +539,7 @@ public class ReplicationSourceManager implements ReplicationListener {
* znodes and finally will delete the old znodes.
*
* It creates one old source for any type of source of the old rs.
- * @param rsZnode
+ * @param rsZnode znode for region server from where to transfer the queues
*/
private void transferQueues(String rsZnode) {
NodeFailoverWorker transfer =
@@ -664,7 +668,7 @@ public class ReplicationSourceManager implements ReplicationListener {
private final UUID clusterId;
/**
- * @param rsZnode
+ * @param rsZnode znode for dead region server
*/
public NodeFailoverWorker(String rsZnode) {
this(rsZnode, replicationQueues, replicationPeers, ReplicationSourceManager.this.clusterId);
@@ -820,7 +824,9 @@ public class ReplicationSourceManager implements ReplicationListener {
* Get the ReplicationPeers used by this ReplicationSourceManager
* @return the ReplicationPeers used by this ReplicationSourceManager
*/
- public ReplicationPeers getReplicationPeers() {return this.replicationPeers;}
+ public ReplicationPeers getReplicationPeers() {
+ return this.replicationPeers;
+ }
/**
* Get a string representation of all the sources' metrics
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
index a5c61f2..45704ae 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummy.java
@@ -44,6 +44,7 @@ public class ReplicationSourceDummy implements ReplicationSourceInterface {
String peerClusterId;
Path currentPath;
MetricsSource metrics;
+ public static final String fakeExceptionMessage = "Fake Exception";
@Override
public void init(Configuration conf, FileSystem fs, ReplicationSourceManager manager,
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummyWithNoTermination.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummyWithNoTermination.java
new file mode 100644
index 0000000..4a89917
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/ReplicationSourceDummyWithNoTermination.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+public class ReplicationSourceDummyWithNoTermination extends ReplicationSourceDummy {
+
+ @Override
+ public void terminate(String reason) {
+ // Throwing here prevents the zk listener from closing the queues, which
+ // simulates the znodes getting deleted without the zk listener being invoked
+ throw new RuntimeException(fakeExceptionMessage);
+ }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
index 84d2d8b..f85a52b 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationSource.java
@@ -28,6 +28,7 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -104,7 +105,7 @@ public class TestReplicationSource {
private static Configuration conf = TEST_UTIL.getConfiguration();
/**
- * @throws java.lang.Exception
+ * @throws java.lang.Exception exception
*/
@BeforeClass
public static void setUpBeforeClass() throws Exception {
@@ -112,9 +113,13 @@ public class TestReplicationSource {
FS = TEST_UTIL.getDFSCluster().getFileSystem();
Path rootDir = TEST_UTIL.createRootDir();
oldLogDir = new Path(rootDir, HConstants.HREGION_OLDLOGDIR_NAME);
- if (FS.exists(oldLogDir)) FS.delete(oldLogDir, true);
+ if (FS.exists(oldLogDir)) {
+ FS.delete(oldLogDir, true);
+ }
logDir = new Path(rootDir, HConstants.HREGION_LOGDIR_NAME);
- if (FS.exists(logDir)) FS.delete(logDir, true);
+ if (FS.exists(logDir)) {
+ FS.delete(logDir, true);
+ }
}
@Before
@@ -154,7 +159,7 @@ public class TestReplicationSource {
* Sanity check that we can move logs around while we are reading
* from them. Should this test fail, ReplicationSource would have a hard
* time reading logs that are being archived.
- * @throws Exception
+ * @throws Exception exception
*/
@Test
public void testLogMoving() throws Exception{
@@ -277,6 +282,14 @@ public class TestReplicationSource {
when(manager.getTotalBufferUsed()).thenReturn(totalBufferUsed);
}
+ // source manager throws the exception while cleaning logs
+ private void setReplicationSourceWithoutPeerException()
+ throws ReplicationSourceWithoutPeerException {
+ doThrow(new ReplicationSourceWithoutPeerException("No peer")).when(manager)
+ .logPositionAndCleanOldLogs(Mockito.<Path>anyObject(), Mockito.anyString(),
+ Mockito.anyLong(), Mockito.anyBoolean(), Mockito.anyBoolean());
+ }
+
ReplicationSource createReplicationSourceWithMocks(ReplicationEndpoint endpoint)
throws IOException {
final ReplicationSource source = new ReplicationSource();
@@ -471,6 +484,65 @@ public class TestReplicationSource {
}
/**
+ * There can be a scenario of replication peer removed but the replication source
+ * still running since termination of source depends upon zk listener and there
+ * can be a rare scenario where the zk listener might not get invoked or might be delayed.
+ * In that case, replication source manager will throw since it won't be able
+ * to remove the znode while removing the log. We should terminate the source
+ * in that case. See HBASE-25583
+ * @throws Exception any exception
+ */
+ @Test
+ public void testReplicationSourceTerminationWhenNoZnodeForPeerAndQueues() throws Exception {
+ Mocks mocks = new Mocks();
+ mocks.setReplicationSourceWithoutPeerException();
+ // set table cfs to filter all cells out
+ final TableName replicatedTable = TableName.valueOf("replicated_table");
+ final Map<TableName, List<String>> cfs =
+ Collections.singletonMap(replicatedTable, Collections.<String>emptyList());
+ when(mocks.peer.getTableCFs()).thenReturn(cfs);
+
+ // Append 3 entries in a log
+ final Path log1 = new Path(logDir, "log.1");
+ WALProvider.Writer writer1 = WALFactory.createWALWriter(FS, log1, TEST_UTIL.getConfiguration());
+ appendEntries(writer1, 3);
+
+ // Replication end point with no filter
+ final ReplicationEndpointForTest endpoint = new ReplicationEndpointForTest() {
+ @Override
+ public WALEntryFilter getWALEntryfilter() {
+ return null;
+ }
+ };
+
+ final ReplicationSource source = mocks.createReplicationSourceWithMocks(endpoint);
+ source.run();
+ source.enqueueLog(log1);
+
+ // Wait for source to replicate
+ Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+ @Override public boolean evaluate() {
+ return endpoint.replicateCount.get() == 1;
+ }
+ });
+
+ // Wait for all the entries to get replicated
+ Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+ @Override public boolean evaluate() {
+ return endpoint.lastEntries.size() == 3;
+ }
+ });
+
+ // After that the source should be terminated
+ Waiter.waitFor(conf, 20000, new Waiter.Predicate<Exception>() {
+ @Override public boolean evaluate() {
+ // wait until the source has been terminated
+ return !source.isSourceActive();
+ }
+ });
+ }
+
+ /**
* Tests that recovered queues are preserved on a regionserver shutdown.
* See HBASE-18192
*/
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
index c9cf6ab..f91c3a1 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateBasic.java
@@ -18,7 +18,11 @@
package org.apache.hadoop.hbase.replication;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import java.util.ArrayList;
import java.util.List;
@@ -53,7 +57,6 @@ public abstract class TestReplicationStateBasic {
protected static String KEY_TWO;
// For testing when we try to replicate to ourself
- protected String OUR_ID = "3";
protected String OUR_KEY;
protected static int zkTimeoutCount;
@@ -119,7 +122,6 @@ public abstract class TestReplicationStateBasic {
// 3 replicators should exist
assertEquals(3, rq1.getListOfReplicators().size());
rq1.removeQueue("bogus");
- rq1.removeLog("bogus", "bogus");
rq1.removeAllQueues();
assertNull(rq1.getAllQueues());
assertEquals(0, rq1.getLogPosition("bogus", "bogus"));
@@ -167,6 +169,19 @@ public abstract class TestReplicationStateBasic {
}
@Test
+ public void testLogRemovalWithNoZnode() throws ReplicationException {
+ rq1.init(server1);
+ Exception expectedException = null;
+ try {
+ rq1.removeLog("bogus", "bogus");
+ } catch (ReplicationException e) {
+ expectedException = e;
+ }
+
+ assertTrue(expectedException instanceof ReplicationSourceWithoutPeerException);
+ }
+
+ @Test
public void testInvalidClusterKeys() throws ReplicationException, KeeperException {
rp.init();
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
index d6bf4ea..f8877dc 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestReplicationStateZKImpl.java
@@ -31,10 +31,10 @@ import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceBase.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceBase.java
new file mode 100644
index 0000000..e94985e
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceBase.java
@@ -0,0 +1,150 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import static org.apache.hadoop.hbase.replication.ReplicationSourceDummy.fakeExceptionMessage;
+
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ClusterId;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.replication.ReplicationSourceDummyWithNoTermination;
+import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
+import org.apache.hadoop.hbase.replication.regionserver.helper.DummyServer;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.rules.TestName;
+
+public abstract class TestReplicationSourceBase {
+
+  private static final Log LOG = LogFactory.getLog(TestReplicationSourceBase.class);
+
+  protected static Configuration conf;
+  protected static HBaseTestingUtility utility;
+  protected static Replication replication;
+  protected static ReplicationSourceManager manager;
+  protected static ZooKeeperWatcher zkw;
+  protected static HTableDescriptor htd;
+  protected static HRegionInfo hri;
+
+  protected static final byte[] r1 = Bytes.toBytes("r1");
+  protected static final byte[] r2 = Bytes.toBytes("r2");
+  protected static final byte[] f1 = Bytes.toBytes("f1");
+  protected static final byte[] f2 = Bytes.toBytes("f2");
+  protected static final TableName test = TableName.valueOf("test");
+  protected static final String slaveId = "1";
+  protected static FileSystem fs;
+  protected static Path oldLogDir;
+  protected static Path logDir;
+  protected static DummyServer server;
+
+  @BeforeClass public static void setUpBeforeClass() throws Exception {
+    conf = HBaseConfiguration.create();
+    conf.set("replication.replicationsource.implementation",
+      ReplicationSourceDummyWithNoTermination.class.getCanonicalName());
+    conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY, HConstants.REPLICATION_ENABLE_DEFAULT);
+    conf.setLong("replication.sleep.before.failover", 2000);
+    conf.setInt("replication.source.maxretriesmultiplier", 10);
+    utility = new HBaseTestingUtility(conf);
+    utility.startMiniZKCluster();
+
+    zkw = new ZooKeeperWatcher(conf, "test", null);
+    ZKUtil.createWithParents(zkw, "/hbase/replication");
+    ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1");
+    ZKUtil.setData(zkw, "/hbase/replication/peers/1", Bytes.toBytes(
+      conf.get(HConstants.ZOOKEEPER_QUORUM) + ":" + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT)
+        + ":/1"));
+    ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state");
+    ZKUtil.setData(zkw, "/hbase/replication/peers/1/peer-state",
+      ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
+    ZKUtil.createWithParents(zkw, "/hbase/replication/state");
+    ZKUtil.setData(zkw, "/hbase/replication/state", ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
+
+    ZKClusterId.setClusterId(zkw, new ClusterId());
+    FSUtils.setRootDir(utility.getConfiguration(), utility.getDataTestDir());
+    fs = FileSystem.get(conf);
+    oldLogDir = new Path(utility.getDataTestDir(), HConstants.HREGION_OLDLOGDIR_NAME);
+    logDir = new Path(utility.getDataTestDir(), HConstants.HREGION_LOGDIR_NAME);
+    server = new DummyServer(conf, "example.hostname.com", zkw);
+    replication = new Replication(server, fs, logDir, oldLogDir);
+    manager = replication.getReplicationManager();
+    manager.addSource(slaveId);
+
+    htd = new HTableDescriptor(test);
+    HColumnDescriptor col = new HColumnDescriptor(f1);
+    col.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
+    htd.addFamily(col);
+    col = new HColumnDescriptor(f2);
+    col.setScope(HConstants.REPLICATION_SCOPE_LOCAL);
+    htd.addFamily(col);
+
+    hri = new HRegionInfo(htd.getTableName(), r1, r2);
+  }
+
+  @AfterClass public static void tearDownAfterClass() throws Exception {
+    try {
+      manager.join();
+    } catch (RuntimeException re) {
+      // Null-safe check; only ReplicationSourceDummy's fake failure is an expected outcome.
+      if (fakeExceptionMessage.equals(re.getMessage())) {
+        LOG.info("It is fine");
+      } else {
+        LOG.warn("Unexpected exception while joining the replication manager", re);
+      }
+    }
+    utility.shutdownMiniCluster();
+  }
+
+  @Rule public TestName testName = new TestName();
+
+  private void cleanLogDir() throws IOException {
+    fs.delete(logDir, true);
+    fs.delete(oldLogDir, true);
+  }
+
+  @Before public void setUp() throws Exception {
+    LOG.info("Start " + testName.getMethodName());
+    cleanLogDir();
+  }
+
+  @After public void tearDown() throws Exception {
+    LOG.info("End " + testName.getMethodName());
+    cleanLogDir();
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
index 297bc09..f0c18d3 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceManager.java
@@ -47,9 +47,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.ChoreService;
import org.apache.hadoop.hbase.ClusterId;
-import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -58,11 +56,9 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.Server;
-import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Stoppable;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.Waiter;
-import org.apache.hadoop.hbase.client.ClusterConnection;
import org.apache.hadoop.hbase.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.protobuf.generated.WALProtos.BulkLoadDescriptor;
import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
@@ -78,6 +74,7 @@ import org.apache.hadoop.hbase.replication.ReplicationQueuesClient;
import org.apache.hadoop.hbase.replication.ReplicationSourceDummy;
import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
import org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.NodeFailoverWorker;
+import org.apache.hadoop.hbase.replication.regionserver.helper.DummyServer;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.apache.hadoop.hbase.util.ByteStringer;
import org.apache.hadoop.hbase.util.Bytes;
@@ -86,61 +83,25 @@ import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WAL;
import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hbase.wal.WALKey;
-import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.junit.After;
+
import org.junit.AfterClass;
-import org.junit.Before;
import org.junit.BeforeClass;
-import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
-import org.junit.rules.TestName;
@Category(MediumTests.class)
-public class TestReplicationSourceManager {
+public class TestReplicationSourceManager extends TestReplicationSourceBase {
private static final Log LOG =
- LogFactory.getLog(TestReplicationSourceManager.class);
-
- private static Configuration conf;
-
- private static HBaseTestingUtility utility;
-
- private static Replication replication;
-
- private static ReplicationSourceManager manager;
-
- private static ZooKeeperWatcher zkw;
-
- private static HTableDescriptor htd;
-
- private static HRegionInfo hri;
-
- private static final byte[] r1 = Bytes.toBytes("r1");
-
- private static final byte[] r2 = Bytes.toBytes("r2");
-
- private static final byte[] f1 = Bytes.toBytes("f1");
-
- private static final byte[] f2 = Bytes.toBytes("f2");
-
+ LogFactory.getLog(TestReplicationSourceManager.class);
private static final TableName test =
TableName.valueOf("test");
-
private static final String slaveId = "1";
-
- private static FileSystem fs;
-
- private static Path oldLogDir;
-
- private static Path logDir;
-
private static CountDownLatch latch;
-
- private static List<String> files = new ArrayList<String>();
+ private static List<String> files = new ArrayList<>();
@BeforeClass
public static void setUpBeforeClass() throws Exception {
@@ -174,7 +135,8 @@ public class TestReplicationSourceManager {
HConstants.HREGION_OLDLOGDIR_NAME);
logDir = new Path(utility.getDataTestDir(),
HConstants.HREGION_LOGDIR_NAME);
- replication = new Replication(new DummyServer(), fs, logDir, oldLogDir);
+ server = new DummyServer(conf, "example.hostname.com", zkw);
+ replication = new Replication(server, fs, logDir, oldLogDir);
manager = replication.getReplicationManager();
manager.addSource(slaveId);
@@ -196,26 +158,6 @@ public class TestReplicationSourceManager {
utility.shutdownMiniCluster();
}
- @Rule
- public TestName testName = new TestName();
-
- private void cleanLogDir() throws IOException {
- fs.delete(logDir, true);
- fs.delete(oldLogDir, true);
- }
-
- @Before
- public void setUp() throws Exception {
- LOG.info("Start " + testName.getMethodName());
- cleanLogDir();
- }
-
- @After
- public void tearDown() throws Exception {
- LOG.info("End " + testName.getMethodName());
- cleanLogDir();
- }
-
@Test
public void testLogRoll() throws Exception {
long baseline = 1000;
@@ -288,7 +230,7 @@ public class TestReplicationSourceManager {
@Test
public void testClaimQueues() throws Exception {
conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
- final Server server = new DummyServer("hostname0.example.org");
+ final Server server = new DummyServer(conf, "hostname0.example.org", zkw);
ReplicationQueues rq =
ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(),
server);
@@ -300,9 +242,9 @@ public class TestReplicationSourceManager {
rq.addLog("1", file);
}
// create 3 DummyServers
- Server s1 = new DummyServer("dummyserver1.example.org");
- Server s2 = new DummyServer("dummyserver2.example.org");
- Server s3 = new DummyServer("dummyserver3.example.org");
+ Server s1 = new DummyServer(conf, "dummyserver1.example.org", zkw);
+ Server s2 = new DummyServer(conf, "dummyserver2.example.org", zkw);
+ Server s3 = new DummyServer(conf, "dummyserver3.example.org", zkw);
// create 3 DummyNodeFailoverWorkers
DummyNodeFailoverWorker w1 = new DummyNodeFailoverWorker(
@@ -329,7 +271,7 @@ public class TestReplicationSourceManager {
@Test
public void testCleanupFailoverQueues() throws Exception {
- final Server server = new DummyServer("hostname1.example.org");
+ final Server server = new DummyServer(conf, "hostname1.example.org", zkw);
ReplicationQueues rq =
ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(),
server);
@@ -344,7 +286,7 @@ public class TestReplicationSourceManager {
for (String file : files) {
rq.addLog("1", file);
}
- Server s1 = new DummyServer("dummyserver1.example.org");
+ Server s1 = new DummyServer(conf, "dummyserver1.example.org", zkw);
ReplicationQueues rq1 =
ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1);
rq1.init(s1.getServerName().toString());
@@ -368,7 +310,7 @@ public class TestReplicationSourceManager {
public void testNodeFailoverDeadServerParsing() throws Exception {
LOG.debug("testNodeFailoverDeadServerParsing");
conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
- final Server server = new DummyServer("ec2-54-234-230-108.compute-1.amazonaws.com");
+ final Server server = new DummyServer(conf, "ec2-54-234-230-108.compute-1.amazonaws.com", zkw);
ReplicationQueues repQueues =
ReplicationFactory.getReplicationQueues(server.getZooKeeper(), conf, server);
repQueues.init(server.getServerName().toString());
@@ -380,9 +322,9 @@ public class TestReplicationSourceManager {
}
// create 3 DummyServers
- Server s1 = new DummyServer("ip-10-8-101-114.ec2.internal");
- Server s2 = new DummyServer("ec2-107-20-52-47.compute-1.amazonaws.com");
- Server s3 = new DummyServer("ec2-23-20-187-167.compute-1.amazonaws.com");
+ Server s1 = new DummyServer(conf, "ip-10-8-101-114.ec2.internal", zkw);
+ Server s2 = new DummyServer(conf, "ec2-107-20-52-47.compute-1.amazonaws.com", zkw);
+ Server s3 = new DummyServer(conf, "ec2-23-20-187-167.compute-1.amazonaws.com", zkw);
// simulate three servers fail sequentially
ReplicationQueues rq1 =
@@ -423,7 +365,7 @@ public class TestReplicationSourceManager {
LOG.debug("testFailoverDeadServerCversionChange");
conf.setBoolean(HConstants.ZOOKEEPER_USEMULTI, true);
- final Server s0 = new DummyServer("cversion-change0.example.org");
+ final Server s0 = new DummyServer(conf, "cversion-change0.example.org", zkw);
ReplicationQueues repQueues =
ReplicationFactory.getReplicationQueues(s0.getZooKeeper(), conf, s0);
repQueues.init(s0.getServerName().toString());
@@ -434,7 +376,7 @@ public class TestReplicationSourceManager {
repQueues.addLog("1", file);
}
// simulate queue transfer
- Server s1 = new DummyServer("cversion-change1.example.org");
+ Server s1 = new DummyServer(conf, "cversion-change1.example.org", zkw);
ReplicationQueues rq1 =
ReplicationFactory.getReplicationQueues(s1.getZooKeeper(), s1.getConfiguration(), s1);
rq1.init(s1.getServerName().toString());
@@ -459,7 +401,7 @@ public class TestReplicationSourceManager {
@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="RU_INVOKE_RUN",
justification="Intended")
public void testCleanupUnknownPeerZNode() throws Exception {
- final Server server = new DummyServer("hostname2.example.org");
+ final Server server = new DummyServer(conf, "hostname2.example.org", zkw);
ReplicationQueues rq =
ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(),
server);
@@ -521,16 +463,15 @@ public class TestReplicationSourceManager {
* corresponding ReplicationSourceInterface correctly cleans up the corresponding
* replication queue and ReplicationPeer.
* See HBASE-16096.
- * @throws Exception
+ * @throws Exception exception
*/
@Test
- public void testPeerRemovalCleanup() throws Exception{
+ public void testPeerRemovalCleanup() throws Exception {
String replicationSourceImplName = conf.get("replication.replicationsource.implementation");
final String peerId = "FakePeer";
final ReplicationPeerConfig peerConfig =
new ReplicationPeerConfig().setClusterKey("localhost:1:/hbase");
try {
- DummyServer server = new DummyServer();
final ReplicationQueues rq =
ReplicationFactory.getReplicationQueues(server.getZooKeeper(), server.getConfiguration(),
server);
@@ -600,10 +541,10 @@ public class TestReplicationSourceManager {
/**
* Add a peer and wait for it to initialize
- * @param peerId
- * @param peerConfig
+ * @param peerId the replication peer Id
+ * @param peerConfig the replication peer config
* @param waitForSource Whether to wait for replication source to initialize
- * @throws Exception
+ * @throws Exception exception
*/
private void addPeerAndWait(final String peerId, final ReplicationPeerConfig peerConfig,
final boolean waitForSource) throws Exception {
@@ -622,8 +563,8 @@ public class TestReplicationSourceManager {
/**
* Remove a peer and wait for it to get cleaned up
- * @param peerId
- * @throws Exception
+ * @param peerId the replication peer Id
+ * @throws Exception exception
*/
private void removePeerAndWait(final String peerId) throws Exception {
final ReplicationPeers rp = manager.getReplicationPeers();
@@ -639,7 +580,6 @@ public class TestReplicationSourceManager {
});
}
-
private WALEdit getBulkLoadWALEdit() {
// 1. Create store files for the families
Map<byte[], List<Path>> storeFiles = new HashMap<>(1);
@@ -742,70 +682,4 @@ public class TestReplicationSourceManager {
throw new IOException("Failing deliberately");
}
}
-
- static class DummyServer implements Server {
- String hostname;
-
- DummyServer() {
- hostname = "hostname.example.org";
- }
-
- DummyServer(String hostname) {
- this.hostname = hostname;
- }
-
- @Override
- public Configuration getConfiguration() {
- return conf;
- }
-
- @Override
- public ZooKeeperWatcher getZooKeeper() {
- return zkw;
- }
-
- @Override
- public CoordinatedStateManager getCoordinatedStateManager() {
- return null;
- }
- @Override
- public ClusterConnection getConnection() {
- return null;
- }
-
- @Override
- public MetaTableLocator getMetaTableLocator() {
- return null;
- }
-
- @Override
- public ServerName getServerName() {
- return ServerName.valueOf(hostname, 1234, 1L);
- }
-
- @Override
- public void abort(String why, Throwable e) {
- // To change body of implemented methods use File | Settings | File Templates.
- }
-
- @Override
- public boolean isAborted() {
- return false;
- }
-
- @Override
- public void stop(String why) {
- // To change body of implemented methods use File | Settings | File Templates.
- }
-
- @Override
- public boolean isStopped() {
- return false; // To change body of implemented methods use File | Settings | File Templates.
- }
-
- @Override
- public ChoreService getChoreService() {
- return null;
- }
- }
}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceWithoutReplicationZnodes.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceWithoutReplicationZnodes.java
new file mode 100644
index 0000000..095710d
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceWithoutReplicationZnodes.java
@@ -0,0 +1,85 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationSourceWithoutPeerException;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(MediumTests.class)
+public class TestReplicationSourceWithoutReplicationZnodes extends TestReplicationSourceBase {
+
+  /**
+   * When a peer is removed, its znodes are deleted and a ZK watcher terminates its sources. If
+   * that watcher never fires, or races with a source deleting its log znode, the source can hit
+   * a NoNode error; the right response is then to terminate the replication source.
+   * @throws Exception on an unexpected WAL or ZooKeeper failure
+   */
+  @Test
+  public void testReplicationSourceRunningWithoutPeerZnodes() throws Exception {
+    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
+    WALEdit edit = new WALEdit();
+    edit.add(new KeyValue(r1, f1, r1));
+    List<WALActionsListener> listeners = new ArrayList<>();
+    listeners.add(replication);
+    final WALFactory wals = new WALFactory(utility.getConfiguration(), listeners,
+      URLEncoder.encode("regionserver:60020", "UTF8"));
+    try {
+      final WAL wal = wals.getWAL(hri.getEncodedNameAsBytes(), hri.getTable().getNamespace());
+      manager.init();
+      final long txid = wal.append(htd, hri,
+        new WALKey(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc),
+        edit, true);
+      wal.sync(txid);
+      wal.rollWriter();
+
+      // Delete peer 1's znode and this server's queue znode for it under the running source.
+      ZKUtil.deleteNodeRecursively(zkw, "/hbase/replication/peers/1");
+      ZKUtil.deleteNodeRecursively(zkw, "/hbase/replication/rs/" + server.getServerName() + "/1");
+
+      ReplicationException exceptionThrown = null;
+      try {
+        manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(),
+          "1", 0, false, false);
+      } catch (ReplicationException e) {
+        exceptionThrown = e;
+      }
+      Assert.assertTrue("Expected ReplicationSourceWithoutPeerException but got: "
+        + exceptionThrown, exceptionThrown instanceof ReplicationSourceWithoutPeerException);
+    } finally {
+      wals.close();
+    }
+  }
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/helper/DummyServer.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/helper/DummyServer.java
new file mode 100644
index 0000000..59cf162
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/helper/DummyServer.java
@@ -0,0 +1,95 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.replication.regionserver.helper;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.ChoreService;
+import org.apache.hadoop.hbase.CoordinatedStateManager;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.ClusterConnection;
+import org.apache.hadoop.hbase.zookeeper.MetaTableLocator;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+
+public class DummyServer implements Server {
+  private final Configuration conf;
+  private final String hostname;
+  private final ZooKeeperWatcher zkw;
+
+  public DummyServer(Configuration conf, String hostname, ZooKeeperWatcher zkw) {
+    this.conf = conf;
+    this.hostname = hostname;
+    this.zkw = zkw;
+  }
+
+  @Override
+  public Configuration getConfiguration() {
+    return conf;
+  }
+
+  @Override
+  public ZooKeeperWatcher getZooKeeper() {
+    return zkw;
+  }
+
+  @Override
+  public CoordinatedStateManager getCoordinatedStateManager() {
+    return null;
+  }
+  @Override
+  public ClusterConnection getConnection() {
+    return null;
+  }
+
+  @Override
+  public MetaTableLocator getMetaTableLocator() {
+    return null;
+  }
+
+  @Override
+  public ServerName getServerName() {
+    return ServerName.valueOf(hostname, 1234, 1L); // fixed port and start code
+  }
+
+  @Override
+  public void abort(String why, Throwable e) {
+    // Intentionally a no-op: this stub never aborts, so isAborted() stays false.
+  }
+
+  @Override
+  public boolean isAborted() {
+    return false;
+  }
+
+  @Override
+  public void stop(String why) {
+    // Intentionally a no-op: this stub never stops, so isStopped() stays false.
+  }
+
+  @Override
+  public boolean isStopped() {
+    return false; // stop(String) is a no-op, so this stub is never stopped.
+  }
+
+  @Override
+  public ChoreService getChoreService() {
+    return null;
+  }
+}