Posted to commits@hbase.apache.org by ap...@apache.org on 2016/12/21 02:39:45 UTC

[5/6] hbase git commit: HBASE-17328 Properly dispose of looped replication peers

HBASE-17328 Properly dispose of looped replication peers

Signed-off-by: Andrew Purtell <ap...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d3ffdf6e
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d3ffdf6e
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d3ffdf6e

Branch: refs/heads/branch-1.1
Commit: d3ffdf6e90a16cf250375469b2f04717b942ad94
Parents: aa47da8
Author: Vincent <vi...@salesforce.com>
Authored: Tue Dec 20 17:20:14 2016 -0800
Committer: Andrew Purtell <ap...@apache.org>
Committed: Tue Dec 20 18:34:57 2016 -0800

----------------------------------------------------------------------
 .../regionserver/ReplicationSource.java         |  2 +
 .../regionserver/ReplicationSourceManager.java  | 14 +++
 .../replication/TestMasterReplication.java      | 91 ++++++++++++++++++++
 3 files changed, 107 insertions(+)
----------------------------------------------------------------------
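
The loop in question is a replication peer whose cluster key resolves back to
the cluster that owns it. The source thread already detected this case and
terminated itself (see the existing terminate() call in the first hunk), but
it left its queue, its ZooKeeper queue node, and its metrics behind, so the
manager kept accumulating WAL references for a dead source. A rough sketch of
how such a peer comes to exist (branch-1-era client API; the peer id and the
cluster key below are placeholders, and the new test's addPeer("1", 0, 0)
helper sets up the same situation):

    // Illustrative only: registering a self-referential peer. Pointing the
    // cluster key at this cluster's own ZK quorum and base znode creates
    // the 0 -> 0 loop that this patch cleans up after.
    ReplicationAdmin repAdmin = new ReplicationAdmin(conf);
    try {
      repAdmin.addPeer("1", "zk1:2181:/hbase"); // key == our own cluster
    } finally {
      repAdmin.close();
    }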


http://git-wip-us.apache.org/repos/asf/hbase/blob/d3ffdf6e/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 3049d9b..f7dd446 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -273,6 +273,8 @@ public class ReplicationSource extends Thread
       this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
           + peerClusterId + " which is not allowed by ReplicationEndpoint:"
           + replicationEndpoint.getClass().getName(), null, false);
+      this.manager.closeQueue(this);
+      return;
     }
     LOG.info("Replicating "+clusterId + " -> " + peerClusterId);
 

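Both added lines matter here. terminate() (existing, shown as context above)
stops the source's worker threads, the new closeQueue() call makes the
manager forget the dead source, and the new return prevents falling through
to normal startup. A simplified reconstruction of the surrounding flow (the
guard condition is inferred from the test's reference to
BaseReplicationEndpoint#canReplicateToSameCluster(), not copied verbatim
from the file):

    // Simplified flow of the startup check in ReplicationSource; field
    // names match the hunk above, the terminate() message is shortened.
    if (clusterId.equals(peerClusterId)
        && !replicationEndpoint.canReplicateToSameCluster()) {
      this.terminate("ClusterId " + clusterId
          + " is replicating to itself", null, false); // existing behavior
      this.manager.closeQueue(this); // new: release queue, metrics, ZK node
      return;                        // new: never reach normal startup below
    }
    LOG.info("Replicating " + clusterId + " -> " + peerClusterId);
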
http://git-wip-us.apache.org/repos/asf/hbase/blob/d3ffdf6e/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 9062b87..81f06a3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -468,6 +468,20 @@ public class ReplicationSourceManager implements ReplicationListener {
   }
 
   /**
+   * Clear the references to the specified old source
+   * @param src source to clear
+   */
+  public void closeQueue(ReplicationSourceInterface src) {
+    LOG.info("Done with the queue " + src.getPeerClusterZnode());
+    if (src instanceof ReplicationSource) {
+      ((ReplicationSource) src).getSourceMetrics().clear();
+    }
+    this.sources.remove(src);
+    deleteSource(src.getPeerClusterZnode(), true);
+    this.walsById.remove(src.getPeerClusterZnode());
+  }
+
+  /**
   * This method first deletes all the recovered sources for the specified
    * id, then deletes the normal source (deleting all related data in ZK).
    * @param id The id of the peer cluster

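closeQueue() unwinds all of the per-source bookkeeping: the entry in the
manager's sources list, the source's metrics, its queue znode in ZooKeeper
(via deleteSource), and the walsById entry that keeps WAL files referenced.
A rough sketch of verifying the cleanup, mirroring the assertion in the new
test below (rsName is the region server's ServerName; listChildrenNoWatch
returns null once the znode itself is gone):

    // Illustrative check: after disposal, the region server's node under
    // /hbase/replication/rs/ should list no queues for the removed peer.
    String queuesZnode = ZKUtil.joinZNode(zkw.baseZNode,
        ZKUtil.joinZNode("replication", "rs"));
    List<String> queues = ZKUtil.listChildrenNoWatch(zkw,
        ZKUtil.joinZNode(queuesZnode, rsName.toString()));
    assert queues == null || queues.isEmpty(); // queue was disposed
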
http://git-wip-us.apache.org/repos/asf/hbase/blob/d3ffdf6e/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
index 0932bf2..b373886 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
@@ -19,24 +19,34 @@ package org.apache.hadoop.hbase.replication;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.ClusterStatus;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.ServerLoad;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
@@ -51,11 +61,14 @@ import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.RSRpcServices;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.junit.After;
 import org.junit.Before;
@@ -273,6 +286,43 @@ public class TestMasterReplication {
     }
   }
 
+  /**
+   * Tests the replication scenario 0 -> 0. By default
+   * {@link BaseReplicationEndpoint#canReplicateToSameCluster()} returns false, so the
+   * ReplicationSource should terminate, and no further logs should get enqueued.
+   */
+  @Test(timeout = 300000)
+  public void testLoopedReplication() throws Exception {
+    LOG.info("testLoopedReplication");
+    startMiniClusters(1);
+    createTableOnClusters(table);
+    addPeer("1", 0, 0);
+    Thread.sleep(SLEEP_TIME);
+
+    // wait for source to terminate
+    final ServerName rsName = utilities[0].getHBaseCluster().getRegionServer(0).getServerName();
+    Waiter.waitFor(baseConfiguration, 10000, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        ClusterStatus clusterStatus = utilities[0].getHBaseAdmin().getClusterStatus();
+        ServerLoad serverLoad = clusterStatus.getLoad(rsName);
+        List<ReplicationLoadSource> replicationLoadSourceList =
+            serverLoad.getReplicationLoadSourceList();
+        return replicationLoadSourceList.size() == 0;
+      }
+    });
+
+    Table[] htables = getHTablesOnClusters(tableName);
+    putAndWait(row, famName, htables[0], htables[0]);
+    rollWALAndWait(utilities[0], table.getTableName(), row);
+    ZooKeeperWatcher zkw = utilities[0].getZooKeeperWatcher();
+    String queuesZnode =
+        ZKUtil.joinZNode(zkw.baseZNode, ZKUtil.joinZNode("replication", "rs"));
+    List<String> listChildrenNoWatch =
+        ZKUtil.listChildrenNoWatch(zkw, ZKUtil.joinZNode(queuesZnode, rsName.toString()));
+    assertEquals(0, listChildrenNoWatch.size());
+  }
+
   @After
   public void tearDown() throws IOException {
     configurations = null;
@@ -438,6 +488,47 @@ public class TestMasterReplication {
     }
   }
 
+  private void rollWALAndWait(final HBaseTestingUtility utility, final TableName table,
+      final byte[] row) throws IOException {
+    final Admin admin = utility.getHBaseAdmin();
+    final MiniHBaseCluster cluster = utility.getMiniHBaseCluster();
+
+    // find the region that corresponds to the given row.
+    HRegion region = null;
+    for (HRegion candidate : cluster.getRegions(table)) {
+      if (HRegion.rowIsInRange(candidate.getRegionInfo(), row)) {
+        region = candidate;
+        break;
+      }
+    }
+    assertNotNull("Couldn't find the region for row '" + Arrays.toString(row) + "'", region);
+
+    final CountDownLatch latch = new CountDownLatch(1);
+
+    // listen for successful log rolls
+    final WALActionsListener listener = new WALActionsListener.Base() {
+      @Override
+      public void postLogRoll(final Path oldPath, final Path newPath) throws IOException {
+        latch.countDown();
+      }
+    };
+    region.getWAL().registerWALActionsListener(listener);
+
+    // request a roll
+    admin.rollWALWriter(cluster.getServerHoldingRegion(region.getTableDesc().getTableName(),
+      region.getRegionInfo().getRegionName()));
+
+    // wait
+    try {
+      latch.await();
+    } catch (InterruptedException exception) {
+      LOG.warn("Interrupted while waiting for the wal of '" + region + "' to roll. If later "
+          + "replication tests fail, it's probably because we should still be waiting.");
+      Thread.currentThread().interrupt();
+    }
+    region.getWAL().unregisterWALActionsListener(listener);
+  }
+
   /**
   * Use a coprocessor to count puts and deletes. As KVs would be replicated back with the same
   * timestamp, there is otherwise no way to count them.