You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2016/12/21 02:39:44 UTC
[4/6] hbase git commit: HBASE-17328 Properly dispose of looped
replication peers
HBASE-17328 Properly dispose of looped replication peers
Signed-off-by: Andrew Purtell <ap...@apache.org>
Conflicts:
hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f276edfa
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f276edfa
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f276edfa
Branch: refs/heads/branch-1.2
Commit: f276edfadbc5b598bdd7412809e76e8077207603
Parents: b882263
Author: Vincent <vi...@salesforce.com>
Authored: Tue Dec 20 16:29:40 2016 -0800
Committer: Andrew Purtell <ap...@apache.org>
Committed: Tue Dec 20 18:33:54 2016 -0800
----------------------------------------------------------------------
.../regionserver/ReplicationSource.java | 2 +
.../regionserver/ReplicationSourceManager.java | 14 +++
.../replication/TestMasterReplication.java | 105 +++++++++++++++++--
3 files changed, 114 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hbase/blob/f276edfa/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index be16d01..6c93dcd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -279,6 +279,8 @@ public class ReplicationSource extends Thread
this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
+ peerClusterId + " which is not allowed by ReplicationEndpoint:"
+ replicationEndpoint.getClass().getName(), null, false);
+ this.manager.closeQueue(this);
+ return;
}
LOG.info("Replicating " + clusterId + " -> " + peerClusterId);
// start workers
http://git-wip-us.apache.org/repos/asf/hbase/blob/f276edfa/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index de65f02..603557b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -529,6 +529,20 @@ public class ReplicationSourceManager implements ReplicationListener {
}
/**
+ * Clear the references to the specified old source
+ * @param src source to clear
+ */
+ public void closeQueue(ReplicationSourceInterface src) {
+ LOG.info("Done with the queue " + src.getPeerClusterZnode());
+ if (src instanceof ReplicationSource) {
+ ((ReplicationSource) src).getSourceMetrics().clear();
+ }
+ this.sources.remove(src);
+ deleteSource(src.getPeerClusterZnode(), true);
+ this.walsById.remove(src.getPeerClusterZnode());
+ }
+
+ /**
* This method first deletes all the recovered sources for the specified
* id, then deletes the normal source (deleting all related data in ZK).
* @param id The id of the peer cluster
http://git-wip-us.apache.org/repos/asf/hbase/blob/f276edfa/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
index 0932bf2..46b37d2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
@@ -19,24 +19,33 @@ package org.apache.hadoop.hbase.replication;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
import java.io.Closeable;
import java.io.IOException;
+import java.util.Arrays;
import java.util.List;
import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.ServerLoad;
+import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Durability;
@@ -51,11 +60,15 @@ import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.coprocessor.ObserverContext;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.RSRpcServices;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.After;
import org.junit.Before;
@@ -158,12 +171,49 @@ public class TestMasterReplication {
}
/**
- * Tests the cyclic replication scenario of 0 -> 1 -> 2 -> 0 by adding and
- * deleting rows to a table in each clusters and ensuring that the each of
- * these clusters get the appropriate mutations. It also tests the grouping
- * scenario where a cluster needs to replicate the edits originating from
- * itself and also the edits that it received using replication from a
- * different cluster. The scenario is explained in HBASE-9158
+ * Tests the replication scenario 0 -> 0. By default
+ * {@link BaseReplicationEndpoint#canReplicateToSameCluster()} returns false, so the
+ * ReplicationSource should terminate, and no further logs should get enqueued
+ */
+  @Test(timeout = 300000)
+  public void testLoopedReplication() throws Exception {
+    LOG.info("testLoopedReplication");
+    // Set up the 0 -> 0 scenario: one mini cluster, with peer "1" added via addPeer("1", 0, 0).
+    startMiniClusters(1);
+    createTableOnClusters(table);
+    addPeer("1", 0, 0);
+    Thread.sleep(SLEEP_TIME);
+
+    // Poll the cluster status until region server 0 reports no replication sources.
+    final ServerName regionServer =
+        utilities[0].getHBaseCluster().getRegionServer(0).getServerName();
+    Waiter.waitFor(baseConfiguration, 10000, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        ServerLoad load = utilities[0].getHBaseAdmin().getClusterStatus().getLoad(regionServer);
+        return load.getReplicationLoadSourceList().isEmpty();
+      }
+    });
+
+    // Write a row and roll the WAL, then verify nothing is listed under this region
+    // server's replication queue znode.
+    Table[] tables = getHTablesOnClusters(tableName);
+    putAndWait(row, famName, tables[0], tables[0]);
+    rollWALAndWait(utilities[0], table.getTableName(), row);
+
+    ZooKeeperWatcher watcher = utilities[0].getZooKeeperWatcher();
+    String rsQueuesRoot =
+        ZKUtil.joinZNode(watcher.baseZNode, ZKUtil.joinZNode("replication", "rs"));
+    String rsQueueZnode = ZKUtil.joinZNode(rsQueuesRoot, regionServer.toString());
+    List<String> queues = ZKUtil.listChildrenNoWatch(watcher, rsQueueZnode);
+    assertEquals(0, queues.size());
+  }
+
+/**
+ * Tests the cyclic replication scenario of 0 -> 1 -> 2 -> 0 by adding and
+ * deleting rows to a table in each cluster and ensuring that each of
+ * these clusters gets the appropriate mutations. It also tests the grouping
+ * scenario where a cluster needs to replicate the edits originating from
+ * itself and also the edits that it received using replication from a
+ * different cluster. The scenario is explained in HBASE-9158.
*/
@Test(timeout = 300000)
public void testCyclicReplication2() throws Exception {
@@ -438,6 +488,47 @@ public class TestMasterReplication {
}
}
+  /**
+   * Requests a WAL roll on the server holding the region that contains {@code row} and
+   * blocks until a {@code postLogRoll} callback is observed on that region's WAL.
+   * The temporary listener is always unregistered, even if the roll request throws.
+   * @param utility testing utility of the cluster to act on
+   * @param table table whose regions are searched for the row
+   * @param row row key used to pick the region
+   * @throws IOException propagated from the underlying admin/cluster calls
+   */
+  private void rollWALAndWait(final HBaseTestingUtility utility, final TableName table,
+      final byte[] row) throws IOException {
+    final Admin admin = utility.getHBaseAdmin();
+    final MiniHBaseCluster cluster = utility.getMiniHBaseCluster();
+
+    // find the region that corresponds to the given row.
+    HRegion region = null;
+    for (HRegion candidate : cluster.getRegions(table)) {
+      if (HRegion.rowIsInRange(candidate.getRegionInfo(), row)) {
+        region = candidate;
+        break;
+      }
+    }
+    assertNotNull("Couldn't find the region for row '" + Arrays.toString(row) + "'", region);
+
+    final CountDownLatch latch = new CountDownLatch(1);
+
+    // listen for successful log rolls
+    final WALActionsListener listener = new WALActionsListener.Base() {
+      @Override
+      public void postLogRoll(final Path oldPath, final Path newPath) throws IOException {
+        latch.countDown();
+      }
+    };
+    region.getWAL().registerWALActionsListener(listener);
+
+    try {
+      // request a roll
+      admin.rollWALWriter(cluster.getServerHoldingRegion(region.getTableDesc().getTableName(),
+        region.getRegionInfo().getRegionName()));
+
+      // wait
+      try {
+        latch.await();
+      } catch (InterruptedException exception) {
+        LOG.warn("Interrupted while waiting for the wal of '" + region + "' to roll. If later " +
+            "replication tests fail, it's probably because we should still be waiting.");
+        // restore the interrupt flag for callers
+        Thread.currentThread().interrupt();
+      }
+    } finally {
+      // detach the listener on every path so a failed roll request does not leave it
+      // registered on the WAL
+      region.getWAL().unregisterWALActionsListener(listener);
+    }
+  }
+
/**
* Use a coprocessor to count puts and deletes. as KVs would be replicated back with the same
* timestamp there is otherwise no way to count them.