Posted to commits@hbase.apache.org by ap...@apache.org on 2016/12/21 02:39:41 UTC

[1/6] hbase git commit: HBASE-17328 Properly dispose of looped replication peers

Repository: hbase
Updated Branches:
  refs/heads/0.98 f63b5a0db -> 5ea953b01
  refs/heads/branch-1 33002bd8e -> e79afbf0c
  refs/heads/branch-1.1 aa47da890 -> d3ffdf6e9
  refs/heads/branch-1.2 b8822633b -> f276edfad
  refs/heads/branch-1.3 c43d759d9 -> 7b3187c1a
  refs/heads/master 06b67a632 -> f8474c8d4


HBASE-17328 Properly dispose of looped replication peers

Signed-off-by: Andrew Purtell <ap...@apache.org>

Conflicts:
	hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/e79afbf0
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/e79afbf0
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/e79afbf0

Branch: refs/heads/branch-1
Commit: e79afbf0cbca95ed4dad67ef83d9755c86629a85
Parents: 33002bd
Author: Vincent <vi...@salesforce.com>
Authored: Tue Dec 20 16:29:40 2016 -0800
Committer: Andrew Purtell <ap...@apache.org>
Committed: Tue Dec 20 18:01:33 2016 -0800

----------------------------------------------------------------------
 .../regionserver/ReplicationSource.java         |  2 +
 .../regionserver/ReplicationSourceManager.java  | 14 ++++++
 .../replication/TestMasterReplication.java      | 48 ++++++++++++++++++--
 3 files changed, 61 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/e79afbf0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 63549d0..81b39a0 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -327,6 +327,8 @@ public class ReplicationSource extends Thread
       this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
           + peerClusterId + " which is not allowed by ReplicationEndpoint:"
           + replicationEndpoint.getClass().getName(), null, false);
+      this.manager.closeQueue(this);
+      return;
     }
     LOG.info("Replicating " + clusterId + " -> " + peerClusterId);
     // start workers

http://git-wip-us.apache.org/repos/asf/hbase/blob/e79afbf0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index b31cc54..cdc6fce 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -550,6 +550,20 @@ public class ReplicationSourceManager implements ReplicationListener {
   }
 
   /**
+   * Clear the references to the specified old source
+   * @param src source to clear
+   */
+  public void closeQueue(ReplicationSourceInterface src) {
+    LOG.info("Done with the queue " + src.getPeerClusterZnode());
+    if (src instanceof ReplicationSource) {
+      ((ReplicationSource) src).getSourceMetrics().clear();
+    }
+    this.sources.remove(src);
+    deleteSource(src.getPeerClusterZnode(), true);
+    this.walsById.remove(src.getPeerClusterZnode());
+  }
+
+  /**
   * This method first deletes all the recovered sources for the specified
    * id, then deletes the normal source (deleting all related data in ZK).
    * @param id The id of the peer cluster

http://git-wip-us.apache.org/repos/asf/hbase/blob/e79afbf0/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
index 2a1ef6a..a8af946 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.ClusterStatus;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -42,8 +43,10 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.ServerLoad;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
@@ -65,9 +68,11 @@ import org.apache.hadoop.hbase.regionserver.RSRpcServices;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.HFileTestUtil;
 import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.junit.After;
 import org.junit.Before;
@@ -171,6 +176,43 @@ public class TestMasterReplication {
   }
 
   /**
+   * Tests the replication scenario 0 -> 0. By default
+   * {@link BaseReplicationEndpoint#canReplicateToSameCluster()} returns false, so the
+   * ReplicationSource should terminate, and no further logs should get enqueued
+   */
+  @Test(timeout = 300000)
+  public void testLoopedReplication() throws Exception {
+    LOG.info("testLoopedReplication");
+    startMiniClusters(1);
+    createTableOnClusters(table);
+    addPeer("1", 0, 0);
+    Thread.sleep(SLEEP_TIME);
+
+    // wait for source to terminate
+    final ServerName rsName = utilities[0].getHBaseCluster().getRegionServer(0).getServerName();
+    Waiter.waitFor(baseConfiguration, 10000, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        ClusterStatus clusterStatus = utilities[0].getHBaseAdmin().getClusterStatus();
+        ServerLoad serverLoad = clusterStatus.getLoad(rsName);
+        List<ReplicationLoadSource> replicationLoadSourceList =
+            serverLoad.getReplicationLoadSourceList();
+        return replicationLoadSourceList.size() == 0;
+      }
+    });
+
+    Table[] htables = getHTablesOnClusters(tableName);
+    putAndWait(row, famName, htables[0], htables[0]);
+    rollWALAndWait(utilities[0], table.getTableName(), row);
+    ZooKeeperWatcher zkw = utilities[0].getZooKeeperWatcher();
+    String queuesZnode =
+        ZKUtil.joinZNode(zkw.baseZNode, ZKUtil.joinZNode("replication", "rs"));
+    List<String> listChildrenNoWatch =
+        ZKUtil.listChildrenNoWatch(zkw, ZKUtil.joinZNode(queuesZnode, rsName.toString()));
+    assertEquals(0, listChildrenNoWatch.size());
+  }
+
+  /**
    * It tests the replication scenario involving 0 -> 1 -> 0. It does it by bulk loading a set of
    * HFiles to a table in each cluster, checking if it's replicated.
    */
@@ -334,7 +376,7 @@ public class TestMasterReplication {
       shutDownMiniClusters();
     }
   }
-  
+
   /**
    * It tests the bulk loaded hfile replication scenario to only explicitly specified table column
    * families. It does it by bulk loading a set of HFiles belonging to both the CFs of table and set
@@ -515,7 +557,7 @@ public class TestMasterReplication {
       close(replicationAdmin);
     }
   }
-  
+
   private void addPeer(String id, int masterClusterNumber, int slaveClusterNumber, String tableCfs)
       throws Exception {
     ReplicationAdmin replicationAdmin = null;
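
The patch itself is small: when ReplicationSource.run() finds that peerClusterId equals the local clusterId, it now calls manager.closeQueue(this) and returns instead of falling through to start its shipper workers, and the new ReplicationSourceManager.closeQueue() drops the source from the manager's bookkeeping (the sources list, the walsById map, the source metrics, and the queue data in ZK). For readers who want the pattern in isolation, below is a minimal standalone Java sketch of that guard-and-cleanup flow; the FakeSource and FakeManager types and their fields are simplified stand-ins invented for illustration, not the real HBase classes.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Standalone illustration of the cleanup added by HBASE-17328: when a source
// discovers it is replicating to its own cluster, it is removed from the
// manager's bookkeeping (sources list, per-peer WAL queue) instead of
// lingering and enqueueing further logs. All names are simplified stand-ins.
public class LoopedPeerCleanupSketch {

  static class FakeSource {
    final String peerZnode;
    final UUID clusterId;
    final UUID peerClusterId;

    FakeSource(String peerZnode, UUID clusterId, UUID peerClusterId) {
      this.peerZnode = peerZnode;
      this.clusterId = clusterId;
      this.peerClusterId = peerClusterId;
    }
  }

  static class FakeManager {
    final List<FakeSource> sources = new ArrayList<>();
    final Map<String, List<String>> walsById = new HashMap<>();

    void register(FakeSource src) {
      sources.add(src);
      walsById.put(src.peerZnode, new ArrayList<String>());
    }

    // Mirrors the intent of the new ReplicationSourceManager.closeQueue(src).
    void closeQueue(FakeSource src) {
      System.out.println("Done with the queue " + src.peerZnode);
      sources.remove(src);
      walsById.remove(src.peerZnode);
    }
  }

  public static void main(String[] args) {
    UUID cluster = UUID.randomUUID();
    FakeManager manager = new FakeManager();
    FakeSource looped = new FakeSource("1", cluster, cluster); // peer == self
    manager.register(looped);

    // The guard the patch adds to ReplicationSource.run(): terminate the
    // source, dispose of its queue, and return rather than starting workers.
    if (looped.clusterId.equals(looped.peerClusterId)) {
      manager.closeQueue(looped);
      return;
    }
    System.out.println("Replicating " + looped.clusterId + " -> " + looped.peerClusterId);
  }
}

Running the sketch prints only the "Done with the queue" line, mirroring what the new test asserts: a looped peer leaves no queue behind.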


[2/6] hbase git commit: HBASE-17328 Properly dispose of looped replication peers

Posted by ap...@apache.org.
HBASE-17328 Properly dispose of looped replication peers

Signed-off-by: Andrew Purtell <ap...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f8474c8d
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f8474c8d
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f8474c8d

Branch: refs/heads/master
Commit: f8474c8d4d3e722aa0129e085f6a5287c5e2be89
Parents: 06b67a6
Author: Vincent <vi...@salesforce.com>
Authored: Tue Dec 20 16:29:40 2016 -0800
Committer: Andrew Purtell <ap...@apache.org>
Committed: Tue Dec 20 18:03:11 2016 -0800

----------------------------------------------------------------------
 .../regionserver/ReplicationSource.java         |  2 +
 .../regionserver/ReplicationSourceManager.java  | 14 ++++++
 .../replication/TestMasterReplication.java      | 46 +++++++++++++++++++-
 3 files changed, 60 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/f8474c8d/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index a6fe0fb..c988f87 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -327,6 +327,8 @@ public class ReplicationSource extends Thread
       this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
           + peerClusterId + " which is not allowed by ReplicationEndpoint:"
           + replicationEndpoint.getClass().getName(), null, false);
+      this.manager.closeQueue(this);
+      return;
     }
     LOG.info("Replicating " + clusterId + " -> " + peerClusterId);
     // start workers

http://git-wip-us.apache.org/repos/asf/hbase/blob/f8474c8d/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index fa6f894..2c9fdcc 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -539,6 +539,20 @@ public class ReplicationSourceManager implements ReplicationListener {
   }
 
   /**
+   * Clear the references to the specified old source
+   * @param src source to clear
+   */
+  public void closeQueue(ReplicationSourceInterface src) {
+    LOG.info("Done with the queue " + src.getPeerClusterZnode());
+    if (src instanceof ReplicationSource) {
+      ((ReplicationSource) src).getSourceMetrics().clear();
+    }
+    this.sources.remove(src);
+    deleteSource(src.getPeerClusterZnode(), true);
+    this.walsById.remove(src.getPeerClusterZnode());
+  }
+
+  /**
   * This method first deletes all the recovered sources for the specified
    * id, then deletes the normal source (deleting all related data in ZK).
    * @param id The id of the peer cluster

http://git-wip-us.apache.org/repos/asf/hbase/blob/f8474c8d/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
index 5b8538b..7ac5e94 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.ClusterStatus;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -42,7 +43,10 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.ServerLoad;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.ConnectionFactory;
 import org.apache.hadoop.hbase.client.Delete;
@@ -69,6 +73,7 @@ import org.apache.hadoop.hbase.testclassification.ReplicationTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.HFileTestUtil;
 import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.junit.After;
 import org.junit.Before;
@@ -169,6 +174,43 @@ public class TestMasterReplication {
   }
 
   /**
+   * Tests the replication scenario 0 -> 0. By default
+   * {@link BaseReplicationEndpoint#canReplicateToSameCluster()} returns false, so the
+   * ReplicationSource should terminate, and no further logs should get enqueued
+   */
+  @Test(timeout = 300000)
+  public void testLoopedReplication() throws Exception {
+    LOG.info("testLoopedReplication");
+    startMiniClusters(1);
+    createTableOnClusters(table);
+    addPeer("1", 0, 0);
+    Thread.sleep(SLEEP_TIME);
+
+    // wait for source to terminate
+    final ServerName rsName = utilities[0].getHBaseCluster().getRegionServer(0).getServerName();
+    Waiter.waitFor(baseConfiguration, 10000, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        ClusterStatus clusterStatus = utilities[0].getAdmin().getClusterStatus();
+        ServerLoad serverLoad = clusterStatus.getLoad(rsName);
+        List<ReplicationLoadSource> replicationLoadSourceList =
+            serverLoad.getReplicationLoadSourceList();
+        return replicationLoadSourceList.size() == 0;
+      }
+    });
+
+    Table[] htables = getHTablesOnClusters(tableName);
+    putAndWait(row, famName, htables[0], htables[0]);
+    rollWALAndWait(utilities[0], table.getTableName(), row);
+    ZooKeeperWatcher zkw = utilities[0].getZooKeeperWatcher();
+    String queuesZnode =
+        ZKUtil.joinZNode(zkw.getZNodePaths().baseZNode, ZKUtil.joinZNode("replication", "rs"));
+    List<String> listChildrenNoWatch =
+        ZKUtil.listChildrenNoWatch(zkw, ZKUtil.joinZNode(queuesZnode, rsName.toString()));
+    assertEquals(0, listChildrenNoWatch.size());
+  }
+
+  /**
    * It tests the replication scenario involving 0 -> 1 -> 0. It does it by bulk loading a set of
    * HFiles to a table in each cluster, checking if it's replicated.
    */
@@ -332,7 +374,7 @@ public class TestMasterReplication {
       shutDownMiniClusters();
     }
   }
-  
+
   /**
    * It tests the bulk loaded hfile replication scenario to only explicitly specified table column
    * families. It does it by bulk loading a set of HFiles belonging to both the CFs of table and set
@@ -483,7 +525,7 @@ public class TestMasterReplication {
       close(replicationAdmin);
     }
   }
-  
+
   private void addPeer(String id, int masterClusterNumber, int slaveClusterNumber, String tableCfs)
       throws Exception {
     ReplicationAdmin replicationAdmin = null;
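
As a side note on what testLoopedReplication asserts at the end: it lists the children of the region server's replication queue znode and expects none to remain once the looped source has been disposed. Below is a small sketch of the path it builds, assuming the default base znode /hbase; ZKUtil.joinZNode behaves like joining parent and child with a '/', and the server name shown is a made-up example.

// Sketch of the znode path testLoopedReplication inspects, assuming the
// default base znode "/hbase". ZKUtil.joinZNode(parent, node) behaves like
// parent + "/" + node. The server name below is a made-up example.
public class ReplicationQueuePathSketch {
  static String joinZNode(String parent, String node) {
    return parent + "/" + node;
  }

  public static void main(String[] args) {
    String baseZNode = "/hbase"; // zkw baseZNode with default configuration
    String queuesZnode = joinZNode(baseZNode, joinZNode("replication", "rs"));
    String rsQueues = joinZNode(queuesZnode, "host.example.org,16020,1482280000000");
    // After the looped peer is disposed, listChildrenNoWatch(rsQueues)
    // should return an empty list, which is what the test asserts.
    System.out.println(rsQueues); // /hbase/replication/rs/host.example.org,16020,1482280000000
  }
}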


[4/6] hbase git commit: HBASE-17328 Properly dispose of looped replication peers

Posted by ap...@apache.org.
HBASE-17328 Properly dispose of looped replication peers

Signed-off-by: Andrew Purtell <ap...@apache.org>

Conflicts:
	hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/f276edfa
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/f276edfa
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/f276edfa

Branch: refs/heads/branch-1.2
Commit: f276edfadbc5b598bdd7412809e76e8077207603
Parents: b882263
Author: Vincent <vi...@salesforce.com>
Authored: Tue Dec 20 16:29:40 2016 -0800
Committer: Andrew Purtell <ap...@apache.org>
Committed: Tue Dec 20 18:33:54 2016 -0800

----------------------------------------------------------------------
 .../regionserver/ReplicationSource.java         |   2 +
 .../regionserver/ReplicationSourceManager.java  |  14 +++
 .../replication/TestMasterReplication.java      | 105 +++++++++++++++++--
 3 files changed, 114 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/f276edfa/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index be16d01..6c93dcd 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -279,6 +279,8 @@ public class ReplicationSource extends Thread
       this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
           + peerClusterId + " which is not allowed by ReplicationEndpoint:"
           + replicationEndpoint.getClass().getName(), null, false);
+      this.manager.closeQueue(this);
+      return;
     }
     LOG.info("Replicating " + clusterId + " -> " + peerClusterId);
     // start workers

http://git-wip-us.apache.org/repos/asf/hbase/blob/f276edfa/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index de65f02..603557b 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -529,6 +529,20 @@ public class ReplicationSourceManager implements ReplicationListener {
   }
 
   /**
+   * Clear the references to the specified old source
+   * @param src source to clear
+   */
+  public void closeQueue(ReplicationSourceInterface src) {
+    LOG.info("Done with the queue " + src.getPeerClusterZnode());
+    if (src instanceof ReplicationSource) {
+      ((ReplicationSource) src).getSourceMetrics().clear();
+    }
+    this.sources.remove(src);
+    deleteSource(src.getPeerClusterZnode(), true);
+    this.walsById.remove(src.getPeerClusterZnode());
+  }
+
+  /**
   * This method first deletes all the recovered sources for the specified
    * id, then deletes the normal source (deleting all related data in ZK).
    * @param id The id of the peer cluster

http://git-wip-us.apache.org/repos/asf/hbase/blob/f276edfa/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
index 0932bf2..46b37d2 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
@@ -19,24 +19,33 @@ package org.apache.hadoop.hbase.replication;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.ClusterStatus;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.ServerLoad;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
@@ -51,11 +60,15 @@ import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.RSRpcServices;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.junit.After;
 import org.junit.Before;
@@ -158,12 +171,49 @@ public class TestMasterReplication {
   }
 
   /**
-   * Tests the cyclic replication scenario of 0 -> 1 -> 2 -> 0 by adding and
-   * deleting rows to a table in each clusters and ensuring that the each of
-   * these clusters get the appropriate mutations. It also tests the grouping
-   * scenario where a cluster needs to replicate the edits originating from
-   * itself and also the edits that it received using replication from a
-   * different cluster. The scenario is explained in HBASE-9158
+   * Tests the replication scenario 0 -> 0. By default
+   * {@link BaseReplicationEndpoint#canReplicateToSameCluster()} returns false, so the
+   * ReplicationSource should terminate, and no further logs should get enqueued
+   */
+  @Test(timeout = 300000)
+  public void testLoopedReplication() throws Exception {
+    LOG.info("testLoopedReplication");
+    startMiniClusters(1);
+    createTableOnClusters(table);
+    addPeer("1", 0, 0);
+    Thread.sleep(SLEEP_TIME);
+
+    // wait for source to terminate
+    final ServerName rsName = utilities[0].getHBaseCluster().getRegionServer(0).getServerName();
+    Waiter.waitFor(baseConfiguration, 10000, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        ClusterStatus clusterStatus = utilities[0].getHBaseAdmin().getClusterStatus();
+        ServerLoad serverLoad = clusterStatus.getLoad(rsName);
+        List<ReplicationLoadSource> replicationLoadSourceList =
+            serverLoad.getReplicationLoadSourceList();
+        return replicationLoadSourceList.size() == 0;
+      }
+    });
+
+    Table[] htables = getHTablesOnClusters(tableName);
+    putAndWait(row, famName, htables[0], htables[0]);
+    rollWALAndWait(utilities[0], table.getTableName(), row);
+    ZooKeeperWatcher zkw = utilities[0].getZooKeeperWatcher();
+    String queuesZnode =
+        ZKUtil.joinZNode(zkw.baseZNode, ZKUtil.joinZNode("replication", "rs"));
+    List<String> listChildrenNoWatch =
+        ZKUtil.listChildrenNoWatch(zkw, ZKUtil.joinZNode(queuesZnode, rsName.toString()));
+    assertEquals(0, listChildrenNoWatch.size());
+  }
+
+/**
+  * Tests the cyclic replication scenario of 0 -> 1 -> 2 -> 0 by adding and
+  * deleting rows to a table in each clusters and ensuring that the each of
+  * these clusters get the appropriate mutations. It also tests the grouping
+  * scenario where a cluster needs to replicate the edits originating from
+  * itself and also the edits that it received using replication from a
+  * different cluster. The scenario is explained in HBASE-9158
    */
   @Test(timeout = 300000)
   public void testCyclicReplication2() throws Exception {
@@ -438,6 +488,47 @@ public class TestMasterReplication {
     }
   }
 
+  private void rollWALAndWait(final HBaseTestingUtility utility, final TableName table,
+      final byte[] row) throws IOException {
+    final Admin admin = utility.getHBaseAdmin();
+    final MiniHBaseCluster cluster = utility.getMiniHBaseCluster();
+
+    // find the region that corresponds to the given row.
+    HRegion region = null;
+    for (HRegion candidate : cluster.getRegions(table)) {
+      if (HRegion.rowIsInRange(candidate.getRegionInfo(), row)) {
+        region = candidate;
+        break;
+      }
+    }
+    assertNotNull("Couldn't find the region for row '" + Arrays.toString(row) + "'", region);
+
+    final CountDownLatch latch = new CountDownLatch(1);
+
+    // listen for successful log rolls
+    final WALActionsListener listener = new WALActionsListener.Base() {
+          @Override
+          public void postLogRoll(final Path oldPath, final Path newPath) throws IOException {
+            latch.countDown();
+          }
+        };
+    region.getWAL().registerWALActionsListener(listener);
+
+    // request a roll
+    admin.rollWALWriter(cluster.getServerHoldingRegion(region.getTableDesc().getTableName(),
+      region.getRegionInfo().getRegionName()));
+
+    // wait
+    try {
+      latch.await();
+    } catch (InterruptedException exception) {
+      LOG.warn("Interrupted while waiting for the wal of '" + region + "' to roll. If later " +
+          "replication tests fail, it's probably because we should still be waiting.");
+      Thread.currentThread().interrupt();
+    }
+    region.getWAL().unregisterWALActionsListener(listener);
+  }
+
   /**
    * Use a coprocessor to count puts and deletes. as KVs would be replicated back with the same
    * timestamp there is otherwise no way to count them.
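
The branch-1.2 backport above (and the older branches that follow) also has to bring in the rollWALAndWait helper, since those branches did not already carry it in this test. Its core is a common latch-and-listener idiom: register a callback for the event of interest, trigger the action, then block on a CountDownLatch that the callback counts down. Below is a minimal standalone Java sketch of that idiom; the FakeWal and RollListener types are invented for illustration and are not the HBase WAL or WALActionsListener APIs.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Generic latch-and-listener idiom used by rollWALAndWait in the test above:
// subscribe to a completion callback, trigger the asynchronous action, then
// wait on a latch that the callback counts down. All names are illustrative.
public class LatchAndListenerSketch {

  interface RollListener {
    void postRoll();
  }

  static class FakeWal {
    private RollListener listener;

    void register(RollListener l) { this.listener = l; }
    void unregister(RollListener l) { if (this.listener == l) this.listener = null; }

    // Simulates an asynchronous roll that notifies the listener when done.
    void requestRoll() {
      new Thread(() -> {
        // ... the roll itself would happen here ...
        if (listener != null) {
          listener.postRoll();
        }
      }).start();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    FakeWal wal = new FakeWal();
    final CountDownLatch latch = new CountDownLatch(1);

    RollListener listener = latch::countDown; // fires once the roll completes
    wal.register(listener);
    wal.requestRoll();

    // Block until the roll has been observed (bounded so the demo cannot hang).
    boolean rolled = latch.await(10, TimeUnit.SECONDS);
    wal.unregister(listener);
    System.out.println("roll observed: " + rolled);
  }
}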


[5/6] hbase git commit: HBASE-17328 Properly dispose of looped replication peers

Posted by ap...@apache.org.
HBASE-17328 Properly dispose of looped replication peers

Signed-off-by: Andrew Purtell <ap...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/d3ffdf6e
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/d3ffdf6e
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/d3ffdf6e

Branch: refs/heads/branch-1.1
Commit: d3ffdf6e90a16cf250375469b2f04717b942ad94
Parents: aa47da8
Author: Vincent <vi...@salesforce.com>
Authored: Tue Dec 20 17:20:14 2016 -0800
Committer: Andrew Purtell <ap...@apache.org>
Committed: Tue Dec 20 18:34:57 2016 -0800

----------------------------------------------------------------------
 .../regionserver/ReplicationSource.java         |  2 +
 .../regionserver/ReplicationSourceManager.java  | 14 +++
 .../replication/TestMasterReplication.java      | 91 ++++++++++++++++++++
 3 files changed, 107 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/d3ffdf6e/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 3049d9b..f7dd446 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -273,6 +273,8 @@ public class ReplicationSource extends Thread
       this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
           + peerClusterId + " which is not allowed by ReplicationEndpoint:"
           + replicationEndpoint.getClass().getName(), null, false);
+      this.manager.closeQueue(this);
+      return;
     }
     LOG.info("Replicating "+clusterId + " -> " + peerClusterId);
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/d3ffdf6e/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 9062b87..81f06a3 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -468,6 +468,20 @@ public class ReplicationSourceManager implements ReplicationListener {
   }
 
   /**
+   * Clear the references to the specified old source
+   * @param src source to clear
+   */
+  public void closeQueue(ReplicationSourceInterface src) {
+    LOG.info("Done with the queue " + src.getPeerClusterZnode());
+    if (src instanceof ReplicationSource) {
+      ((ReplicationSource) src).getSourceMetrics().clear();
+    }
+    this.sources.remove(src);
+    deleteSource(src.getPeerClusterZnode(), true);
+    this.walsById.remove(src.getPeerClusterZnode());
+  }
+
+  /**
   * This method first deletes all the recovered sources for the specified
    * id, then deletes the normal source (deleting all related data in ZK).
    * @param id The id of the peer cluster

http://git-wip-us.apache.org/repos/asf/hbase/blob/d3ffdf6e/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
index 0932bf2..b373886 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
@@ -19,24 +19,34 @@ package org.apache.hadoop.hbase.replication;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.ClusterStatus;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.ServerLoad;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
@@ -51,11 +61,14 @@ import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
 import org.apache.hadoop.hbase.regionserver.RSRpcServices;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.junit.After;
 import org.junit.Before;
@@ -273,6 +286,43 @@ public class TestMasterReplication {
     }
   }
 
+  /**
+   * Tests the replication scenario 0 -> 0. By default
+   * {@link BaseReplicationEndpoint#canReplicateToSameCluster()} returns false, so the
+   * ReplicationSource should terminate, and no further logs should get enqueued
+   */
+  @Test(timeout = 300000)
+  public void testLoopedReplication() throws Exception {
+    LOG.info("testLoopedReplication");
+    startMiniClusters(1);
+    createTableOnClusters(table);
+    addPeer("1", 0, 0);
+    Thread.sleep(SLEEP_TIME);
+
+    // wait for source to terminate
+    final ServerName rsName = utilities[0].getHBaseCluster().getRegionServer(0).getServerName();
+    Waiter.waitFor(baseConfiguration, 10000, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        ClusterStatus clusterStatus = utilities[0].getHBaseAdmin().getClusterStatus();
+        ServerLoad serverLoad = clusterStatus.getLoad(rsName);
+        List<ReplicationLoadSource> replicationLoadSourceList =
+            serverLoad.getReplicationLoadSourceList();
+        return replicationLoadSourceList.size() == 0;
+      }
+    });
+
+    Table[] htables = getHTablesOnClusters(tableName);
+    putAndWait(row, famName, htables[0], htables[0]);
+    rollWALAndWait(utilities[0], table.getTableName(), row);
+    ZooKeeperWatcher zkw = utilities[0].getZooKeeperWatcher();
+    String queuesZnode =
+        ZKUtil.joinZNode(zkw.baseZNode, ZKUtil.joinZNode("replication", "rs"));
+    List<String> listChildrenNoWatch =
+        ZKUtil.listChildrenNoWatch(zkw, ZKUtil.joinZNode(queuesZnode, rsName.toString()));
+    assertEquals(0, listChildrenNoWatch.size());
+  }
+
   @After
   public void tearDown() throws IOException {
     configurations = null;
@@ -438,6 +488,47 @@ public class TestMasterReplication {
     }
   }
 
+  private void rollWALAndWait(final HBaseTestingUtility utility, final TableName table,
+      final byte[] row) throws IOException {
+    final Admin admin = utility.getHBaseAdmin();
+    final MiniHBaseCluster cluster = utility.getMiniHBaseCluster();
+
+    // find the region that corresponds to the given row.
+    HRegion region = null;
+    for (HRegion candidate : cluster.getRegions(table)) {
+      if (HRegion.rowIsInRange(candidate.getRegionInfo(), row)) {
+        region = candidate;
+        break;
+      }
+    }
+    assertNotNull("Couldn't find the region for row '" + Arrays.toString(row) + "'", region);
+
+    final CountDownLatch latch = new CountDownLatch(1);
+
+    // listen for successful log rolls
+    final WALActionsListener listener = new WALActionsListener.Base() {
+      @Override
+      public void postLogRoll(final Path oldPath, final Path newPath) throws IOException {
+        latch.countDown();
+      }
+    };
+    region.getWAL().registerWALActionsListener(listener);
+
+    // request a roll
+    admin.rollWALWriter(cluster.getServerHoldingRegion(region.getTableDesc().getTableName(),
+      region.getRegionInfo().getRegionName()));
+
+    // wait
+    try {
+      latch.await();
+    } catch (InterruptedException exception) {
+      LOG.warn("Interrupted while waiting for the wal of '" + region + "' to roll. If later "
+          + "replication tests fail, it's probably because we should still be waiting.");
+      Thread.currentThread().interrupt();
+    }
+    region.getWAL().unregisterWALActionsListener(listener);
+  }
+
   /**
    * Use a coprocessor to count puts and deletes. as KVs would be replicated back with the same
    * timestamp there is otherwise no way to count them.


[3/6] hbase git commit: HBASE-17328 Properly dispose of looped replication peers

Posted by ap...@apache.org.
HBASE-17328 Properly dispose of looped replication peers

Signed-off-by: Andrew Purtell <ap...@apache.org>

Conflicts:
	hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/7b3187c1
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/7b3187c1
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/7b3187c1

Branch: refs/heads/branch-1.3
Commit: 7b3187c1a02eb875b2ba2daa49d43738f4dce8f8
Parents: c43d759
Author: Vincent <vi...@salesforce.com>
Authored: Tue Dec 20 16:29:40 2016 -0800
Committer: Andrew Purtell <ap...@apache.org>
Committed: Tue Dec 20 18:03:55 2016 -0800

----------------------------------------------------------------------
 .../regionserver/ReplicationSource.java         |  2 +
 .../regionserver/ReplicationSourceManager.java  | 14 ++++++
 .../replication/TestMasterReplication.java      | 48 ++++++++++++++++++--
 3 files changed, 61 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/7b3187c1/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 17bf6e5..7f902c4 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -308,6 +308,8 @@ public class ReplicationSource extends Thread
       this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
           + peerClusterId + " which is not allowed by ReplicationEndpoint:"
           + replicationEndpoint.getClass().getName(), null, false);
+      this.manager.closeQueue(this);
+      return;
     }
     LOG.info("Replicating " + clusterId + " -> " + peerClusterId);
     // start workers

http://git-wip-us.apache.org/repos/asf/hbase/blob/7b3187c1/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index 996c518..c4dc800 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -536,6 +536,20 @@ public class ReplicationSourceManager implements ReplicationListener {
   }
 
   /**
+   * Clear the references to the specified old source
+   * @param src source to clear
+   */
+  public void closeQueue(ReplicationSourceInterface src) {
+    LOG.info("Done with the queue " + src.getPeerClusterZnode());
+    if (src instanceof ReplicationSource) {
+      ((ReplicationSource) src).getSourceMetrics().clear();
+    }
+    this.sources.remove(src);
+    deleteSource(src.getPeerClusterZnode(), true);
+    this.walsById.remove(src.getPeerClusterZnode());
+  }
+
+  /**
   * This method first deletes all the recovered sources for the specified
    * id, then deletes the normal source (deleting all related data in ZK).
    * @param id The id of the peer cluster

http://git-wip-us.apache.org/repos/asf/hbase/blob/7b3187c1/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
index ffa3e42..cf29186 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.ClusterStatus;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -42,8 +43,10 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.MiniHBaseCluster;
-import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.ServerLoad;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.Admin;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
@@ -65,9 +68,11 @@ import org.apache.hadoop.hbase.regionserver.RSRpcServices;
 import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.replication.regionserver.TestSourceFSConfigurationProvider;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.HFileTestUtil;
 import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.junit.After;
 import org.junit.Before;
@@ -171,6 +176,43 @@ public class TestMasterReplication {
   }
 
   /**
+   * Tests the replication scenario 0 -> 0. By default
+   * {@link BaseReplicationEndpoint#canReplicateToSameCluster()} returns false, so the
+   * ReplicationSource should terminate, and no further logs should get enqueued
+   */
+  @Test(timeout = 300000)
+  public void testLoopedReplication() throws Exception {
+    LOG.info("testLoopedReplication");
+    startMiniClusters(1);
+    createTableOnClusters(table);
+    addPeer("1", 0, 0);
+    Thread.sleep(SLEEP_TIME);
+
+    // wait for source to terminate
+    final ServerName rsName = utilities[0].getHBaseCluster().getRegionServer(0).getServerName();
+    Waiter.waitFor(baseConfiguration, 10000, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        ClusterStatus clusterStatus = utilities[0].getHBaseAdmin().getClusterStatus();
+        ServerLoad serverLoad = clusterStatus.getLoad(rsName);
+        List<ReplicationLoadSource> replicationLoadSourceList =
+            serverLoad.getReplicationLoadSourceList();
+        return replicationLoadSourceList.size() == 0;
+      }
+    });
+
+    Table[] htables = getHTablesOnClusters(tableName);
+    putAndWait(row, famName, htables[0], htables[0]);
+    rollWALAndWait(utilities[0], table.getTableName(), row);
+    ZooKeeperWatcher zkw = utilities[0].getZooKeeperWatcher();
+    String queuesZnode =
+        ZKUtil.joinZNode(zkw.baseZNode, ZKUtil.joinZNode("replication", "rs"));
+    List<String> listChildrenNoWatch =
+        ZKUtil.listChildrenNoWatch(zkw, ZKUtil.joinZNode(queuesZnode, rsName.toString()));
+    assertEquals(0, listChildrenNoWatch.size());
+  }
+
+  /**
    * It tests the replication scenario involving 0 -> 1 -> 0. It does it by bulk loading a set of
    * HFiles to a table in each cluster, checking if it's replicated.
    */
@@ -334,7 +376,7 @@ public class TestMasterReplication {
       shutDownMiniClusters();
     }
   }
-  
+
   /**
    * It tests the bulk loaded hfile replication scenario to only explicitly specified table column
    * families. It does it by bulk loading a set of HFiles belonging to both the CFs of table and set
@@ -514,7 +556,7 @@ public class TestMasterReplication {
       close(replicationAdmin);
     }
   }
-  
+
   private void addPeer(String id, int masterClusterNumber, int slaveClusterNumber, String tableCfs)
       throws Exception {
     ReplicationAdmin replicationAdmin = null;


[6/6] hbase git commit: HBASE-17328 Properly dispose of looped replication peers

Posted by ap...@apache.org.
HBASE-17328 Properly dispose of looped replication peers

Signed-off-by: Andrew Purtell <ap...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hbase/repo
Commit: http://git-wip-us.apache.org/repos/asf/hbase/commit/5ea953b0
Tree: http://git-wip-us.apache.org/repos/asf/hbase/tree/5ea953b0
Diff: http://git-wip-us.apache.org/repos/asf/hbase/diff/5ea953b0

Branch: refs/heads/0.98
Commit: 5ea953b0115ac814f67e4fb076b2fdce85dd22cf
Parents: f63b5a0
Author: Vincent <vi...@salesforce.com>
Authored: Tue Dec 20 17:49:05 2016 -0800
Committer: Andrew Purtell <ap...@apache.org>
Committed: Tue Dec 20 18:38:21 2016 -0800

----------------------------------------------------------------------
 .../regionserver/ReplicationSource.java         |   2 +
 .../regionserver/ReplicationSourceManager.java  |  14 ++
 .../replication/TestMasterReplication.java      | 140 +++++++++++++++++++
 3 files changed, 156 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hbase/blob/5ea953b0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
index 9b0dfff..1e257db 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
@@ -268,6 +268,8 @@ public class ReplicationSource extends Thread
       this.terminate("ClusterId " + clusterId + " is replicating to itself: peerClusterId "
           + peerClusterId + " which is not allowed by ReplicationEndpoint:"
           + replicationEndpoint.getClass().getName(), null, false);
+      this.manager.closeQueue(this);
+      return;
     }
     LOG.info("Replicating "+clusterId + " -> " + peerClusterId);
 

http://git-wip-us.apache.org/repos/asf/hbase/blob/5ea953b0/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
index b9d7807..574666e 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java
@@ -465,6 +465,20 @@ public class ReplicationSourceManager implements ReplicationListener {
   }
 
   /**
+   * Clear the references to the specified old source
+   * @param src source to clear
+   */
+  public void closeQueue(ReplicationSourceInterface src) {
+    LOG.info("Done with the queue " + src.getPeerClusterZnode());
+    if (src instanceof ReplicationSource) {
+      ((ReplicationSource) src).getSourceMetrics().clear();
+    }
+    this.sources.remove(src);
+    deleteSource(src.getPeerClusterZnode(), true);
+    this.hlogsById.remove(src.getPeerClusterZnode());
+  }
+
+  /**
   * This method first deletes all the recovered sources for the specified
    * id, then deletes the normal source (deleting all related data in ZK).
    * @param id The id of the peer cluster

http://git-wip-us.apache.org/repos/asf/hbase/blob/5ea953b0/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
----------------------------------------------------------------------
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
index b50439a..d4c7918 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/replication/TestMasterReplication.java
@@ -19,24 +19,35 @@ package org.apache.hadoop.hbase.replication;
 
 import static org.junit.Assert.assertArrayEquals;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
 
 import java.io.Closeable;
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.CountDownLatch;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.ClusterStatus;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HBaseTestingUtility;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.ServerLoad;
+import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.testclassification.LargeTests;
 import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.Waiter;
 import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Get;
@@ -49,10 +60,14 @@ import org.apache.hadoop.hbase.coprocessor.BaseRegionObserver;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
 import org.apache.hadoop.hbase.coprocessor.ObserverContext;
 import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.HRegionServer;
+import org.apache.hadoop.hbase.regionserver.wal.HLogKey;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
 import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.junit.After;
 import org.junit.Before;
@@ -269,6 +284,46 @@ public class TestMasterReplication {
     }
   }
 
+  /**
+   * Tests the replication scenario 0 -> 0. By default
+   * {@link BaseReplicationEndpoint#canReplicateToSameCluster()} returns false,
+   * so the ReplicationSource should terminate, and no further logs should get
+   * enqueued
+   */
+  @Test(timeout = 300000)
+  public void testLoopedReplication() throws Exception {
+    LOG.info("testLoopedReplication");
+    startMiniClusters(1);
+    createTableOnClusters(table);
+    addPeer("1", 0, 0);
+    Thread.sleep(SLEEP_TIME);
+
+    // wait for source to terminate
+    final ServerName rsName =
+        utilities[0].getHBaseCluster().getRegionServer(0).getServerName();
+    Waiter.waitFor(baseConfiguration, 10000, new Waiter.Predicate<Exception>() {
+      @Override
+      public boolean evaluate() throws Exception {
+        ClusterStatus clusterStatus =
+            utilities[0].getHBaseAdmin().getClusterStatus();
+        ServerLoad serverLoad = clusterStatus.getLoad(rsName);
+        List<ReplicationLoadSource> replicationLoadSourceList =
+            serverLoad.getReplicationLoadSourceList();
+        return replicationLoadSourceList.size() == 0;
+      }
+    });
+
+    HTable[] htables = getHTablesOnClusters(tableName);
+    putAndWait(row, famName, htables[0], htables[0]);
+    rollWALAndWait(utilities[0], table.getTableName(), row);
+    ZooKeeperWatcher zkw = utilities[0].getZooKeeperWatcher();
+    String queuesZnode =
+        ZKUtil.joinZNode(zkw.baseZNode, ZKUtil.joinZNode("replication", "rs"));
+    List<String> listChildrenNoWatch = ZKUtil.listChildrenNoWatch(zkw,
+        ZKUtil.joinZNode(queuesZnode, rsName.toString()));
+    assertEquals(0, listChildrenNoWatch.size());
+  }
+
   @After
   public void tearDown() throws IOException {
     configurations = null;
@@ -434,6 +489,91 @@ public class TestMasterReplication {
     }
   }
 
+  private void rollWALAndWait(final HBaseTestingUtility utility,
+      final TableName table, final byte[] row) throws IOException {
+    final HBaseAdmin admin = utility.getHBaseAdmin();
+    final MiniHBaseCluster cluster = utility.getMiniHBaseCluster();
+
+    // find the region that corresponds to the given row.
+    HRegion region = null;
+    for (HRegion candidate : cluster.getRegions(table)) {
+      if (HRegion.rowIsInRange(candidate.getRegionInfo(), row)) {
+        region = candidate;
+        break;
+      }
+    }
+    assertNotNull(
+        "Couldn't find the region for row '" + Arrays.toString(row) + "'",
+        region);
+
+    final CountDownLatch latch = new CountDownLatch(1);
+
+    // listen for successful log rolls
+    final WALActionsListener listener = new WALActionsListener() {
+
+      @Override
+      public void preLogRoll(Path oldPath, Path newPath) throws IOException {
+
+      }
+
+      @Override
+      public void postLogRoll(Path oldPath, Path newPath) throws IOException {
+        latch.countDown();
+      }
+
+      @Override
+      public void preLogArchive(Path oldPath, Path newPath) throws IOException {
+
+      }
+
+      @Override
+      public void postLogArchive(Path oldPath, Path newPath)
+          throws IOException {
+
+      }
+
+      @Override
+      public void logRollRequested(boolean tooFewReplicas) {
+
+      }
+
+      @Override
+      public void logCloseRequested() {
+
+      }
+
+      @Override
+      public void visitLogEntryBeforeWrite(HRegionInfo info, HLogKey logKey,
+          WALEdit logEdit) {
+
+      }
+
+      @Override
+      public void visitLogEntryBeforeWrite(HTableDescriptor htd, HLogKey logKey,
+          WALEdit logEdit) {
+
+      }
+
+    };
+    region.getLog().registerWALActionsListener(listener);
+
+    // request a roll
+    admin.rollHLogWriter(
+        cluster.getServerHoldingRegion(region.getRegionInfo().getRegionName())
+            .getServerName());
+
+    // wait
+    try {
+      latch.await();
+    } catch (InterruptedException exception) {
+      LOG.warn("Interrupted while waiting for the wal of '" + region
+          + "' to roll. If later "
+          + "replication tests fail, it's probably because we should still be waiting.");
+      Thread.currentThread().interrupt();
+    }
+    region.getLog().unregisterWALActionsListener(listener);
+  }
+
   /**
    * Use a coprocessor to count puts and deletes. as KVs would be replicated back with the same
    * timestamp there is otherwise no way to count them.