You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@hbase.apache.org by ap...@apache.org on 2016/12/21 02:39:44 UTC

[4/6] hbase git commit: HBASE-17328 Properly dispose of looped replication peers

HBASE-17328 Properly dispose of looped replication peers

Signed-off-by: Andrew Purtell <ap...@apache.org>

Conflicts:
	hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
	hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f276edfa
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f276edfa
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f276edfa

Branch: refs/heads/branch-1.2
Commit: f276edfadbc5b598bdd7412809e76e8077207603
Parents: b882263
Author: Vincent <vi...@salesforce.com>
Authored: Tue Dec 20 16:29:40 2016 -0800
Committer: Andrew Purtell <ap...@apache.org>
Committed: Tue Dec 20 18:33:54 2016 -0800

----------------------------------------------------------------------
 .../regionserver/ReplicationSource.java         |   2 +
 .../regionserver/ReplicationSourceManager.java  |  14 +++
 .../replication/TestMasterReplication.java      | 105 +++++++++++++++++--
 3 files changed, 114 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/f276edfa/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index be16d01..6c93dcd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -279,6 +279,8 @@ public class ReplicationSource extends Thread
       this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
           + peerClusterId + " which is not allowed by ReplicationEndpoint:"
           + replicationEndpoint.getClass().getName(), null, false);
+      this.manager.closeQueue(this);
+      return;
     }
     LOG.info("Replicating " + clusterId + " -> " + peerClusterId);
     // start workers

http://git-wip-us.apache.org/repos/asf/hbase/blob/f276edfa/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index de65f02..603557b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -529,6 +529,20 @@ public class ReplicationSourceManager implements ReplicationListener {
   }
 
   /**
+   * Clear the references to the specified old source
+   * @param src source to clear
+   */
+  public void closeQueue(ReplicationSourceInterface src) {
+    LOG.info("Done with the queue " + src.getPeerClusterZnode());
+    if (src instanceof ReplicationSource) {
+      ((ReplicationSource) src).getSourceMetrics().clear();
+    }
+    this.sources.remove(src);
+    deleteSource(src.getPeerClusterZnode(), true);
+    this.walsById.remove(src.getPeerClusterZnode());
+  }
+
+  /**
    * Thie method first deletes all the recovered sources for the specified
    * id, then deletes the normal source (deleting all related data in ZK).
    * @param id The id of the peer cluster

http://git-wip-us.apache.org/repos/asf/hbase/blob/f276edfa/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
index 0932bf2..46b37d2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
@@ -19,24 +19,33 @@ package org.apache.hadoop.hbase.replication;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.ClusterStatus;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.ServerLoad;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
@@ -51,11 +60,15 @@ import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.RSRpcServices;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.junit.After;
 import org.junit.Before;
@@ -158,12 +171,49 @@ public class TestMasterReplication {
   }
 
   /**
-   * Tests the cyclic replication scenario of 0 -> 1 -> 2 -> 0 by adding and
-   * deleting rows to a table in each clusters and ensuring that the each of
-   * these clusters get the appropriate mutations. It also tests the grouping
-   * scenario where a cluster needs to replicate the edits originating from
-   * itself and also the edits that it received using replication from a
-   * different cluster. The scenario is explained in HBASE-9158
+   * Tests the replication scenario 0 -> 0. By default
+   * {@link BaseReplicationEndpoint#canReplicateToSameCluster()} returns false, so the
+   * ReplicationSource should terminate, and no further logs should get enqueued
+   */
+  @Test(timeout = 300000)
+  public void testLoopedReplication() throws Exception {
+    LOG.info("testLoopedReplication");
+    startMiniClusters(1);
+    createTableOnClusters(table);
+    addPeer("1", 0, 0);
+    Thread.sleep(SLEEP_TIME);
+
+    // wait for source to terminate
+    final ServerName rsName = utilities[0].getHBaseCluster().getRegionServer(0).getServerName();
+    Waiter.waitFor(baseConfiguration, 10000, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        ClusterStatus clusterStatus = utilities[0].getHBaseAdmin().getClusterStatus();
+        ServerLoad serverLoad = clusterStatus.getLoad(rsName);
+        List<ReplicationLoadSource> replicationLoadSourceList =
+            serverLoad.getReplicationLoadSourceList();
+        return replicationLoadSourceList.size() == 0;
+      }
+    });
+
+    Table[] htables = getHTablesOnClusters(tableName);
+    putAndWait(row, famName, htables[0], htables[0]);
+    rollWALAndWait(utilities[0], table.getTableName(), row);
+    ZooKeeperWatcher zkw = utilities[0].getZooKeeperWatcher();
+    String queuesZnode =
+        ZKUtil.joinZNode(zkw.baseZNode, ZKUtil.joinZNode("replication", "rs"));
+    List<String> listChildrenNoWatch =
+        ZKUtil.listChildrenNoWatch(zkw, ZKUtil.joinZNode(queuesZnode, rsName.toString()));
+    assertEquals(0, listChildrenNoWatch.size());
+  }
+
+/**
+  * Tests the cyclic replication scenario of 0 -> 1 -> 2 -> 0 by adding and
+  * deleting rows to a table in each clusters and ensuring that the each of
+  * these clusters get the appropriate mutations. It also tests the grouping
+  * scenario where a cluster needs to replicate the edits originating from
+  * itself and also the edits that it received using replication from a
+  * different cluster. The scenario is explained in HBASE-9158
    */
   @Test(timeout = 300000)
   public void testCyclicReplication2() throws Exception {
@@ -438,6 +488,47 @@ public class TestMasterReplication {
     }
   }
 
+  private void rollWALAndWait(final HBaseTestingUtility utility, final TableName table,
+      final byte[] row) throws IOException {
+    final Admin admin = utility.getHBaseAdmin();
+    final MiniHBaseCluster cluster = utility.getMiniHBaseCluster();
+
+    // find the region that corresponds to the given row.
+    HRegion region = null;
+    for (HRegion candidate : cluster.getRegions(table)) {
+      if (HRegion.rowIsInRange(candidate.getRegionInfo(), row)) {
+        region = candidate;
+        break;
+      }
+    }
+    assertNotNull("Couldn't find the region for row '" + Arrays.toString(row) + "'", region);
+
+    final CountDownLatch latch = new CountDownLatch(1);
+
+    // listen for successful log rolls
+    final WALActionsListener listener = new WALActionsListener.Base() {
+          @Override
+          public void postLogRoll(final Path oldPath, final Path newPath) throws IOException {
+            latch.countDown();
+          }
+        };
+    region.getWAL().registerWALActionsListener(listener);
+
+    // request a roll
+    admin.rollWALWriter(cluster.getServerHoldingRegion(region.getTableDesc().getTableName(),
+      region.getRegionInfo().getRegionName()));
+
+    // wait
+    try {
+      latch.await();
+    } catch (InterruptedException exception) {
+      LOG.warn("Interrupted while waiting for the wal of '" + region + "' to roll. If later " +
+          "replication tests fail, it's probably because we should still be waiting.");
+      Thread.currentThread().interrupt();
+    }
+    region.getWAL().unregisterWALActionsListener(listener);
+  }
+
   /**
    * Use a coprocessor to count puts and deletes. as KVs would be replicated back with the same
    * timestamp there is otherwise no way to count them.