Posted to issues@hbase.apache.org by GitBox <gi...@apache.org> on 2021/02/18 09:53:38 UTC

[GitHub] [hbase] sandeepvinayak opened a new pull request #2960: HBASE-25582: Handle the running replication source gracefully with replication nodes deleted

sandeepvinayak opened a new pull request #2960:
URL: https://github.com/apache/hbase/pull/2960


   On HBase remove peer, the znodes for the peer are deleted and then the ZK listeners do the rest of the work:
   1. Terminate the replication source
   2. Close the queues and delete znodes for them.
   
   However, if the ZK listener somehow does not get invoked (possibly due to a ZK issue), all the region servers can crash with `KeeperException.NoNodeException` while deleting the znode for a queue. This could bring down every RS that participates in replication.
   There can also be a race condition between the replication source deleting a queue znode and the ZK listener terminating that replication source.
   
   With this patch, we no longer crash the region server on `NoNodeException`. Instead, we check whether the replication peer still exists, and if it does not, we terminate the source.
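A minimal, self-contained sketch of the approach described above. It does not use the real ZooKeeper or HBase APIs: `QueueStore` is a hypothetical in-memory stand-in for the queue znodes, `NoNodeException` and `SourceWithoutPeerException` are hypothetical local exceptions mirroring `KeeperException.NoNodeException` and `ReplicationSourceWithoutPeerException`, and `abortReason` merely records what would otherwise abort the region server.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical stand-in for KeeperException.NoNodeException.
class NoNodeException extends Exception {
  NoNodeException(String path) { super("NoNode for " + path); }
}

// Hypothetical stand-in for ReplicationSourceWithoutPeerException.
class SourceWithoutPeerException extends Exception {
  SourceWithoutPeerException(String msg, Throwable cause) { super(msg, cause); }
}

// Hypothetical in-memory model of the peer znodes and the WAL queue znodes.
class QueueStore {
  final Set<String> peers = new HashSet<>();
  final Set<String> logNodes = new HashSet<>();
  String abortReason; // recorded instead of actually aborting the region server

  void deleteNode(String path) throws NoNodeException {
    if (!logNodes.remove(path)) {
      throw new NoNodeException(path);
    }
  }

  // Mirrors the patched removeLog: a missing log znode only ends the source
  // when the peer itself is gone; otherwise the old abort behaviour is kept.
  void removeLog(String queueId, String filename) throws SourceWithoutPeerException {
    String path = queueId + "/" + filename;
    try {
      deleteNode(path);
    } catch (NoNodeException e) {
      if (!peers.contains(peerIdOf(queueId))) {
        throw new SourceWithoutPeerException("Peer for queue " + queueId + " was removed", e);
      }
      abortReason = "Failed to remove wal " + path;
    }
  }

  // Assumes peer ids never contain '-', so everything before the first '-' is the peer id.
  static String peerIdOf(String queueId) {
    int dash = queueId.indexOf('-');
    return dash < 0 ? queueId : queueId.substring(0, dash);
  }
}
```

The design choice is that only "node missing AND peer missing" is treated as benign; a missing log znode while the peer still exists still points at inconsistent state, so the sketch keeps the abort for that case.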


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [hbase] shahrs87 commented on a change in pull request #2960: HBASE-25583: Handle the running replication source gracefully with replication nodes deleted

Posted by GitBox <gi...@apache.org>.
shahrs87 commented on a change in pull request #2960:
URL: https://github.com/apache/hbase/pull/2960#discussion_r579384941



##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
##########
@@ -792,9 +795,13 @@ protected void shipEdits(WALEntryBatch entryBatch) {
     }
 
     private void updateLogPosition(long lastReadPosition) {
-      manager.logPositionAndCleanOldLogs(lastLoggedPath, peerClusterZnode, lastReadPosition,
-        this.replicationQueueInfo.isQueueRecovered(), false);
-      lastLoggedPosition = lastReadPosition;
+      try {
+        manager.logPositionAndCleanOldLogs(lastLoggedPath, peerClusterZnode, lastReadPosition,
+          this.replicationQueueInfo.isQueueRecovered(), false);
+        lastLoggedPosition = lastReadPosition;
+      } catch (ReplicationSourceWithoutPeerException re) {
+        source.terminate("Replication peer is removed and source should terminate", re);

Review comment:
       I am not an expert in this so please correct me if I missed anything.
   
   Whenever the ZK listener is invoked for a peersRemoved event, we do the following:
   https://github.com/apache/hbase/blob/branch-1/hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationTrackerZKImpl.java#L175
   https://github.com/apache/hbase/blob/branch-1/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSourceManager.java#L637-L640
   
   One of the steps in the ReplicationSourceManager#peerRemoved code path is to terminate the source, but in addition it does many other things, such as removingPeersFromHFileRefs and removing that source from the ReplicationSourceManager#sources and ReplicationSourceManager#oldSources data structures. If we invoke the source#terminate method directly, we will miss all of these cleanup steps.
   
   @apurtell While I was typing this comment, it looks like this patch got merged, but I think it needs more changes.
   Cc @sandeepvinayak  @wchevreuil 
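To illustrate the concern, here is a hypothetical, heavily simplified model of the manager (not the real `ReplicationSourceManager`): its `peerRemoved` terminates the sources and also drops all bookkeeping for the peer, whereas calling `terminate()` on a source directly only does the first part. The field names loosely echo `sources`, `oldSources` and the HFile refs mentioned above, but the classes and method bodies are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical source: terminate() only stops the source itself.
class SketchSource {
  final String peerId;
  boolean terminated;
  SketchSource(String peerId) { this.peerId = peerId; }
  void terminate(String reason) { terminated = true; }
}

// Hypothetical manager holding the bookkeeping that peerRemoved cleans up.
class SketchManager {
  final Map<String, SketchSource> sources = new HashMap<>();
  final List<SketchSource> oldSources = new ArrayList<>();
  final Set<String> hfileRefPeers = new HashSet<>();

  // Simplified analogue of the listener-driven path: terminate every source
  // for the peer AND remove every reference the manager keeps for it.
  void peerRemoved(String peerId) {
    for (Iterator<SketchSource> it = oldSources.iterator(); it.hasNext();) {
      SketchSource recovered = it.next();
      if (recovered.peerId.equals(peerId)) {
        recovered.terminate("Peer removed");
        it.remove();
      }
    }
    SketchSource live = sources.remove(peerId);
    if (live != null) {
      live.terminate("Peer removed");
    }
    hfileRefPeers.remove(peerId);
  }
}
```

In this model, a direct `terminate()` leaves the entries in `sources`, `oldSources` and `hfileRefPeers` behind, which is the gap being pointed out.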

##########
File path: hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
##########
@@ -132,17 +132,44 @@ public void addLog(String queueId, String filename) throws ReplicationException
   }
 
   @Override
-  public void removeLog(String queueId, String filename) {
+  public void removeLog(String queueId, String filename)
+    throws ReplicationSourceWithoutPeerException {
     try {
-      String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
-      znode = ZKUtil.joinZNode(znode, filename);
-      ZKUtil.deleteNode(this.zookeeper, znode);
-    } catch (KeeperException e) {
-      this.abortable.abort("Failed to remove wal from queue (queueId=" + queueId + ", filename="
-          + filename + ")", e);
+      try {
+        String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
+        znode = ZKUtil.joinZNode(znode, filename);
+        ZKUtil.deleteNode(this.zookeeper, znode);
+      } catch (KeeperException.NoNodeException e) {
+        // in case of no node exception we should not crash the region server
+        // but instead check if the replication peer has been removed.
+        // If so, we can throw here so that the source can terminate itself.
+        // This situation can occur when the replication peer znodes has been
+        // removed but the sources not terminated due to any miss from zk node delete watcher.
+        if (!doesPeerExist(queueId)) {
+          LOG.warn("Replication peer " + queueId + " has been removed", e);
+          throw new ReplicationSourceWithoutPeerException(
+            "Znodes for peer has been delete while a source is still active", e);
+        } else {
+          throw e;
+        }
+      }
+    } catch (KeeperException ke) {
+      this.abortable.abort(
+        "Failed to remove wal from queue (queueId=" + queueId + ", filename=" + filename + ")", ke);
     }
   }
 
+  private boolean doesPeerExist(String queueId) throws KeeperException {
+    String peerId = queueId;
+    if (peerId.contains("-")) {
+      // queueId will be in the form peerId + "-" + rsZNode.
+      // A peerId will not have "-" in its name, see HBASE-11394
+      peerId = queueId.split("-")[0];
+    }
+

Review comment:
       nit: extra line.
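For context on the `doesPeerExist` helper in the hunk above, here is a sketch of the queue-id parsing it relies on. It assumes, as the code comment states, that a normal queue id is just the peer id, that a recovered queue id is the peer id followed by `-` and the dead region server name(s), and that a peer id never contains `-` (HBASE-11394). Splitting at the first `-` is therefore enough even when a hostname contains hyphens. `QueueIds` and `peerIdOf` are hypothetical names.

```java
// Hypothetical helper equivalent to queueId.split("-")[0] in the patch.
final class QueueIds {
  private QueueIds() {}

  /** Returns the peer id portion of a replication queue id. */
  static String peerIdOf(String queueId) {
    int dash = queueId.indexOf('-');
    // No '-' means a normal (non-recovered) queue, which is named after the peer.
    return dash < 0 ? queueId : queueId.substring(0, dash);
  }
}
```

For example, `QueueIds.peerIdOf("2-rs-1.example.com,16020,1613640000000")` yields `"2"`, the same result as the `split("-")[0]` call in the patch.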







[GitHub] [hbase] sandeepvinayak commented on a change in pull request #2960: HBASE-25583: Handle the running replication source gracefully with replication nodes deleted

Posted by GitBox <gi...@apache.org>.
sandeepvinayak commented on a change in pull request #2960:
URL: https://github.com/apache/hbase/pull/2960#discussion_r579406108



##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
##########
@@ -792,9 +795,13 @@ protected void shipEdits(WALEntryBatch entryBatch) {
     }
 
     private void updateLogPosition(long lastReadPosition) {
-      manager.logPositionAndCleanOldLogs(lastLoggedPath, peerClusterZnode, lastReadPosition,
-        this.replicationQueueInfo.isQueueRecovered(), false);
-      lastLoggedPosition = lastReadPosition;
+      try {
+        manager.logPositionAndCleanOldLogs(lastLoggedPath, peerClusterZnode, lastReadPosition,
+          this.replicationQueueInfo.isQueueRecovered(), false);
+        lastLoggedPosition = lastReadPosition;
+      } catch (ReplicationSourceWithoutPeerException re) {
+        source.terminate("Replication peer is removed and source should terminate", re);

Review comment:
       Thanks @shahrs87, I am taking another look to see whether this can be improved.







[GitHub] [hbase] wchevreuil commented on a change in pull request #2960: HBASE-25583: Handle the running replication source gracefully with replication nodes deleted

Posted by GitBox <gi...@apache.org>.
wchevreuil commented on a change in pull request #2960:
URL: https://github.com/apache/hbase/pull/2960#discussion_r579070075



##########
File path: hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceWithoutReplicationZnodes.java
##########
@@ -0,0 +1,234 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import static org.apache.hadoop.hbase.replication.ReplicationSourceDummy.fakeExceptionMessage;
+
+import java.io.IOException;
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ClusterId;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationSourceDummyWithNoTermination;
+import org.apache.hadoop.hbase.replication.ReplicationSourceWithoutPeerException;
+import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
+import org.apache.hadoop.hbase.replication.regionserver.helper.DummyServer;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category(MediumTests.class)
+public class TestReplicationSourceWithoutReplicationZnodes {
+
+  private static final Log LOG =
+    LogFactory.getLog(TestReplicationSourceManager.class);
+
+  private static Configuration conf;
+
+  private static HBaseTestingUtility utility;
+
+  private static Replication replication;
+
+  private static ReplicationSourceManager manager;
+
+  private static ZooKeeperWatcher zkw;
+
+  private static HTableDescriptor htd;
+
+  private static HRegionInfo hri;
+
+  private static final byte[] r1 = Bytes.toBytes("r1");
+
+  private static final byte[] r2 = Bytes.toBytes("r2");
+
+  private static final byte[] f1 = Bytes.toBytes("f1");
+
+  private static final byte[] f2 = Bytes.toBytes("f2");
+
+  private static final TableName test =
+    TableName.valueOf("test");
+
+  private static final String slaveId = "1";
+
+  private static FileSystem fs;
+
+  private static Path oldLogDir;
+
+  private static Path logDir;
+
+  private static DummyServer server;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+
+    conf = HBaseConfiguration.create();
+    conf.set("replication.replicationsource.implementation",
+      ReplicationSourceDummyWithNoTermination.class.getCanonicalName());
+    conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY,
+      HConstants.REPLICATION_ENABLE_DEFAULT);
+    conf.setLong("replication.sleep.before.failover", 2000);
+    conf.setInt("replication.source.maxretriesmultiplier", 10);
+    utility = new HBaseTestingUtility(conf);
+    utility.startMiniZKCluster();
+
+    zkw = new ZooKeeperWatcher(conf, "test", null);
+    ZKUtil.createWithParents(zkw, "/hbase/replication");
+    ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1");
+    ZKUtil.setData(zkw, "/hbase/replication/peers/1",
+      Bytes.toBytes(conf.get(HConstants.ZOOKEEPER_QUORUM) + ":"
+        + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":/1"));
+    ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state");
+    ZKUtil.setData(zkw, "/hbase/replication/peers/1/peer-state",
+      ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
+    ZKUtil.createWithParents(zkw, "/hbase/replication/state");
+    ZKUtil.setData(zkw, "/hbase/replication/state", ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
+
+    ZKClusterId.setClusterId(zkw, new ClusterId());
+    FSUtils.setRootDir(utility.getConfiguration(), utility.getDataTestDir());
+    fs = FileSystem.get(conf);
+    oldLogDir = new Path(utility.getDataTestDir(),
+      HConstants.HREGION_OLDLOGDIR_NAME);
+    logDir = new Path(utility.getDataTestDir(),
+      HConstants.HREGION_LOGDIR_NAME);
+    server = new DummyServer(conf, "example.hostname.com", zkw);
+    replication = new Replication(server, fs, logDir, oldLogDir);
+    manager = replication.getReplicationManager();
+
+    manager.addSource(slaveId);
+
+    htd = new HTableDescriptor(test);
+    HColumnDescriptor col = new HColumnDescriptor(f1);
+    col.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
+    htd.addFamily(col);
+    col = new HColumnDescriptor(f2);
+    col.setScope(HConstants.REPLICATION_SCOPE_LOCAL);
+    htd.addFamily(col);
+
+    hri = new HRegionInfo(htd.getTableName(), r1, r2);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    try {
+      manager.join();
+    } catch (RuntimeException re) {
+      if (re.getMessage().equals(fakeExceptionMessage)) {
+        LOG.info("It is fine");
+      }
+    }
+    utility.shutdownMiniCluster();
+  }
+
+  @Rule
+  public TestName testName = new TestName();
+
+  private void cleanLogDir() throws IOException {
+    fs.delete(logDir, true);
+    fs.delete(oldLogDir, true);
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    LOG.info("Start " + testName.getMethodName());
+    cleanLogDir();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    LOG.info("End " + testName.getMethodName());
+    cleanLogDir();
+  }
+
+  /**
+   * When the peer is removed, hbase remove the peer znodes and there is zk watcher
+   * which terminates the replication sources. In case of zk watcher not getting invoked
+   * or a race condition between source deleting the log znode and zk watcher
+   * terminating the source, we might get the NoNode exception. In that case, the right
+   * thing is to terminate the replication source.

Review comment:
       OK, so the problem can happen because `ReplicationSourceManager.removePeer` is only invoked after the peer node has already been deleted in ZK, whilst the `SourceShipperThread` might be trying to update the znode in between. Thanks for explaining it further, @sandeepvinayak!
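The interleaving described above can be reproduced deterministically with latches. In this hypothetical sketch (plain `java.util.concurrent`, no ZooKeeper), the main thread deletes the peer znode and, as a simplification, the queue znode is removed by some cleanup path other than the source itself; a second thread plays the shipper and only runs after that, while the listener that would terminate the source has not fired yet. It is an illustrative model, not HBase code.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;

// Hypothetical model of the window between peer deletion and source termination.
class RemovePeerRace {
  final Set<String> znodes = ConcurrentHashMap.newKeySet();
  volatile String shipperOutcome;

  String run() throws InterruptedException {
    znodes.add("/peers/1");
    znodes.add("/rs/queues/1/wal.1");
    CountDownLatch peerDeleted = new CountDownLatch(1);

    Thread shipper = new Thread(() -> {
      try {
        peerDeleted.await();
        // The still-running source tries to clean up its log znode.
        boolean removed = znodes.remove("/rs/queues/1/wal.1");
        shipperOutcome = removed ? "deleted" : "NoNode";
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    });
    shipper.start();

    // Peer znode deleted; in this model the queue znode is also already gone.
    znodes.remove("/peers/1");
    znodes.remove("/rs/queues/1/wal.1");
    peerDeleted.countDown();
    // The listener that would terminate the source has not run yet.
    shipper.join();
    return shipperOutcome;
  }
}
```

The shipper deterministically observes the missing znode, which is the situation the patch turns from a region server abort into a source termination.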







[GitHub] [hbase] Apache-HBase commented on pull request #2960: HBASE-25583: Handle the running replication source gracefully with replication nodes deleted

Posted by GitBox <gi...@apache.org>.
Apache-HBase commented on pull request #2960:
URL: https://github.com/apache/hbase/pull/2960#issuecomment-781771972


   :broken_heart: **-1 overall**
   
   
   
   
   
   
   | Vote | Subsystem | Runtime | Comment |
   |:----:|----------:|--------:|:--------|
   | +0 :ok: |  reexec  |   1m 42s |  Docker mode activated.  |
   ||| _ Prechecks _ |
   | +1 :green_heart: |  dupname  |   0m  0s |  No case conflicting files found.  |
   | +1 :green_heart: |  hbaseanti  |   0m  0s |  Patch does not have any anti-patterns.  |
   | +1 :green_heart: |  @author  |   0m  0s |  The patch does not contain any @author tags.  |
   | +1 :green_heart: |  test4tests  |   0m  0s |  The patch appears to include 8 new or modified test files.  |
   ||| _ branch-1 Compile Tests _ |
   | +0 :ok: |  mvndep  |   2m 33s |  Maven dependency ordering for branch  |
   | +1 :green_heart: |  mvninstall  |   8m 25s |  branch-1 passed  |
   | +1 :green_heart: |  compile  |   1m  6s |  branch-1 passed with JDK Azul Systems, Inc.-1.8.0_262-b19  |
   | +1 :green_heart: |  compile  |   1m 15s |  branch-1 passed with JDK Azul Systems, Inc.-1.7.0_272-b10  |
   | +1 :green_heart: |  checkstyle  |   2m 31s |  branch-1 passed  |
   | +1 :green_heart: |  shadedjars  |   3m 23s |  branch has no errors when building our shaded downstream artifacts.  |
   | +1 :green_heart: |  javadoc  |   1m  2s |  branch-1 passed with JDK Azul Systems, Inc.-1.8.0_262-b19  |
   | +1 :green_heart: |  javadoc  |   1m 15s |  branch-1 passed with JDK Azul Systems, Inc.-1.7.0_272-b10  |
   | +0 :ok: |  spotbugs  |   2m 54s |  Used deprecated FindBugs config; considering switching to SpotBugs.  |
   | +1 :green_heart: |  findbugs  |   4m 40s |  branch-1 passed  |
   ||| _ Patch Compile Tests _ |
   | +0 :ok: |  mvndep  |   0m 18s |  Maven dependency ordering for patch  |
   | +1 :green_heart: |  mvninstall  |   2m 12s |  the patch passed  |
   | +1 :green_heart: |  compile  |   1m  4s |  the patch passed with JDK Azul Systems, Inc.-1.8.0_262-b19  |
   | +1 :green_heart: |  javac  |   1m  4s |  the patch passed  |
   | +1 :green_heart: |  compile  |   1m 12s |  the patch passed with JDK Azul Systems, Inc.-1.7.0_272-b10  |
   | +1 :green_heart: |  javac  |   1m 12s |  the patch passed  |
   | +1 :green_heart: |  checkstyle  |   0m 38s |  hbase-client: The patch generated 0 new + 3 unchanged - 4 fixed = 3 total (was 7)  |
   | -1 :x: |  checkstyle  |   1m 42s |  hbase-server: The patch generated 2 new + 0 unchanged - 24 fixed = 2 total (was 24)  |
   | -1 :x: |  whitespace  |   0m  0s |  The patch has 1 line(s) that end in whitespace. Use git apply --whitespace=fix <<patch_file>>. Refer https://git-scm.com/docs/git-apply  |
   | +1 :green_heart: |  shadedjars  |   3m 13s |  patch has no errors when building our shaded downstream artifacts.  |
   | +1 :green_heart: |  hadoopcheck  |   5m  5s |  Patch does not cause any errors with Hadoop 2.8.5 2.9.2.  |
   | +1 :green_heart: |  javadoc  |   0m 54s |  the patch passed with JDK Azul Systems, Inc.-1.8.0_262-b19  |
   | +1 :green_heart: |  javadoc  |   1m 12s |  the patch passed with JDK Azul Systems, Inc.-1.7.0_272-b10  |
   | +1 :green_heart: |  findbugs  |   4m 49s |  the patch passed  |
   ||| _ Other Tests _ |
   | +1 :green_heart: |  unit  |   2m 47s |  hbase-client in the patch passed.  |
   | -1 :x: |  unit  | 147m 11s |  hbase-server in the patch failed.  |
   | +1 :green_heart: |  asflicense  |   1m 39s |  The patch does not generate ASF License warnings.  |
   |  |   | 206m 10s |   |
   
   
   | Reason | Tests |
   |-------:|:------|
   | Failed junit tests | hadoop.hbase.master.cleaner.TestSnapshotFromMaster |
   
   
   | Subsystem | Report/Notes |
   |----------:|:-------------|
   | Docker | ClientAPI=1.41 ServerAPI=1.41 base: https://ci-hadoop.apache.org/job/HBase/job/HBase-PreCommit-GitHub-PR/job/PR-2960/3/artifact/out/Dockerfile |
   | GITHUB PR | https://github.com/apache/hbase/pull/2960 |
   | JIRA Issue | HBASE-25583 |
   | Optional Tests | dupname asflicense javac javadoc unit spotbugs findbugs shadedjars hadoopcheck hbaseanti checkstyle compile |
   | uname | Linux 042785bb7565 4.15.0-101-generic #102-Ubuntu SMP Mon May 11 10:07:26 UTC 2020 x86_64 x86_64 x86_64 GNU/Linux |
   | Build tool | maven |
   | Personality | /home/jenkins/jenkins-home/workspace/Base-PreCommit-GitHub-PR_PR-2960/out/precommit/personality/provided.sh |
   | git revision | branch-1 / 2d26c94 |
   | Default Java | Azul Systems, Inc.-1.7.0_272-b10 |
   | Multi-JDK versions | /usr/lib/jvm/zulu-8-amd64:Azul Systems, Inc.-1.8.0_262-b19 /usr/lib/jvm/zulu-7-amd64:Azul Systems, Inc.-1.7.0_272-b10 |
   | checkstyle | https://ci-hadoop.apache.org/job/HBase/job/HBase-PreCommit-GitHub-PR/job/PR-2960/3/artifact/out/diff-checkstyle-hbase-server.txt |
   | whitespace | https://ci-hadoop.apache.org/job/HBase/job/HBase-PreCommit-GitHub-PR/job/PR-2960/3/artifact/out/whitespace-eol.txt |
   | unit | https://ci-hadoop.apache.org/job/HBase/job/HBase-PreCommit-GitHub-PR/job/PR-2960/3/artifact/out/patch-unit-hbase-server.txt |
   |  Test Results | https://ci-hadoop.apache.org/job/HBase/job/HBase-PreCommit-GitHub-PR/job/PR-2960/3/testReport/ |
   | Max. process+thread count | 3746 (vs. ulimit of 10000) |
   | modules | C: hbase-client hbase-server U: . |
   | Console output | https://ci-hadoop.apache.org/job/HBase/job/HBase-PreCommit-GitHub-PR/job/PR-2960/3/console |
   | versions | git=1.9.1 maven=3.0.5 findbugs=3.0.1 |
   | Powered by | Apache Yetus 0.12.0 https://yetus.apache.org |
   
   
   This message was automatically generated.
   
   





[GitHub] [hbase] sandeepvinayak commented on a change in pull request #2960: HBASE-25583: Handle the running replication source gracefully with replication nodes deleted

Posted by GitBox <gi...@apache.org>.
sandeepvinayak commented on a change in pull request #2960:
URL: https://github.com/apache/hbase/pull/2960#discussion_r578674958



##########
File path: hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceWithoutReplicationZnodes.java
##########
@@ -0,0 +1,234 @@
+@Category(MediumTests.class)
+public class TestReplicationSourceWithoutReplicationZnodes {
+
+  private static final Log LOG =
+    LogFactory.getLog(TestReplicationSourceManager.class);
+
+  private static Configuration conf;
+
+  private static HBaseTestingUtility utility;
+
+  private static Replication replication;
+
+  private static ReplicationSourceManager manager;
+
+  private static ZooKeeperWatcher zkw;
+
+  private static HTableDescriptor htd;
+
+  private static HRegionInfo hri;
+
+  private static final byte[] r1 = Bytes.toBytes("r1");
+
+  private static final byte[] r2 = Bytes.toBytes("r2");
+
+  private static final byte[] f1 = Bytes.toBytes("f1");
+
+  private static final byte[] f2 = Bytes.toBytes("f2");
+
+  private static final TableName test =
+    TableName.valueOf("test");
+
+  private static final String slaveId = "1";
+
+  private static FileSystem fs;
+
+  private static Path oldLogDir;
+
+  private static Path logDir;
+
+  private static DummyServer server;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+
+    conf = HBaseConfiguration.create();
+    conf.set("replication.replicationsource.implementation",

Review comment:
       @shahrs87 Wouldn't that mean running some tests twice? Extending the class would also execute all the tests in the base class. The two share a lot, but `TestReplicationSourceManager` also contains a lot of other things that we don't need.
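The concern can be shown without JUnit itself: JUnit 4 discovers `@Test` methods across the class hierarchy, so a test class that extends another test class re-runs the inherited tests. The sketch below uses a hypothetical `@Check` annotation and a tiny reflective runner as a stand-in for JUnit (an assumption for illustration, with made-up test names), and contrasts extending a test class with the usual alternative: an abstract base class that holds only shared setup and declares no tests.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for org.junit.Test.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Check {}

// Minimal runner: getMethods() includes inherited public methods, so
// annotated methods from superclasses are discovered too.
final class MiniRunner {
  static List<String> discover(Class<?> testClass) {
    List<String> found = new ArrayList<>();
    for (Method m : testClass.getMethods()) {
      if (m.isAnnotationPresent(Check.class)) {
        found.add(m.getDeclaringClass().getSimpleName() + "." + m.getName());
      }
    }
    return found;
  }
}

// Option A: extend an existing test class (its tests come along).
class ManagerTests {
  @Check public void baseTestA() {}
  @Check public void baseTestB() {}
}

class ZnodeTestsExtendingManager extends ManagerTests {
  @Check public void testNoNodeTerminatesSource() {}
}

// Option B: share only setup through an abstract base with no tests.
abstract class ReplicationTestBase {
  protected void setUpSharedCluster() { /* shared mini-cluster setup would go here */ }
}

class ZnodeTestsWithBase extends ReplicationTestBase {
  @Check public void testNoNodeTerminatesSource() {}
}
```

Here `discover(ZnodeTestsExtendingManager.class)` finds three tests (two inherited), while `discover(ZnodeTestsWithBase.class)` finds only one.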







[GitHub] [hbase] sandeepvinayak commented on a change in pull request #2960: HBASE-25583: Handle the running replication source gracefully with replication nodes deleted

Posted by GitBox <gi...@apache.org>.
sandeepvinayak commented on a change in pull request #2960:
URL: https://github.com/apache/hbase/pull/2960#discussion_r579443102



##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
##########
@@ -792,9 +795,13 @@ protected void shipEdits(WALEntryBatch entryBatch) {
     }
 
     private void updateLogPosition(long lastReadPosition) {
-      manager.logPositionAndCleanOldLogs(lastLoggedPath, peerClusterZnode, lastReadPosition,
-        this.replicationQueueInfo.isQueueRecovered(), false);
-      lastLoggedPosition = lastReadPosition;
+      try {
+        manager.logPositionAndCleanOldLogs(lastLoggedPath, peerClusterZnode, lastReadPosition,
+          this.replicationQueueInfo.isQueueRecovered(), false);
+        lastLoggedPosition = lastReadPosition;
+      } catch (ReplicationSourceWithoutPeerException re) {
+        source.terminate("Replication peer is removed and source should terminate", re);

Review comment:
       @shahrs87 To answer your concern, the HFile refs get cleaned up as part of the replication ZK node cleaner: https://github.com/apache/hbase/blob/branch-1/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleaner.java#L117
   
   However, the source manager will still hold a reference to the terminated source. That seems harmless, but it should still be removed since that is the right thing to do; I can create a PR for this change.
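One possible shape for that follow-up, sketched under assumptions: once a source has terminated itself, the manager drops it using `ConcurrentMap.remove(key, value)`, which only removes the mapping if it still points at that exact source instance. This avoids accidentally dropping a newer source if the same peer id has been re-added in the meantime. `SourceRegistry`, `TrackedSource` and `onSourceTerminated` are hypothetical names, and keying sources by peer id is an assumption of this sketch, not a description of the current HBase data structure.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical source; relies on identity equality (no equals override).
class TrackedSource {
  final String peerId;
  TrackedSource(String peerId) { this.peerId = peerId; }
}

// Hypothetical registry of live sources keyed by peer id.
class SourceRegistry {
  final ConcurrentMap<String, TrackedSource> sources = new ConcurrentHashMap<>();

  void add(TrackedSource s) { sources.put(s.peerId, s); }

  // Called after a source terminated itself because its peer is gone.
  // remove(key, value) is a no-op if the id now maps to a different instance.
  boolean onSourceTerminated(TrackedSource s) {
    return sources.remove(s.peerId, s);
  }
}
```

For example, if peer `1` is re-added and a fresh source registered before the stale one reports its termination, `onSourceTerminated(stale)` returns `false` and the fresh source stays registered.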







[GitHub] [hbase] Apache-HBase commented on pull request #2960: HBASE-25583: Handle the running replication source gracefully with replication nodes deleted

Posted by GitBox <gi...@apache.org>.
Apache-HBase commented on pull request #2960:
URL: https://github.com/apache/hbase/pull/2960#issuecomment-781919852


   :broken_heart: **-1 overall**
   
   
   
   
   
   
   | Vote | Subsystem | Runtime | Comment |
   |:----:|----------:|--------:|:--------|
   | +0 :ok: |  reexec  |   1m 32s |  Docker mode activated.  |
   ||| _ Prechecks _ |
   | +1 :green_heart: |  dupname  |   0m  0s |  No case conflicting files found.  |
   | +1 :green_heart: |  hbaseanti  |   0m  0s |  Patch does not have any anti-patterns.  |
   | +1 :green_heart: |  @author  |   0m  0s |  The patch does not contain any @author tags.  |
   | +1 :green_heart: |  test4tests  |   0m  0s |  The patch appears to include 8 new or modified test files.  |
   ||| _ branch-1 Compile Tests _ |
   | +0 :ok: |  mvndep  |   2m 32s |  Maven dependency ordering for branch  |
   | +1 :green_heart: |  mvninstall  |   8m 15s |  branch-1 passed  |
   | +1 :green_heart: |  compile  |   1m  4s |  branch-1 passed with JDK Azul Systems, Inc.-1.8.0_262-b19  |
   | +1 :green_heart: |  compile  |   1m 13s |  branch-1 passed with JDK Azul Systems, Inc.-1.7.0_272-b10  |
   | +1 :green_heart: |  checkstyle  |   2m 33s |  branch-1 passed  |
   | +1 :green_heart: |  shadedjars  |   3m 27s |  branch has no errors when building our shaded downstream artifacts.  |
   | +1 :green_heart: |  javadoc  |   1m  1s |  branch-1 passed with JDK Azul Systems, Inc.-1.8.0_262-b19  |
   | +1 :green_heart: |  javadoc  |   1m 14s |  branch-1 passed with JDK Azul Systems, Inc.-1.7.0_272-b10  |
   | +0 :ok: |  spotbugs  |   2m 54s |  Used deprecated FindBugs config; considering switching to SpotBugs.  |
   | +1 :green_heart: |  findbugs  |   4m 43s |  branch-1 passed  |
   ||| _ Patch Compile Tests _ |
   | +0 :ok: |  mvndep  |   0m 17s |  Maven dependency ordering for patch  |
   | +1 :green_heart: |  mvninstall  |   2m 12s |  the patch passed  |
   | +1 :green_heart: |  compile  |   1m  7s |  the patch passed with JDK Azul Systems, Inc.-1.8.0_262-b19  |
   | +1 :green_heart: |  javac  |   1m  7s |  the patch passed  |
   | +1 :green_heart: |  compile  |   1m 12s |  the patch passed with JDK Azul Systems, Inc.-1.7.0_272-b10  |
   | +1 :green_heart: |  javac  |   1m 12s |  the patch passed  |
   | +1 :green_heart: |  checkstyle  |   0m 40s |  hbase-client: The patch generated 0 new + 3 unchanged - 4 fixed = 3 total (was 7)  |
   | +1 :green_heart: |  checkstyle  |   1m 48s |  hbase-server: The patch generated 0 new + 0 unchanged - 24 fixed = 0 total (was 24)  |
   | +1 :green_heart: |  whitespace  |   0m  0s |  The patch has no whitespace issues.  |
   | +1 :green_heart: |  shadedjars  |   3m  9s |  patch has no errors when building our shaded downstream artifacts.  |
   | +1 :green_heart: |  hadoopcheck  |   5m 20s |  Patch does not cause any errors with Hadoop 2.8.5 2.9.2.  |
   | +1 :green_heart: |  javadoc  |   0m 51s |  the patch passed with JDK Azul Systems, Inc.-1.8.0_262-b19  |
   | +1 :green_heart: |  javadoc  |   1m 16s |  the patch passed with JDK Azul Systems, Inc.-1.7.0_272-b10  |
   | +1 :green_heart: |  findbugs  |   4m 48s |  the patch passed  |
   ||| _ Other Tests _ |
   | +1 :green_heart: |  unit  |   2m 39s |  hbase-client in the patch passed.  |
   | -1 :x: |  unit  | 128m 19s |  hbase-server in the patch failed.  |
   | +1 :green_heart: |  asflicense  |   0m 42s |  The patch does not generate ASF License warnings.  |
   |  |   | 186m 15s |   |
   
   
   | Reason | Tests |
   |-------:|:------|
   | Failed junit tests | hadoop.hbase.security.visibility.TestVisibilityLabelsWithACL |
   
   
   | Subsystem | Report/Notes |
   |----------:|:-------------|
   | Docker | ClientAPI=1.41 ServerAPI=1.41 base: https://ci-hadoop.apache.org/job/HBase/job/HBase-PreCommit-GitHub-PR/job/PR-2960/5/artifact/out/Dockerfile |
   | GITHUB PR | https://github.com/apache/hbase/pull/2960 |
   | JIRA Issue | HBASE-25583 |
   | Optional Tests | dupname asflicense javac javadoc unit spotbugs findbugs shadedjars hadoopcheck hbaseanti checkstyle compile |
   | uname | Linux 7fa6ffe2307c 4.15.0-101-generic #102-Ubuntu SMP Mon May 11 10:07:26 UTC 2020 x86_64 x86_64 x86_64 GNU/Linux |
   | Build tool | maven |
   | Personality | /home/jenkins/jenkins-home/workspace/Base-PreCommit-GitHub-PR_PR-2960/out/precommit/personality/provided.sh |
   | git revision | branch-1 / 2d26c94 |
   | Default Java | Azul Systems, Inc.-1.7.0_272-b10 |
   | Multi-JDK versions | /usr/lib/jvm/zulu-8-amd64:Azul Systems, Inc.-1.8.0_262-b19 /usr/lib/jvm/zulu-7-amd64:Azul Systems, Inc.-1.7.0_272-b10 |
   | unit | https://ci-hadoop.apache.org/job/HBase/job/HBase-PreCommit-GitHub-PR/job/PR-2960/5/artifact/out/patch-unit-hbase-server.txt |
   |  Test Results | https://ci-hadoop.apache.org/job/HBase/job/HBase-PreCommit-GitHub-PR/job/PR-2960/5/testReport/ |
   | Max. process+thread count | 3719 (vs. ulimit of 10000) |
   | modules | C: hbase-client hbase-server U: . |
   | Console output | https://ci-hadoop.apache.org/job/HBase/job/HBase-PreCommit-GitHub-PR/job/PR-2960/5/console |
   | versions | git=1.9.1 maven=3.0.5 findbugs=3.0.1 |
   | Powered by | Apache Yetus 0.12.0 https://yetus.apache.org |
   
   
   This message was automatically generated.
   
   





[GitHub] [hbase] wchevreuil commented on a change in pull request #2960: HBASE-25583: Handle the running replication source gracefully with replication nodes deleted

Posted by GitBox <gi...@apache.org>.
wchevreuil commented on a change in pull request #2960:
URL: https://github.com/apache/hbase/pull/2960#discussion_r579074929



##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
##########
@@ -460,7 +463,7 @@ public long getLastLoggedPosition() {
     return 0;
   }
 
-  private boolean isSourceActive() {
+  public boolean isSourceActive() {

Review comment:
       nit: if it's for test only, make it package private instead?
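The reviewer's suggestion relies on Java's default (package-private) access. A member with no access modifier is visible to every class in the same package. That includes a test class that lives in the same package under `src/test/java`, but the member does not become part of the public API.

The sketch below is a minimal illustration that uses hypothetical class names (`SourceSketch`, `PackagePrivateDemo`). It is not the actual `ReplicationSource` code.

```java
// Demo entry point; compiled in the same (default) package as SourceSketch.
class PackagePrivateDemo {
  public static void main(String[] args) {
    SourceSketch source = new SourceSketch();
    System.out.println(source.isSourceActive()); // true
    source.terminate();
    System.out.println(source.isSourceActive()); // false
  }
}

// Hypothetical stand-in for a class like ReplicationSource; names are illustrative only.
class SourceSketch {
  private volatile boolean running = true;

  // No access modifier: package-private. Same-package tests can call it,
  // but callers outside the package cannot.
  boolean isSourceActive() {
    return running;
  }

  void terminate() {
    running = false;
  }
}
```

Projects that depend on Guava often also annotate such members with `@VisibleForTesting`. That annotation only documents intent and does not change access.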







[GitHub] [hbase] sandeepvinayak commented on a change in pull request #2960: HBASE-25583: Handle the running replication source gracefully with replication nodes deleted

Posted by GitBox <gi...@apache.org>.
sandeepvinayak commented on a change in pull request #2960:
URL: https://github.com/apache/hbase/pull/2960#discussion_r579034850



##########
File path: hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceWithoutReplicationZnodes.java
##########
@@ -0,0 +1,234 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import static org.apache.hadoop.hbase.replication.ReplicationSourceDummy.fakeExceptionMessage;
+
+import java.io.IOException;
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ClusterId;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationSourceDummyWithNoTermination;
+import org.apache.hadoop.hbase.replication.ReplicationSourceWithoutPeerException;
+import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
+import org.apache.hadoop.hbase.replication.regionserver.helper.DummyServer;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category(MediumTests.class)
+public class TestReplicationSourceWithoutReplicationZnodes {
+
+  private static final Log LOG =
+    LogFactory.getLog(TestReplicationSourceWithoutReplicationZnodes.class);
+
+  private static Configuration conf;
+
+  private static HBaseTestingUtility utility;
+
+  private static Replication replication;
+
+  private static ReplicationSourceManager manager;
+
+  private static ZooKeeperWatcher zkw;
+
+  private static HTableDescriptor htd;
+
+  private static HRegionInfo hri;
+
+  private static final byte[] r1 = Bytes.toBytes("r1");
+
+  private static final byte[] r2 = Bytes.toBytes("r2");
+
+  private static final byte[] f1 = Bytes.toBytes("f1");
+
+  private static final byte[] f2 = Bytes.toBytes("f2");
+
+  private static final TableName test =
+    TableName.valueOf("test");
+
+  private static final String slaveId = "1";
+
+  private static FileSystem fs;
+
+  private static Path oldLogDir;
+
+  private static Path logDir;
+
+  private static DummyServer server;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+
+    conf = HBaseConfiguration.create();
+    conf.set("replication.replicationsource.implementation",
+      ReplicationSourceDummyWithNoTermination.class.getCanonicalName());
+    conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY,
+      HConstants.REPLICATION_ENABLE_DEFAULT);
+    conf.setLong("replication.sleep.before.failover", 2000);
+    conf.setInt("replication.source.maxretriesmultiplier", 10);
+    utility = new HBaseTestingUtility(conf);
+    utility.startMiniZKCluster();
+
+    zkw = new ZooKeeperWatcher(conf, "test", null);
+    ZKUtil.createWithParents(zkw, "/hbase/replication");
+    ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1");
+    ZKUtil.setData(zkw, "/hbase/replication/peers/1",
+      Bytes.toBytes(conf.get(HConstants.ZOOKEEPER_QUORUM) + ":"
+        + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":/1"));
+    ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state");
+    ZKUtil.setData(zkw, "/hbase/replication/peers/1/peer-state",
+      ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
+    ZKUtil.createWithParents(zkw, "/hbase/replication/state");
+    ZKUtil.setData(zkw, "/hbase/replication/state", ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
+
+    ZKClusterId.setClusterId(zkw, new ClusterId());
+    FSUtils.setRootDir(utility.getConfiguration(), utility.getDataTestDir());
+    fs = FileSystem.get(conf);
+    oldLogDir = new Path(utility.getDataTestDir(),
+      HConstants.HREGION_OLDLOGDIR_NAME);
+    logDir = new Path(utility.getDataTestDir(),
+      HConstants.HREGION_LOGDIR_NAME);
+    server = new DummyServer(conf, "example.hostname.com", zkw);
+    replication = new Replication(server, fs, logDir, oldLogDir);
+    manager = replication.getReplicationManager();
+
+    manager.addSource(slaveId);
+
+    htd = new HTableDescriptor(test);
+    HColumnDescriptor col = new HColumnDescriptor(f1);
+    col.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
+    htd.addFamily(col);
+    col = new HColumnDescriptor(f2);
+    col.setScope(HConstants.REPLICATION_SCOPE_LOCAL);
+    htd.addFamily(col);
+
+    hri = new HRegionInfo(htd.getTableName(), r1, r2);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    try {
+      manager.join();
+    } catch (RuntimeException re) {
+      if (re.getMessage().equals(fakeExceptionMessage)) {
+        LOG.info("It is fine");
+      }
+    }
+    utility.shutdownMiniCluster();
+  }
+
+  @Rule
+  public TestName testName = new TestName();
+
+  private void cleanLogDir() throws IOException {
+    fs.delete(logDir, true);
+    fs.delete(oldLogDir, true);
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    LOG.info("Start " + testName.getMethodName());
+    cleanLogDir();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    LOG.info("End " + testName.getMethodName());
+    cleanLogDir();
+  }
+
+  /**
+   * When a peer is removed, HBase deletes the peer znodes, and a ZK watcher then
+   * terminates the replication sources for that peer. If the ZK watcher is never invoked,
+   * or if the source deletes its log znode while racing with the watcher that terminates it,
+   * the source may get a KeeperException.NoNodeException. In that case, the right thing
+   * to do is to terminate the replication source.
+   * @throws Exception
+   */
+  @Test
+  public void testReplicationSourceRunningWithoutPeerZnodes() throws Exception {
+    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
+    KeyValue kv = new KeyValue(r1, f1, r1);
+    WALEdit edit = new WALEdit();
+    edit.add(kv);
+
+    List<WALActionsListener> listeners = new ArrayList<>();
+    listeners.add(replication);
+    final WALFactory wals = new WALFactory(utility.getConfiguration(), listeners,
+      URLEncoder.encode("regionserver:60020", "UTF8"));
+    final WAL wal = wals.getWAL(hri.getEncodedNameAsBytes(), hri.getTable().getNamespace());
+    manager.init();
+
+    final long txid = wal.append(htd, hri,
+      new WALKey(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc),
+      edit, true);
+    wal.sync(txid);
+
+
+    wal.rollWriter();
+    ZKUtil.deleteNodeRecursively(zkw, "/hbase/replication/peers/1");
+    ZKUtil.deleteNodeRecursively(zkw, "/hbase/replication/rs/"+ server.getServerName() + "/1");
+
+    ReplicationException exceptionThrown = null;
+    try {
+      manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(), "1", 0, false,
+        false);
+    } catch (ReplicationException e) {
+      exceptionThrown = e;
+
+    }
+
+    Assert.assertTrue(exceptionThrown instanceof ReplicationSourceWithoutPeerException);

Review comment:
       @shahrs87 thanks for your feedback, I have added the test for source termination on replication exception for no peer.
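The test above identifies the new failure mode by exception type. It catches a `ReplicationException` and asserts that the caught object is a `ReplicationSourceWithoutPeerException`, which implies the latter is a subclass. One detail of that assertion is worth noting: `instanceof` evaluates to `false` for `null`. The `assertTrue` therefore also fails when no exception is thrown at all.

The sketch below models the pattern with hypothetical stand-in classes and a hypothetical `logPosition` method that throws the subclass once the peer is gone. None of these are the HBase classes.

```java
import java.util.HashSet;
import java.util.Set;

class ExceptionTypeDemo {
  // Hypothetical in-memory set of existing peer ids.
  static final Set<String> peers = new HashSet<>();

  // Hypothetical method: fails with the more specific subclass when the peer was removed.
  static void logPosition(String peerId) throws ReplicationExceptionSketch {
    if (!peers.contains(peerId)) {
      throw new SourceWithoutPeerExceptionSketch("Peer " + peerId + " was removed");
    }
  }

  // Mirrors the test: catch the general type and return it (null if nothing was thrown).
  static ReplicationExceptionSketch capture(String peerId) {
    try {
      logPosition(peerId);
      return null;
    } catch (ReplicationExceptionSketch e) {
      return e;
    }
  }

  public static void main(String[] args) {
    peers.add("1");
    System.out.println(capture("1") instanceof SourceWithoutPeerExceptionSketch); // false
    peers.remove("1");
    System.out.println(capture("1") instanceof SourceWithoutPeerExceptionSketch); // true
  }
}

// Hypothetical stand-in for the general replication exception.
class ReplicationExceptionSketch extends Exception {
  ReplicationExceptionSketch(String msg) {
    super(msg);
  }
}

// The subclass lets callers that catch the general type recognize the "peer removed" case.
class SourceWithoutPeerExceptionSketch extends ReplicationExceptionSketch {
  SourceWithoutPeerExceptionSketch(String msg) {
    super(msg);
  }
}
```

JUnit 4.13 adds `Assert.assertThrows`, which expresses this check more directly. However, it is most natural with lambdas, and the Yetus runs above show branch-1 still compiling with JDK 7. The explicit try/catch form is therefore the safer choice on that branch.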







[GitHub] [hbase] sandeepvinayak commented on a change in pull request #2960: HBASE-25582: Handle the running replication source gracefully with replication nodes deleted

Posted by GitBox <gi...@apache.org>.
sandeepvinayak commented on a change in pull request #2960:
URL: https://github.com/apache/hbase/pull/2960#discussion_r578617694



##########
File path: hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceWithoutReplicationZnodes.java
##########
+    Assert.assertTrue(exceptionThrown instanceof ReplicationSourceWithoutPeerException);

Review comment:
       @shahrs87 I think in that case, the RS will abort. 
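According to the PR description, a `NoNodeException` is treated as benign only when the replication peer itself no longer exists, in which case just the source is terminated. This sketch assumes that "that case" in the comment above means the queue znode is missing while the peer still exists. Under that assumption, the region server keeps the old behavior and aborts.

The decision is modeled below with a hypothetical enum and an in-memory peer set. It is an illustration of that logic, not the patch's code.

```java
import java.util.HashSet;
import java.util.Set;

class NoNodeHandlingDemo {
  // Hypothetical outcomes of handling a missing replication queue znode.
  enum Action { TERMINATE_SOURCE, ABORT_REGION_SERVER }

  // Hypothetical in-memory view of the peers registered in ZooKeeper.
  static final Set<String> registeredPeers = new HashSet<>();

  // Peer gone: the missing znode is an expected side effect of remove_peer,
  // so only the source stops. Peer present: the state is unexpected, so abort.
  static Action onNoNode(String peerId) {
    return registeredPeers.contains(peerId)
        ? Action.ABORT_REGION_SERVER
        : Action.TERMINATE_SOURCE;
  }

  public static void main(String[] args) {
    registeredPeers.add("1");
    System.out.println(onNoNode("1")); // ABORT_REGION_SERVER
    registeredPeers.remove("1");
    System.out.println(onNoNode("1")); // TERMINATE_SOURCE
  }
}
```

The peer-existence check is itself a check-then-act step, so it can race with a concurrent peer removal. The sketch ignores that race.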







[GitHub] [hbase] Apache-HBase commented on pull request #2960: HBASE-25583: Handle the running replication source gracefully with replication nodes deleted

Posted by GitBox <gi...@apache.org>.
Apache-HBase commented on pull request #2960:
URL: https://github.com/apache/hbase/pull/2960#issuecomment-781830684


   :broken_heart: **-1 overall**
   
   
   
   
   
   
   | Vote | Subsystem | Runtime | Comment |
   |:----:|----------:|--------:|:--------|
   | +0 :ok: |  reexec  |   6m 56s |  Docker mode activated.  |
   ||| _ Prechecks _ |
   | +1 :green_heart: |  dupname  |   0m  0s |  No case conflicting files found.  |
   | +1 :green_heart: |  hbaseanti  |   0m  0s |  Patch does not have any anti-patterns.  |
   | +1 :green_heart: |  @author  |   0m  0s |  The patch does not contain any @author tags.  |
   | +1 :green_heart: |  test4tests  |   0m  0s |  The patch appears to include 8 new or modified test files.  |
   ||| _ branch-1 Compile Tests _ |
   | +0 :ok: |  mvndep  |   2m 33s |  Maven dependency ordering for branch  |
   | +1 :green_heart: |  mvninstall  |   8m 18s |  branch-1 passed  |
   | +1 :green_heart: |  compile  |   1m  2s |  branch-1 passed with JDK Azul Systems, Inc.-1.8.0_262-b19  |
   | +1 :green_heart: |  compile  |   1m  8s |  branch-1 passed with JDK Azul Systems, Inc.-1.7.0_272-b10  |
   | +1 :green_heart: |  checkstyle  |   2m 14s |  branch-1 passed  |
   | +1 :green_heart: |  shadedjars  |   2m 59s |  branch has no errors when building our shaded downstream artifacts.  |
   | +1 :green_heart: |  javadoc  |   1m  1s |  branch-1 passed with JDK Azul Systems, Inc.-1.8.0_262-b19  |
   | +1 :green_heart: |  javadoc  |   1m  8s |  branch-1 passed with JDK Azul Systems, Inc.-1.7.0_272-b10  |
   | +0 :ok: |  spotbugs  |   2m 43s |  Used deprecated FindBugs config; considering switching to SpotBugs.  |
   | +1 :green_heart: |  findbugs  |   4m 25s |  branch-1 passed  |
   ||| _ Patch Compile Tests _ |
   | +0 :ok: |  mvndep  |   0m 19s |  Maven dependency ordering for patch  |
   | +1 :green_heart: |  mvninstall  |   1m 56s |  the patch passed  |
   | +1 :green_heart: |  compile  |   1m  0s |  the patch passed with JDK Azul Systems, Inc.-1.8.0_262-b19  |
   | +1 :green_heart: |  javac  |   1m  0s |  the patch passed  |
   | +1 :green_heart: |  compile  |   1m 10s |  the patch passed with JDK Azul Systems, Inc.-1.7.0_272-b10  |
   | +1 :green_heart: |  javac  |   1m 10s |  the patch passed  |
   | +1 :green_heart: |  checkstyle  |   0m 36s |  hbase-client: The patch generated 0 new + 3 unchanged - 4 fixed = 3 total (was 7)  |
   | -1 :x: |  checkstyle  |   1m 31s |  hbase-server: The patch generated 2 new + 0 unchanged - 24 fixed = 2 total (was 24)  |
   | +1 :green_heart: |  whitespace  |   0m  0s |  The patch has no whitespace issues.  |
   | +1 :green_heart: |  shadedjars  |   2m 47s |  patch has no errors when building our shaded downstream artifacts.  |
   | +1 :green_heart: |  hadoopcheck  |   4m 40s |  Patch does not cause any errors with Hadoop 2.8.5 2.9.2.  |
   | +1 :green_heart: |  javadoc  |   0m 52s |  the patch passed with JDK Azul Systems, Inc.-1.8.0_262-b19  |
   | +1 :green_heart: |  javadoc  |   1m  6s |  the patch passed with JDK Azul Systems, Inc.-1.7.0_272-b10  |
   | +1 :green_heart: |  findbugs  |   4m 25s |  the patch passed  |
   ||| _ Other Tests _ |
   | +1 :green_heart: |  unit  |   2m 43s |  hbase-client in the patch passed.  |
   | +1 :green_heart: |  unit  | 109m 12s |  hbase-server in the patch passed.  |
   | +1 :green_heart: |  asflicense  |   0m 53s |  The patch does not generate ASF License warnings.  |
   |  |   | 168m 51s |   |
   
   
   | Subsystem | Report/Notes |
   |----------:|:-------------|
   | Docker | ClientAPI=1.41 ServerAPI=1.41 base: https://ci-hadoop.apache.org/job/HBase/job/HBase-PreCommit-GitHub-PR/job/PR-2960/4/artifact/out/Dockerfile |
   | GITHUB PR | https://github.com/apache/hbase/pull/2960 |
   | JIRA Issue | HBASE-25583 |
   | Optional Tests | dupname asflicense javac javadoc unit spotbugs findbugs shadedjars hadoopcheck hbaseanti checkstyle compile |
   | uname | Linux 68f32605d42e 4.15.0-58-generic #64-Ubuntu SMP Tue Aug 6 11:12:41 UTC 2019 x86_64 x86_64 x86_64 GNU/Linux |
   | Build tool | maven |
   | Personality | /home/jenkins/jenkins-home/workspace/Base-PreCommit-GitHub-PR_PR-2960/out/precommit/personality/provided.sh |
   | git revision | branch-1 / 2d26c94 |
   | Default Java | Azul Systems, Inc.-1.7.0_272-b10 |
   | Multi-JDK versions | /usr/lib/jvm/zulu-8-amd64:Azul Systems, Inc.-1.8.0_262-b19 /usr/lib/jvm/zulu-7-amd64:Azul Systems, Inc.-1.7.0_272-b10 |
   | checkstyle | https://ci-hadoop.apache.org/job/HBase/job/HBase-PreCommit-GitHub-PR/job/PR-2960/4/artifact/out/diff-checkstyle-hbase-server.txt |
   |  Test Results | https://ci-hadoop.apache.org/job/HBase/job/HBase-PreCommit-GitHub-PR/job/PR-2960/4/testReport/ |
   | Max. process+thread count | 4221 (vs. ulimit of 10000) |
   | modules | C: hbase-client hbase-server U: . |
   | Console output | https://ci-hadoop.apache.org/job/HBase/job/HBase-PreCommit-GitHub-PR/job/PR-2960/4/console |
   | versions | git=1.9.1 maven=3.0.5 findbugs=3.0.1 |
   | Powered by | Apache Yetus 0.12.0 https://yetus.apache.org |
   
   
   This message was automatically generated.
   
   





[GitHub] [hbase] shahrs87 commented on a change in pull request #2960: HBASE-25583: Handle the running replication source gracefully with replication nodes deleted

Posted by GitBox <gi...@apache.org>.
shahrs87 commented on a change in pull request #2960:
URL: https://github.com/apache/hbase/pull/2960#discussion_r578704181



##########
File path: hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceWithoutReplicationZnodes.java
##########
@@ -0,0 +1,234 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import static org.apache.hadoop.hbase.replication.ReplicationSourceDummy.fakeExceptionMessage;
+
+import java.io.IOException;
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ClusterId;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationSourceDummyWithNoTermination;
+import org.apache.hadoop.hbase.replication.ReplicationSourceWithoutPeerException;
+import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
+import org.apache.hadoop.hbase.replication.regionserver.helper.DummyServer;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category(MediumTests.class)
+public class TestReplicationSourceWithoutReplicationZnodes {
+
+  private static final Log LOG =
+    LogFactory.getLog(TestReplicationSourceWithoutReplicationZnodes.class);
+
+  private static Configuration conf;
+
+  private static HBaseTestingUtility utility;
+
+  private static Replication replication;
+
+  private static ReplicationSourceManager manager;
+
+  private static ZooKeeperWatcher zkw;
+
+  private static HTableDescriptor htd;
+
+  private static HRegionInfo hri;
+
+  private static final byte[] r1 = Bytes.toBytes("r1");
+
+  private static final byte[] r2 = Bytes.toBytes("r2");
+
+  private static final byte[] f1 = Bytes.toBytes("f1");
+
+  private static final byte[] f2 = Bytes.toBytes("f2");
+
+  private static final TableName test =
+    TableName.valueOf("test");
+
+  private static final String slaveId = "1";
+
+  private static FileSystem fs;
+
+  private static Path oldLogDir;
+
+  private static Path logDir;
+
+  private static DummyServer server;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+
+    conf = HBaseConfiguration.create();
+    conf.set("replication.replicationsource.implementation",

Review comment:
       Or maybe create a TestReplicationSourceBase class that declares all the common setup, and have TestReplicationSourceManager and TestReplicationSourceWithoutReplicationZnodes extend it. That way each subclass can keep its own individual tests.
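   A rough sketch of that layout, assuming the shared fixture moves into a base class named TestReplicationSourceBase as suggested. Everything below is hypothetical: a plain Map stands in for the HBase Configuration, the subclass names are invented, JUnit annotations are left out so it compiles standalone, and ReplicationSourceDummy as the manager test's source implementation is an assumption. One design point worth keeping: in JUnit 4, a subclass static method with the same name as a superclass @BeforeClass method shadows it, so the shared logic sits in a differently named helper that each subclass's own @BeforeClass calls.

   ```java
   import java.util.HashMap;
   import java.util.Map;

   // Hypothetical sketch: the fixture shared by both test classes lives in one
   // abstract base. A plain Map stands in for the HBase Configuration here.
   abstract class TestReplicationSourceBase {
     protected static Map<String, String> conf;

     // Common setup; each subclass passes the source implementation it needs.
     // In a real test this is called from the subclass's own @BeforeClass method.
     protected static void setUpCommon(String sourceImplementation) {
       conf = new HashMap<>();
       conf.put("replication.replicationsource.implementation", sourceImplementation);
       conf.put("replication.sleep.before.failover", "2000");
       conf.put("replication.source.maxretriesmultiplier", "10");
     }
   }

   // Hypothetical subclass for the existing manager tests.
   class ManagerTestsSketch extends TestReplicationSourceBase {
     static void setUpBeforeClass() {
       setUpCommon("org.apache.hadoop.hbase.replication.ReplicationSourceDummy");
     }
   }

   // Hypothetical subclass for the new "without replication znodes" tests.
   class WithoutZnodesTestsSketch extends TestReplicationSourceBase {
     static void setUpBeforeClass() {
       setUpCommon(
           "org.apache.hadoop.hbase.replication.ReplicationSourceDummyWithNoTermination");
     }
   }
   ```

   Each subclass then keeps only its own @Test methods plus whatever extra setup it needs.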







[GitHub] [hbase] shahrs87 commented on a change in pull request #2960: HBASE-25583: Handle the running replication source gracefully with replication nodes deleted

Posted by GitBox <gi...@apache.org>.
shahrs87 commented on a change in pull request #2960:
URL: https://github.com/apache/hbase/pull/2960#discussion_r578658038



##########
File path: hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceWithoutReplicationZnodes.java
##########
@@ -0,0 +1,234 @@
+@Category(MediumTests.class)
+public class TestReplicationSourceWithoutReplicationZnodes {
+
+  private static final Log LOG =
+    LogFactory.getLog(TestReplicationSourceWithoutReplicationZnodes.class);
+
+  private static Configuration conf;
+
+  private static HBaseTestingUtility utility;
+
+  private static Replication replication;
+
+  private static ReplicationSourceManager manager;
+
+  private static ZooKeeperWatcher zkw;
+
+  private static HTableDescriptor htd;
+
+  private static HRegionInfo hri;
+
+  private static final byte[] r1 = Bytes.toBytes("r1");
+
+  private static final byte[] r2 = Bytes.toBytes("r2");
+
+  private static final byte[] f1 = Bytes.toBytes("f1");
+
+  private static final byte[] f2 = Bytes.toBytes("f2");
+
+  private static final TableName test =
+    TableName.valueOf("test");
+
+  private static final String slaveId = "1";
+
+  private static FileSystem fs;
+
+  private static Path oldLogDir;
+
+  private static Path logDir;
+
+  private static DummyServer server;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+
+    conf = HBaseConfiguration.create();
+    conf.set("replication.replicationsource.implementation",
+      ReplicationSourceDummyWithNoTermination.class.getCanonicalName());
+    conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY,
+      HConstants.REPLICATION_ENABLE_DEFAULT);
+    conf.setLong("replication.sleep.before.failover", 2000);
+    conf.setInt("replication.source.maxretriesmultiplier", 10);
+    utility = new HBaseTestingUtility(conf);
+    utility.startMiniZKCluster();
+
+    zkw = new ZooKeeperWatcher(conf, "test", null);
+    ZKUtil.createWithParents(zkw, "/hbase/replication");
+    ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1");
+    ZKUtil.setData(zkw, "/hbase/replication/peers/1",
+      Bytes.toBytes(conf.get(HConstants.ZOOKEEPER_QUORUM) + ":"
+        + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":/1"));
+    ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state");
+    ZKUtil.setData(zkw, "/hbase/replication/peers/1/peer-state",
+      ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
+    ZKUtil.createWithParents(zkw, "/hbase/replication/state");
+    ZKUtil.setData(zkw, "/hbase/replication/state", ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
+
+    ZKClusterId.setClusterId(zkw, new ClusterId());
+    FSUtils.setRootDir(utility.getConfiguration(), utility.getDataTestDir());
+    fs = FileSystem.get(conf);
+    oldLogDir = new Path(utility.getDataTestDir(),
+      HConstants.HREGION_OLDLOGDIR_NAME);
+    logDir = new Path(utility.getDataTestDir(),
+      HConstants.HREGION_LOGDIR_NAME);
+    server = new DummyServer(conf, "example.hostname.com", zkw);
+    replication = new Replication(server, fs, logDir, oldLogDir);
+    manager = replication.getReplicationManager();
+
+    manager.addSource(slaveId);
+
+    htd = new HTableDescriptor(test);
+    HColumnDescriptor col = new HColumnDescriptor(f1);
+    col.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
+    htd.addFamily(col);
+    col = new HColumnDescriptor(f2);
+    col.setScope(HConstants.REPLICATION_SCOPE_LOCAL);
+    htd.addFamily(col);
+
+    hri = new HRegionInfo(htd.getTableName(), r1, r2);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    try {
+      manager.join();
+    } catch (RuntimeException re) {
+      if (re.getMessage().equals(fakeExceptionMessage)) {
+        LOG.info("It is fine");
+      }
+    }
+    utility.shutdownMiniCluster();
+  }
+
+  @Rule
+  public TestName testName = new TestName();
+
+  private void cleanLogDir() throws IOException {
+    fs.delete(logDir, true);
+    fs.delete(oldLogDir, true);
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    LOG.info("Start " + testName.getMethodName());
+    cleanLogDir();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    LOG.info("End " + testName.getMethodName());
+    cleanLogDir();
+  }
+
+  /**
+   * When a peer is removed, HBase deletes the peer znodes and a ZK watcher
+   * terminates the replication sources. If the ZK watcher is not invoked, or if
+   * there is a race between the source deleting the log znode and the ZK watcher
+   * terminating the source, we may get a NoNodeException. In that case, the right
+   * thing to do is to terminate the replication source.
+   * @throws Exception
+   */
+  @Test
+  public void testReplicationSourceRunningWithoutPeerZnodes() throws Exception {
+    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
+    KeyValue kv = new KeyValue(r1, f1, r1);
+    WALEdit edit = new WALEdit();
+    edit.add(kv);
+
+    List<WALActionsListener> listeners = new ArrayList<>();
+    listeners.add(replication);
+    final WALFactory wals = new WALFactory(utility.getConfiguration(), listeners,
+      URLEncoder.encode("regionserver:60020", "UTF8"));
+    final WAL wal = wals.getWAL(hri.getEncodedNameAsBytes(), hri.getTable().getNamespace());
+    manager.init();
+
+    final long txid = wal.append(htd, hri,
+      new WALKey(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc),
+      edit, true);
+    wal.sync(txid);
+
+
+    wal.rollWriter();
+    ZKUtil.deleteNodeRecursively(zkw, "/hbase/replication/peers/1");
+    ZKUtil.deleteNodeRecursively(zkw, "/hbase/replication/rs/"+ server.getServerName() + "/1");
+
+    ReplicationException exceptionThrown = null;
+    try {
+      manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(), "1", 0, false,
+        false);
+    } catch (ReplicationException e) {
+      exceptionThrown = e;
+
+    }
+
+    Assert.assertTrue(exceptionThrown instanceof ReplicationSourceWithoutPeerException);

Review comment:
       Could we mock Abortable and verify whether abort is called or not? Would that work?
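   A minimal sketch of that idea, assuming the code under test receives its Abortable (for example the region server) through its constructor. To keep it compilable without HBase or Mockito, the interface below only mirrors the two methods of org.apache.hadoop.hbase.Abortable, and QueueCleanupSketch is a hypothetical stand-in for the real cleanup path, not the PR's code. The hand-written test double records calls, so a test can assert that abort() was never invoked when the peer is gone.

   ```java
   import java.util.HashSet;
   import java.util.Set;

   // Mirrors the two methods of org.apache.hadoop.hbase.Abortable so the sketch
   // compiles without HBase on the classpath.
   interface Abortable {
     void abort(String why, Throwable e);
     boolean isAborted();
   }

   // Test double: records whether abort() was invoked and with which reason.
   class RecordingAbortable implements Abortable {
     private volatile boolean aborted = false;
     private volatile String lastReason;

     @Override
     public void abort(String why, Throwable e) {
       aborted = true;
       lastReason = why;
     }

     @Override
     public boolean isAborted() {
       return aborted;
     }

     String lastReason() {
       return lastReason;
     }
   }

   // Hypothetical code path under test: on a missing queue znode, terminate the
   // source if its peer is gone, otherwise abort the server as before.
   class QueueCleanupSketch {
     private final Abortable server;
     private final Set<String> existingPeers;
     boolean sourceTerminated = false;

     QueueCleanupSketch(Abortable server, Set<String> existingPeers) {
       this.server = server;
       this.existingPeers = existingPeers;
     }

     void onNoNode(String peerId, Exception cause) {
       if (!existingPeers.contains(peerId)) {
         sourceTerminated = true; // peer removed: stop this source, do not abort
         return;
       }
       server.abort("Failed to remove wal from queue (queueId=" + peerId + ")", cause);
     }
   }
   ```

   With Mockito on the test classpath, the same check could be written against `Mockito.mock(Abortable.class)` as `Mockito.verify(server, Mockito.never()).abort(Mockito.anyString(), Mockito.any(Throwable.class))`.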







[GitHub] [hbase] Apache-HBase commented on pull request #2960: HBASE-25582: Handle the running replication source gracefully with replication nodes deleted

Posted by GitBox <gi...@apache.org>.
Apache-HBase commented on pull request #2960:
URL: https://github.com/apache/hbase/pull/2960#issuecomment-781330528


   :broken_heart: **-1 overall**
   
   
   
   
   
   
   | Vote | Subsystem | Runtime | Comment |
   |:----:|----------:|--------:|:--------|
   | +0 :ok: |  reexec  |  12m 30s |  Docker mode activated.  |
   ||| _ Prechecks _ |
   | +1 :green_heart: |  dupname  |   0m  1s |  No case conflicting files found.  |
   | +1 :green_heart: |  hbaseanti  |   0m  0s |  Patch does not have any anti-patterns.  |
   | +1 :green_heart: |  @author  |   0m  0s |  The patch does not contain any @author tags.  |
   | +1 :green_heart: |  test4tests  |   0m  0s |  The patch appears to include 5 new or modified test files.  |
   ||| _ branch-1 Compile Tests _ |
   | +0 :ok: |  mvndep  |   2m 24s |  Maven dependency ordering for branch  |
   | +1 :green_heart: |  mvninstall  |   8m 15s |  branch-1 passed  |
   | +1 :green_heart: |  compile  |   1m  4s |  branch-1 passed with JDK Azul Systems, Inc.-1.8.0_262-b19  |
   | +1 :green_heart: |  compile  |   1m 14s |  branch-1 passed with JDK Azul Systems, Inc.-1.7.0_272-b10  |
   | +1 :green_heart: |  checkstyle  |   2m 34s |  branch-1 passed  |
   | +1 :green_heart: |  shadedjars  |   3m 17s |  branch has no errors when building our shaded downstream artifacts.  |
   | +1 :green_heart: |  javadoc  |   1m  2s |  branch-1 passed with JDK Azul Systems, Inc.-1.8.0_262-b19  |
   | +1 :green_heart: |  javadoc  |   1m 13s |  branch-1 passed with JDK Azul Systems, Inc.-1.7.0_272-b10  |
   | +0 :ok: |  spotbugs  |   2m 52s |  Used deprecated FindBugs config; considering switching to SpotBugs.  |
   | +1 :green_heart: |  findbugs  |   4m 38s |  branch-1 passed  |
   ||| _ Patch Compile Tests _ |
   | +0 :ok: |  mvndep  |   0m 16s |  Maven dependency ordering for patch  |
   | +1 :green_heart: |  mvninstall  |   2m 10s |  the patch passed  |
   | +1 :green_heart: |  compile  |   1m  3s |  the patch passed with JDK Azul Systems, Inc.-1.8.0_262-b19  |
   | +1 :green_heart: |  javac  |   1m  3s |  the patch passed  |
   | +1 :green_heart: |  compile  |   1m 13s |  the patch passed with JDK Azul Systems, Inc.-1.7.0_272-b10  |
   | +1 :green_heart: |  javac  |   1m 13s |  the patch passed  |
   | -1 :x: |  checkstyle  |   1m 47s |  hbase-server: The patch generated 8 new + 22 unchanged - 0 fixed = 30 total (was 22)  |
   | -1 :x: |  whitespace  |   0m  0s |  The patch has 1 line(s) that end in whitespace. Use git apply --whitespace=fix <<patch_file>>. Refer https://git-scm.com/docs/git-apply  |
   | +1 :green_heart: |  shadedjars  |   3m  9s |  patch has no errors when building our shaded downstream artifacts.  |
   | +1 :green_heart: |  hadoopcheck  |   5m 13s |  Patch does not cause any errors with Hadoop 2.8.5 2.9.2.  |
   | +1 :green_heart: |  javadoc  |   0m 52s |  the patch passed with JDK Azul Systems, Inc.-1.8.0_262-b19  |
   | +1 :green_heart: |  javadoc  |   1m 10s |  the patch passed with JDK Azul Systems, Inc.-1.7.0_272-b10  |
   | +1 :green_heart: |  findbugs  |   4m 47s |  the patch passed  |
   ||| _ Other Tests _ |
   | +1 :green_heart: |  unit  |   2m 40s |  hbase-client in the patch passed.  |
   | -1 :x: |  unit  | 123m 53s |  hbase-server in the patch failed.  |
   | +1 :green_heart: |  asflicense  |   0m 48s |  The patch does not generate ASF License warnings.  |
   |  |   | 192m 19s |   |
   
   
   | Reason | Tests |
   |-------:|:------|
   | Failed junit tests | hadoop.hbase.replication.TestReplicationStateZKImpl |
   
   
   | Subsystem | Report/Notes |
   |----------:|:-------------|
   | Docker | ClientAPI=1.41 ServerAPI=1.41 base: https://ci-hadoop.apache.org/job/HBase/job/HBase-PreCommit-GitHub-PR/job/PR-2960/1/artifact/out/Dockerfile |
   | GITHUB PR | https://github.com/apache/hbase/pull/2960 |
   | JIRA Issue | HBASE-25582 |
   | Optional Tests | dupname asflicense javac javadoc unit spotbugs findbugs shadedjars hadoopcheck hbaseanti checkstyle compile |
   | uname | Linux 00ca0d2a6f16 4.15.0-101-generic #102-Ubuntu SMP Mon May 11 10:07:26 UTC 2020 x86_64 x86_64 x86_64 GNU/Linux |
   | Build tool | maven |
   | Personality | /home/jenkins/jenkins-home/workspace/Base-PreCommit-GitHub-PR_PR-2960/out/precommit/personality/provided.sh |
   | git revision | branch-1 / 2d26c94 |
   | Default Java | Azul Systems, Inc.-1.7.0_272-b10 |
   | Multi-JDK versions | /usr/lib/jvm/zulu-8-amd64:Azul Systems, Inc.-1.8.0_262-b19 /usr/lib/jvm/zulu-7-amd64:Azul Systems, Inc.-1.7.0_272-b10 |
   | checkstyle | https://ci-hadoop.apache.org/job/HBase/job/HBase-PreCommit-GitHub-PR/job/PR-2960/1/artifact/out/diff-checkstyle-hbase-server.txt |
   | whitespace | https://ci-hadoop.apache.org/job/HBase/job/HBase-PreCommit-GitHub-PR/job/PR-2960/1/artifact/out/whitespace-eol.txt |
   | unit | https://ci-hadoop.apache.org/job/HBase/job/HBase-PreCommit-GitHub-PR/job/PR-2960/1/artifact/out/patch-unit-hbase-server.txt |
   |  Test Results | https://ci-hadoop.apache.org/job/HBase/job/HBase-PreCommit-GitHub-PR/job/PR-2960/1/testReport/ |
   | Max. process+thread count | 3723 (vs. ulimit of 10000) |
   | modules | C: hbase-client hbase-server U: . |
   | Console output | https://ci-hadoop.apache.org/job/HBase/job/HBase-PreCommit-GitHub-PR/job/PR-2960/1/console |
   | versions | git=1.9.1 maven=3.0.5 findbugs=3.0.1 |
   | Powered by | Apache Yetus 0.12.0 https://yetus.apache.org |
   
   
   This message was automatically generated.
   
   





[GitHub] [hbase] sandeepvinayak commented on a change in pull request #2960: HBASE-25582: Handle the running replication source gracefully with replication nodes deleted

Posted by GitBox <gi...@apache.org>.
sandeepvinayak commented on a change in pull request #2960:
URL: https://github.com/apache/hbase/pull/2960#discussion_r578642757



##########
File path: hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceWithoutReplicationZnodes.java
##########
@@ -0,0 +1,234 @@
+@Category(MediumTests.class)
+public class TestReplicationSourceWithoutReplicationZnodes {
+
+  private static final Log LOG =
+    LogFactory.getLog(TestReplicationSourceWithoutReplicationZnodes.class);
+
+  private static Configuration conf;
+
+  private static HBaseTestingUtility utility;
+
+  private static Replication replication;
+
+  private static ReplicationSourceManager manager;
+
+  private static ZooKeeperWatcher zkw;
+
+  private static HTableDescriptor htd;
+
+  private static HRegionInfo hri;
+
+  private static final byte[] r1 = Bytes.toBytes("r1");
+
+  private static final byte[] r2 = Bytes.toBytes("r2");
+
+  private static final byte[] f1 = Bytes.toBytes("f1");
+
+  private static final byte[] f2 = Bytes.toBytes("f2");
+
+  private static final TableName test =
+    TableName.valueOf("test");
+
+  private static final String slaveId = "1";
+
+  private static FileSystem fs;
+
+  private static Path oldLogDir;
+
+  private static Path logDir;
+
+  private static DummyServer server;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+
+    conf = HBaseConfiguration.create();
+    conf.set("replication.replicationsource.implementation",
+      ReplicationSourceDummyWithNoTermination.class.getCanonicalName());
+    conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY,
+      HConstants.REPLICATION_ENABLE_DEFAULT);
+    conf.setLong("replication.sleep.before.failover", 2000);
+    conf.setInt("replication.source.maxretriesmultiplier", 10);
+    utility = new HBaseTestingUtility(conf);
+    utility.startMiniZKCluster();
+
+    zkw = new ZooKeeperWatcher(conf, "test", null);
+    ZKUtil.createWithParents(zkw, "/hbase/replication");
+    ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1");
+    ZKUtil.setData(zkw, "/hbase/replication/peers/1",
+      Bytes.toBytes(conf.get(HConstants.ZOOKEEPER_QUORUM) + ":"
+        + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":/1"));
+    ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state");
+    ZKUtil.setData(zkw, "/hbase/replication/peers/1/peer-state",
+      ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
+    ZKUtil.createWithParents(zkw, "/hbase/replication/state");
+    ZKUtil.setData(zkw, "/hbase/replication/state", ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
+
+    ZKClusterId.setClusterId(zkw, new ClusterId());
+    FSUtils.setRootDir(utility.getConfiguration(), utility.getDataTestDir());
+    fs = FileSystem.get(conf);
+    oldLogDir = new Path(utility.getDataTestDir(),
+      HConstants.HREGION_OLDLOGDIR_NAME);
+    logDir = new Path(utility.getDataTestDir(),
+      HConstants.HREGION_LOGDIR_NAME);
+    server = new DummyServer(conf, "example.hostname.com", zkw);
+    replication = new Replication(server, fs, logDir, oldLogDir);
+    manager = replication.getReplicationManager();
+
+    manager.addSource(slaveId);
+
+    htd = new HTableDescriptor(test);
+    HColumnDescriptor col = new HColumnDescriptor(f1);
+    col.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
+    htd.addFamily(col);
+    col = new HColumnDescriptor(f2);
+    col.setScope(HConstants.REPLICATION_SCOPE_LOCAL);
+    htd.addFamily(col);
+
+    hri = new HRegionInfo(htd.getTableName(), r1, r2);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    try {
+      manager.join();
+    } catch (RuntimeException re) {
+      if (re.getMessage().equals(fakeExceptionMessage)) {
+        LOG.info("It is fine");
+      }
+    }
+    utility.shutdownMiniCluster();
+  }
+
+  @Rule
+  public TestName testName = new TestName();
+
+  private void cleanLogDir() throws IOException {
+    fs.delete(logDir, true);
+    fs.delete(oldLogDir, true);
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    LOG.info("Start " + testName.getMethodName());
+    cleanLogDir();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    LOG.info("End " + testName.getMethodName());
+    cleanLogDir();
+  }
+
+  /**
+   * When a peer is removed, HBase deletes the peer znodes and a ZK watcher
+   * terminates the replication sources. If the ZK watcher is not invoked, or if
+   * there is a race between the source deleting the log znode and the ZK watcher
+   * terminating the source, we may get a NoNodeException. In that case, the right
+   * thing to do is to terminate the replication source.

Review comment:
       These are the exceptions for reference:
   
   
   ```
   2021-02-16 20:11:58,567 FATAL [95922885,xyz_peer] regionserver.HRegionServer - ABORTING region server regionserver-111,60020,1613495922885: Failed to remove wal from queue (queueId=xyz_peer, filename=regionserver-111%2C60020%2C1613495922885.1613505863058)
   org.apache.zookeeper.KeeperException$NoNodeException: KeeperErrorCode = NoNode for /hbase/replication/rs/regionserver-111,60020,1613495922885/xyz_peer/regionserver-111%2C60020%2C1613495922885.1613505863058
           at org.apache.zookeeper.KeeperException.create(KeeperException.java:114)
           at org.apache.zookeeper.KeeperException.create(KeeperException.java:54)
           at org.apache.zookeeper.ZooKeeper.delete(ZooKeeper.java:890)
           at org.apache.hadoop.hbase.zookeeper.RecoverableZooKeeper.delete(RecoverableZooKeeper.java:238)
           at org.apache.hadoop.hbase.zookeeper.ZKUtil.deleteNode(ZKUtil.java:1341)
           at org.apache.hadoop.hbase.zookeeper.ZKUtil.deleteNode(ZKUtil.java:1330)
           at org.apache.hadoop.hbase.replication.ReplicationQueuesZKImpl.removeLog(ReplicationQueuesZKImpl.java:142)
           at org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.cleanOldLogs(ReplicationSourceManager.java:232)
           at org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.cleanOldLogs(ReplicationSourceManager.java:222)
           at org.apache.hadoop.hbase.replication.regionserver.ReplicationSourceManager.logPositionAndCleanOldLogs(ReplicationSourceManager.java:198)
           at org.apache.hadoop.hbase.replication.regionserver.ReplicationSource$ReplicationSourceShipperThread.updateLogPosition(ReplicationSource.java:831)
           at org.apache.hadoop.hbase.replication.regionserver.ReplicationSource$ReplicationSourceShipperThread.shipEdits(ReplicationSource.java:746)
           at org.apache.hadoop.hbase.replication.regionserver.ReplicationSource$ReplicationSourceShipperThread.run(ReplicationSource.java:650)
   ```
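   The trace shows the NoNodeException escaping from ReplicationQueuesZKImpl.removeLog, up through logPositionAndCleanOldLogs, into the shipper thread, which then aborts the region server. The new test instead expects logPositionAndCleanOldLogs to throw ReplicationSourceWithoutPeerException once the peer znode is gone. Below is a hedged sketch of that translation, not the PR's implementation: the exception classes are local stand-ins, a Set of paths simulates ZooKeeper, the server name "server-1" is invented, and it assumes the queue id equals the peer id (true for a normal queue, not for a recovered one). Where exactly the PR performs this check is not shown in this thread.

   ```java
   import java.util.HashSet;
   import java.util.Set;

   // Hypothetical stand-ins for KeeperException.NoNodeException, ReplicationException
   // and the PR's ReplicationSourceWithoutPeerException.
   class NoNodeException extends Exception {
     NoNodeException(String path) {
       super("KeeperErrorCode = NoNode for " + path);
     }
   }

   class ReplicationException extends Exception {
     ReplicationException(String msg, Throwable cause) {
       super(msg, cause);
     }
   }

   class ReplicationSourceWithoutPeerException extends ReplicationException {
     ReplicationSourceWithoutPeerException(String msg, Throwable cause) {
       super(msg, cause);
     }
   }

   class RemoveLogSketch {
     private final Set<String> znodes;  // simulated ZooKeeper: existing znode paths
     private final Set<String> peerIds; // ids of peers that still exist

     RemoveLogSketch(Set<String> znodes, Set<String> peerIds) {
       this.znodes = znodes;
       this.peerIds = peerIds;
     }

     // Assumes queueId == peerId, i.e. a normal (non-recovered) queue.
     void removeLog(String queueId, String wal) throws ReplicationException {
       String path = "/hbase/replication/rs/server-1/" + queueId + "/" + wal;
       try {
         deleteNode(path);
       } catch (NoNodeException e) {
         if (!peerIds.contains(queueId)) {
           // Peer was removed: signal the caller to terminate this source, not abort.
           throw new ReplicationSourceWithoutPeerException(
               "Peer " + queueId + " no longer exists", e);
         }
         // Peer still exists, so the missing znode is unexpected: keep failing loudly.
         throw new ReplicationException(
             "Failed to remove wal from queue (queueId=" + queueId + ")", e);
       }
     }

     private void deleteNode(String path) throws NoNodeException {
       if (!znodes.remove(path)) {
         throw new NoNodeException(path);
       }
     }
   }
   ```

   Because ReplicationSourceWithoutPeerException extends ReplicationException in this sketch, a caller can catch the narrower type first and terminate the source, while any other ReplicationException keeps the existing abort behavior.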







[GitHub] [hbase] sandeepvinayak commented on a change in pull request #2960: HBASE-25582: Handle the running replication source gracefully with replication nodes deleted

Posted by GitBox <gi...@apache.org>.
sandeepvinayak commented on a change in pull request #2960:
URL: https://github.com/apache/hbase/pull/2960#discussion_r578612002



##########
File path: hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceWithoutReplicationZnodes.java
##########
@@ -0,0 +1,234 @@
+@Category(MediumTests.class)
+public class TestReplicationSourceWithoutReplicationZnodes {
+
+  private static final Log LOG =
+    LogFactory.getLog(TestReplicationSourceManager.class);
+
+  private static Configuration conf;
+
+  private static HBaseTestingUtility utility;
+
+  private static Replication replication;
+
+  private static ReplicationSourceManager manager;
+
+  private static ZooKeeperWatcher zkw;
+
+  private static HTableDescriptor htd;
+
+  private static HRegionInfo hri;
+
+  private static final byte[] r1 = Bytes.toBytes("r1");
+
+  private static final byte[] r2 = Bytes.toBytes("r2");
+
+  private static final byte[] f1 = Bytes.toBytes("f1");
+
+  private static final byte[] f2 = Bytes.toBytes("f2");
+
+  private static final TableName test =
+    TableName.valueOf("test");
+
+  private static final String slaveId = "1";
+
+  private static FileSystem fs;
+
+  private static Path oldLogDir;
+
+  private static Path logDir;
+
+  private static DummyServer server;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+
+    conf = HBaseConfiguration.create();
+    conf.set("replication.replicationsource.implementation",
+      ReplicationSourceDummyWithNoTermination.class.getCanonicalName());
+    conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY,
+      HConstants.REPLICATION_ENABLE_DEFAULT);
+    conf.setLong("replication.sleep.before.failover", 2000);
+    conf.setInt("replication.source.maxretriesmultiplier", 10);
+    utility = new HBaseTestingUtility(conf);
+    utility.startMiniZKCluster();
+
+    zkw = new ZooKeeperWatcher(conf, "test", null);
+    ZKUtil.createWithParents(zkw, "/hbase/replication");
+    ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1");
+    ZKUtil.setData(zkw, "/hbase/replication/peers/1",
+      Bytes.toBytes(conf.get(HConstants.ZOOKEEPER_QUORUM) + ":"
+        + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":/1"));
+    ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state");
+    ZKUtil.setData(zkw, "/hbase/replication/peers/1/peer-state",
+      ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
+    ZKUtil.createWithParents(zkw, "/hbase/replication/state");
+    ZKUtil.setData(zkw, "/hbase/replication/state", ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
+
+    ZKClusterId.setClusterId(zkw, new ClusterId());
+    FSUtils.setRootDir(utility.getConfiguration(), utility.getDataTestDir());
+    fs = FileSystem.get(conf);
+    oldLogDir = new Path(utility.getDataTestDir(),
+      HConstants.HREGION_OLDLOGDIR_NAME);
+    logDir = new Path(utility.getDataTestDir(),
+      HConstants.HREGION_LOGDIR_NAME);
+    server = new DummyServer(conf, "example.hostname.com", zkw);
+    replication = new Replication(server, fs, logDir, oldLogDir);
+    manager = replication.getReplicationManager();
+
+    manager.addSource(slaveId);
+
+    htd = new HTableDescriptor(test);
+    HColumnDescriptor col = new HColumnDescriptor(f1);
+    col.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
+    htd.addFamily(col);
+    col = new HColumnDescriptor(f2);
+    col.setScope(HConstants.REPLICATION_SCOPE_LOCAL);
+    htd.addFamily(col);
+
+    hri = new HRegionInfo(htd.getTableName(), r1, r2);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    try {
+      manager.join();
+    } catch (RuntimeException re) {
+      if (re.getMessage().equals(fakeExceptionMessage)) {
+        LOG.info("It is fine");
+      }
+    }
+    utility.shutdownMiniCluster();
+  }
+
+  @Rule
+  public TestName testName = new TestName();
+
+  private void cleanLogDir() throws IOException {
+    fs.delete(logDir, true);
+    fs.delete(oldLogDir, true);
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    LOG.info("Start " + testName.getMethodName());
+    cleanLogDir();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    LOG.info("End " + testName.getMethodName());
+    cleanLogDir();
+  }
+
+  /**
+   * When the peer is removed, hbase remove the peer znodes and there is zk watcher
+   * which terminates the replication sources. In case of zk watcher not getting invoked
+   * or a race condition between source deleting the log znode and zk watcher
+   * terminating the source, we might get the NoNode exception. In that case, the right
+   * thing is to terminate the replication source.

Review comment:
       > The only place I see updating log position (eventually calling ReplicationQueuesZKImpl.removeLog) is the SourceShipperThread, which is terminated in ReplicationSourceManager.removePeer.
   
   You are right, but `ReplicationSourceManager.removePeer` is itself invoked by the `PeersWatcher` ZK listener.
   So there is a race condition between `ReplicationQueuesZKImpl.removeLog` and `ReplicationSourceManager.removePeer` (which terminates the source), although that race won't impact multiple region servers at once.
   But if the ZK listener is not invoked at all because of some ZK issue, every region server that is replicating is affected, which badly hurts availability. We saw this recently, and it appears to be a ZK issue. Handling the NoNode exception gracefully makes the RS resilient to it.
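A minimal, self-contained sketch of the graceful handling described above, assuming an in-memory set of paths in place of ZooKeeper. `FakeZk`, `QueueStorageSketch`, `NoNodeException` and `PeerRemovedException` are hypothetical stand-ins for ZooKeeper, `ReplicationQueuesZKImpl`, `KeeperException.NoNodeException` and `ReplicationSourceWithoutPeerException`. It also assumes the queue id equals the peer id, which holds for a normal queue but not for recovered queues (their ids carry extra server information). The point it illustrates: a missing queue znode only becomes a "terminate this source" signal when the peer znode is gone as well; otherwise the original abort path is kept.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for ZooKeeper's KeeperException.NoNodeException. */
class NoNodeException extends Exception {
  NoNodeException(String path) {
    super("NoNode for " + path);
  }
}

/** Hypothetical stand-in for ReplicationSourceWithoutPeerException. */
class PeerRemovedException extends Exception {
  PeerRemovedException(String message, Throwable cause) {
    super(message, cause);
  }
}

/** In-memory znode tree (assumption): it only tracks which paths exist. */
class FakeZk {
  private final Set<String> paths = ConcurrentHashMap.newKeySet();

  void create(String path) {
    paths.add(path);
  }

  boolean exists(String path) {
    return paths.contains(path);
  }

  void delete(String path) throws NoNodeException {
    if (!paths.remove(path)) {
      throw new NoNodeException(path);
    }
  }

  /** Deletes path and everything beneath it, like a recursive znode delete. */
  void deleteRecursively(String path) {
    paths.removeIf(p -> p.equals(path) || p.startsWith(path + "/"));
  }
}

/** Sketch of the removeLog() error handling; queueId is assumed to equal the peer id. */
class QueueStorageSketch {
  private final FakeZk zk;
  private final String peersZnode;
  private final String myQueuesZnode;
  private boolean aborted = false;

  QueueStorageSketch(FakeZk zk, String peersZnode, String myQueuesZnode) {
    this.zk = zk;
    this.peersZnode = peersZnode;
    this.myQueuesZnode = myQueuesZnode;
  }

  boolean isAborted() {
    return aborted;
  }

  void removeLog(String queueId, String filename) throws PeerRemovedException {
    String znode = myQueuesZnode + "/" + queueId + "/" + filename;
    try {
      zk.delete(znode);
    } catch (NoNodeException e) {
      if (!zk.exists(peersZnode + "/" + queueId)) {
        // Peer removed but this source was never terminated (missed or late watcher):
        // surface a typed exception so the caller terminates only this source.
        throw new PeerRemovedException(
            "Znodes for peer " + queueId + " have been deleted while a source is still active", e);
      }
      // The peer still exists, so a missing znode is unexpected: keep the old abort path.
      aborted = true;
    }
  }
}
```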







[GitHub] [hbase] sandeepvinayak commented on a change in pull request #2960: HBASE-25583: Handle the running replication source gracefully with replication nodes deleted

Posted by GitBox <gi...@apache.org>.
sandeepvinayak commented on a change in pull request #2960:
URL: https://github.com/apache/hbase/pull/2960#discussion_r579443102



##########
File path: hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/ReplicationSource.java
##########
@@ -792,9 +795,13 @@ protected void shipEdits(WALEntryBatch entryBatch) {
     }
 
     private void updateLogPosition(long lastReadPosition) {
-      manager.logPositionAndCleanOldLogs(lastLoggedPath, peerClusterZnode, lastReadPosition,
-        this.replicationQueueInfo.isQueueRecovered(), false);
-      lastLoggedPosition = lastReadPosition;
+      try {
+        manager.logPositionAndCleanOldLogs(lastLoggedPath, peerClusterZnode, lastReadPosition,
+          this.replicationQueueInfo.isQueueRecovered(), false);
+        lastLoggedPosition = lastReadPosition;
+      } catch (ReplicationSourceWithoutPeerException re) {
+        source.terminate("Replication peer is removed and source should terminate", re);

Review comment:
       @shahrs87 To answer your concern, the ref files get cleaned up as part of the replication ZK node cleaner: https://github.com/apache/hbase/blob/branch-1/hbase-server/src/main/java/org/apache/hadoop/hbase/master/cleaner/ReplicationZKNodeCleaner.java#L117
   
   However, the source manager will still hold a reference to the terminated source. That seems harmless, but removing it is the right thing to do, and I can create a PR for this change.
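The follow-up mentioned above (dropping the manager's reference to a source that terminated itself) could look roughly like this. It is a hypothetical sketch: `SketchSource`, `SketchSourceManager` and `terminateSource` are invented names, not HBase APIs; only the idea of a `getSources()` list mirrors the `manager.getSources()` call used in the test.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/** Hypothetical minimal source: a peer id plus a running flag. */
class SketchSource {
  private final String peerId;
  private volatile boolean running = true;
  private volatile String terminationReason;

  SketchSource(String peerId) {
    this.peerId = peerId;
  }

  String getPeerId() { return peerId; }
  boolean isRunning() { return running; }
  String getTerminationReason() { return terminationReason; }

  void terminate(String reason) {
    terminationReason = reason;
    running = false;
  }
}

/** Hypothetical manager that also forgets a source once it has been terminated. */
class SketchSourceManager {
  // Copy-on-write so readers iterating the list never see a ConcurrentModificationException.
  private final List<SketchSource> sources = new CopyOnWriteArrayList<>();

  SketchSource addSource(String peerId) {
    SketchSource source = new SketchSource(peerId);
    sources.add(source);
    return source;
  }

  /** Terminate the source and drop the manager's reference to it. */
  void terminateSource(SketchSource source, String reason) {
    source.terminate(reason);
    sources.remove(source);
  }

  List<SketchSource> getSources() {
    return Collections.unmodifiableList(sources);
  }
}
```

`CopyOnWriteArrayList` is chosen here on the assumption that the source list is read far more often than it changes and may be iterated from other threads; the real manager may synchronize differently.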
   
   Thanks for your input. 







[GitHub] [hbase] sandeepvinayak commented on a change in pull request #2960: HBASE-25582: Handle the running replication source gracefully with replication nodes deleted

Posted by GitBox <gi...@apache.org>.
sandeepvinayak commented on a change in pull request #2960:
URL: https://github.com/apache/hbase/pull/2960#discussion_r578619299



##########
File path: hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceWithoutReplicationZnodes.java
##########
@@ -0,0 +1,234 @@
+  /**
+   * When the peer is removed, hbase remove the peer znodes and there is zk watcher
+   * which terminates the replication sources. In case of zk watcher not getting invoked
+   * or a race condition between source deleting the log znode and zk watcher
+   * terminating the source, we might get the NoNode exception. In that case, the right
+   * thing is to terminate the replication source.
+   * @throws Exception
+   */
+  @Test
+  public void testReplicationSourceRunningWithoutPeerZnodes() throws Exception {
+    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
+    KeyValue kv = new KeyValue(r1, f1, r1);
+    WALEdit edit = new WALEdit();
+    edit.add(kv);
+
+    List<WALActionsListener> listeners = new ArrayList<>();
+    listeners.add(replication);
+    final WALFactory wals = new WALFactory(utility.getConfiguration(), listeners,
+      URLEncoder.encode("regionserver:60020", "UTF8"));
+    final WAL wal = wals.getWAL(hri.getEncodedNameAsBytes(), hri.getTable().getNamespace());
+    manager.init();
+
+    final long txid = wal.append(htd, hri,
+      new WALKey(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc),
+      edit, true);
+    wal.sync(txid);
+
+
+    wal.rollWriter();
+    ZKUtil.deleteNodeRecursively(zkw, "/hbase/replication/peers/1");
+    ZKUtil.deleteNodeRecursively(zkw, "/hbase/replication/rs/"+ server.getServerName() + "/1");
+
+    ReplicationException exceptionThrown = null;
+    try {
+      manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(), "1", 0, false,

Review comment:
       @shahrs87 That is a bit tricky: in the existing tests, when we test `ReplicationSource` we mock the `ReplicationSourceManager`. I will try to write a completely new test class for this.
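One way to test the shipper-side handling without mocking the whole `ReplicationSourceManager` is to have the shipper depend on a narrow interface and pass a lambda stub. This is a hypothetical refactoring for illustration, not what the PR does: `LogPositionRecorder`, `PeerGoneException` and `ShipperSketch` are invented names, and the catch block mirrors the `updateLogPosition` change quoted earlier in the thread.

```java
/** Hypothetical stand-in for ReplicationSourceWithoutPeerException. */
class PeerGoneException extends Exception {
  PeerGoneException(String message) {
    super(message);
  }
}

/** The single manager call the shipper needs; small enough to stub with a lambda. */
interface LogPositionRecorder {
  void logPositionAndCleanOldLogs(String walPath, String queueId, long position)
      throws PeerGoneException;
}

/** Shipper-side handling: record the position, or terminate if the peer is gone. */
class ShipperSketch {
  private final LogPositionRecorder manager;
  private long lastLoggedPosition = -1L;
  private String terminationReason; // null while the source is running

  ShipperSketch(LogPositionRecorder manager) {
    this.manager = manager;
  }

  void updateLogPosition(String walPath, String queueId, long lastReadPosition) {
    try {
      manager.logPositionAndCleanOldLogs(walPath, queueId, lastReadPosition);
      // Only advance the in-memory position once the queue update succeeded.
      lastLoggedPosition = lastReadPosition;
    } catch (PeerGoneException e) {
      // Terminate this source instead of letting the error abort the region server.
      terminationReason = "Replication peer is removed and source should terminate: "
          + e.getMessage();
    }
  }

  long getLastLoggedPosition() { return lastLoggedPosition; }
  boolean isTerminated() { return terminationReason != null; }
  String getTerminationReason() { return terminationReason; }
}
```

Because `LogPositionRecorder` has a single abstract method, each test case supplies its own behavior inline, e.g. `(wal, queue, pos) -> { throw new PeerGoneException("peer 1 removed"); }`.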







[GitHub] [hbase] wchevreuil commented on a change in pull request #2960: HBASE-25582: Handle the running replication source gracefully with replication nodes deleted

Posted by GitBox <gi...@apache.org>.
wchevreuil commented on a change in pull request #2960:
URL: https://github.com/apache/hbase/pull/2960#discussion_r578368537



##########
File path: hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
##########
@@ -132,17 +132,45 @@ public void addLog(String queueId, String filename) throws ReplicationException
   }
 
   @Override
-  public void removeLog(String queueId, String filename) {
+  public void removeLog(String queueId, String filename)
+      throws ReplicationSourceWithoutPeerException {
     try {
-      String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
-      znode = ZKUtil.joinZNode(znode, filename);
-      ZKUtil.deleteNode(this.zookeeper, znode);
-    } catch (KeeperException e) {
-      this.abortable.abort("Failed to remove wal from queue (queueId=" + queueId + ", filename="
-          + filename + ")", e);
+      try {
+        String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
+        znode = ZKUtil.joinZNode(znode, filename);
+        ZKUtil.deleteNode(this.zookeeper, znode);
+      } catch (KeeperException e) {
+        // in case of no node exception we should not crash the region server
+        // but instead check if the replication peer has been removed.
+        // If so, we can throw here so that the source can terminate itself.
+        // This situation can occur when the replication peer znodes has been
+        // removed but the sources not terminated due to any miss from zk node delete watcher.
+        if (e instanceof KeeperException.NoNodeException) {
+          if (!doesPeerExist(queueId)) {
+            LOG.warn("Replication peer " + queueId + " has been removed", e);
+            throw new ReplicationSourceWithoutPeerException(
+              "Znodes for peer has been delete where as source is still active", e);

Review comment:
       nit: "a source", not "as source"

##########
File path: hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSourceWithoutPeerException.java
##########
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * This exception is thrown when attempts to read an HFile fail due to corruption or truncation
+ * issues.
+ */
+@InterfaceAudience.Private

Review comment:
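       nit: the Javadoc description is inconsistent with this exception; it describes failures reading an HFile, not a replication source whose peer has been removed.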

##########
File path: hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceWithoutReplicationZnodes.java
##########
@@ -0,0 +1,234 @@
+  /**
+   * When the peer is removed, hbase remove the peer znodes and there is zk watcher
+   * which terminates the replication sources. In case of zk watcher not getting invoked
+   * or a race condition between source deleting the log znode and zk watcher
+   * terminating the source, we might get the NoNode exception. In that case, the right
+   * thing is to terminate the replication source.

Review comment:
       > In case of zk watcher not getting invoked or a race condition between source deleting the log znode and zk watcher terminating the source
   
   I'm struggling to understand how this is possible without manually deleting the znodes (which is not a valid scenario). 
   
   The only place I see updating log position (eventually calling ReplicationQueuesZKImpl.removeLog) is the SourceShipperThread, which is _terminated_ in ReplicationSourceManager.removePeer.
   
   Mind sharing a real-world scenario where you identified this problem?
   







[GitHub] [hbase] apurtell merged pull request #2960: HBASE-25583: Handle the running replication source gracefully with replication nodes deleted

Posted by GitBox <gi...@apache.org>.
apurtell merged pull request #2960:
URL: https://github.com/apache/hbase/pull/2960


   





[GitHub] [hbase] sandeepvinayak commented on a change in pull request #2960: HBASE-25583: Handle the running replication source gracefully with replication nodes deleted

Posted by GitBox <gi...@apache.org>.
sandeepvinayak commented on a change in pull request #2960:
URL: https://github.com/apache/hbase/pull/2960#discussion_r579035411



##########
File path: hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceWithoutReplicationZnodes.java
##########
@@ -0,0 +1,234 @@
+    LogFactory.getLog(TestReplicationSourceManager.class);
+
+  private static Configuration conf;
+
+  private static HBaseTestingUtility utility;
+
+  private static Replication replication;
+
+  private static ReplicationSourceManager manager;
+
+  private static ZooKeeperWatcher zkw;
+
+  private static HTableDescriptor htd;
+
+  private static HRegionInfo hri;
+
+  private static final byte[] r1 = Bytes.toBytes("r1");
+
+  private static final byte[] r2 = Bytes.toBytes("r2");
+
+  private static final byte[] f1 = Bytes.toBytes("f1");
+
+  private static final byte[] f2 = Bytes.toBytes("f2");
+
+  private static final TableName test =
+    TableName.valueOf("test");
+
+  private static final String slaveId = "1";
+
+  private static FileSystem fs;
+
+  private static Path oldLogDir;
+
+  private static Path logDir;
+
+  private static DummyServer server;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+
+    conf = HBaseConfiguration.create();
+    conf.set("replication.replicationsource.implementation",
+      ReplicationSourceDummyWithNoTermination.class.getCanonicalName());
+    conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY,
+      HConstants.REPLICATION_ENABLE_DEFAULT);
+    conf.setLong("replication.sleep.before.failover", 2000);
+    conf.setInt("replication.source.maxretriesmultiplier", 10);
+    utility = new HBaseTestingUtility(conf);
+    utility.startMiniZKCluster();
+
+    zkw = new ZooKeeperWatcher(conf, "test", null);
+    ZKUtil.createWithParents(zkw, "/hbase/replication");
+    ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1");
+    ZKUtil.setData(zkw, "/hbase/replication/peers/1",
+      Bytes.toBytes(conf.get(HConstants.ZOOKEEPER_QUORUM) + ":"
+        + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":/1"));
+    ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state");
+    ZKUtil.setData(zkw, "/hbase/replication/peers/1/peer-state",
+      ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
+    ZKUtil.createWithParents(zkw, "/hbase/replication/state");
+    ZKUtil.setData(zkw, "/hbase/replication/state", ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
+
+    ZKClusterId.setClusterId(zkw, new ClusterId());
+    FSUtils.setRootDir(utility.getConfiguration(), utility.getDataTestDir());
+    fs = FileSystem.get(conf);
+    oldLogDir = new Path(utility.getDataTestDir(),
+      HConstants.HREGION_OLDLOGDIR_NAME);
+    logDir = new Path(utility.getDataTestDir(),
+      HConstants.HREGION_LOGDIR_NAME);
+    server = new DummyServer(conf, "example.hostname.com", zkw);
+    replication = new Replication(server, fs, logDir, oldLogDir);
+    manager = replication.getReplicationManager();
+
+    manager.addSource(slaveId);
+
+    htd = new HTableDescriptor(test);
+    HColumnDescriptor col = new HColumnDescriptor(f1);
+    col.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
+    htd.addFamily(col);
+    col = new HColumnDescriptor(f2);
+    col.setScope(HConstants.REPLICATION_SCOPE_LOCAL);
+    htd.addFamily(col);
+
+    hri = new HRegionInfo(htd.getTableName(), r1, r2);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    try {
+      manager.join();
+    } catch (RuntimeException re) {
+      if (re.getMessage().equals(fakeExceptionMessage)) {
+        LOG.info("It is fine");
+      }
+    }
+    utility.shutdownMiniCluster();
+  }
+
+  @Rule
+  public TestName testName = new TestName();
+
+  private void cleanLogDir() throws IOException {
+    fs.delete(logDir, true);
+    fs.delete(oldLogDir, true);
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    LOG.info("Start " + testName.getMethodName());
+    cleanLogDir();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    LOG.info("End " + testName.getMethodName());
+    cleanLogDir();
+  }
+
+  /**
+   * When the peer is removed, hbase remove the peer znodes and there is zk watcher
+   * which terminates the replication sources. In case of zk watcher not getting invoked
+   * or a race condition between source deleting the log znode and zk watcher
+   * terminating the source, we might get the NoNode exception. In that case, the right
+   * thing is to terminate the replication source.

Review comment:
       @wchevreuil Let me know if that answers your concern, or if you have any other feedback on this.







[GitHub] [hbase] Apache-HBase commented on pull request #2960: HBASE-25583: Handle the running replication source gracefully with replication nodes deleted

Posted by GitBox <gi...@apache.org>.
Apache-HBase commented on pull request #2960:
URL: https://github.com/apache/hbase/pull/2960#issuecomment-781693830


   :broken_heart: **-1 overall**
   
   
   
   
   
   
   | Vote | Subsystem | Runtime | Comment |
   |:----:|----------:|--------:|:--------|
   | +0 :ok: |  reexec  |   1m 40s |  Docker mode activated.  |
   ||| _ Prechecks _ |
   | +1 :green_heart: |  dupname  |   0m  0s |  No case conflicting files found.  |
   | +1 :green_heart: |  hbaseanti  |   0m  0s |  Patch does not have any anti-patterns.  |
   | +1 :green_heart: |  @author  |   0m  0s |  The patch does not contain any @author tags.  |
   | +1 :green_heart: |  test4tests  |   0m  0s |  The patch appears to include 7 new or modified test files.  |
   ||| _ branch-1 Compile Tests _ |
   | +0 :ok: |  mvndep  |   2m 36s |  Maven dependency ordering for branch  |
   | +1 :green_heart: |  mvninstall  |   8m 34s |  branch-1 passed  |
   | +1 :green_heart: |  compile  |   1m  5s |  branch-1 passed with JDK Azul Systems, Inc.-1.8.0_262-b19  |
   | +1 :green_heart: |  compile  |   1m 18s |  branch-1 passed with JDK Azul Systems, Inc.-1.7.0_272-b10  |
   | +1 :green_heart: |  checkstyle  |   2m 34s |  branch-1 passed  |
   | +1 :green_heart: |  shadedjars  |   3m 23s |  branch has no errors when building our shaded downstream artifacts.  |
   | +1 :green_heart: |  javadoc  |   1m  3s |  branch-1 passed with JDK Azul Systems, Inc.-1.8.0_262-b19  |
   | +1 :green_heart: |  javadoc  |   1m 13s |  branch-1 passed with JDK Azul Systems, Inc.-1.7.0_272-b10  |
   | +0 :ok: |  spotbugs  |   2m 54s |  Used deprecated FindBugs config; considering switching to SpotBugs.  |
   | +1 :green_heart: |  findbugs  |   4m 39s |  branch-1 passed  |
   ||| _ Patch Compile Tests _ |
   | +0 :ok: |  mvndep  |   0m 16s |  Maven dependency ordering for patch  |
   | +1 :green_heart: |  mvninstall  |   2m 15s |  the patch passed  |
   | +1 :green_heart: |  compile  |   1m  4s |  the patch passed with JDK Azul Systems, Inc.-1.8.0_262-b19  |
   | +1 :green_heart: |  javac  |   1m  4s |  the patch passed  |
   | +1 :green_heart: |  compile  |   1m 17s |  the patch passed with JDK Azul Systems, Inc.-1.7.0_272-b10  |
   | +1 :green_heart: |  javac  |   1m 17s |  the patch passed  |
   | +1 :green_heart: |  checkstyle  |   0m 38s |  hbase-client: The patch generated 0 new + 3 unchanged - 4 fixed = 3 total (was 7)  |
   | +1 :green_heart: |  checkstyle  |   1m 46s |  hbase-server: The patch generated 0 new + 0 unchanged - 24 fixed = 0 total (was 24)  |
   | -1 :x: |  whitespace  |   0m  0s |  The patch has 1 line(s) that end in whitespace. Use git apply --whitespace=fix <<patch_file>>. Refer https://git-scm.com/docs/git-apply  |
   | +1 :green_heart: |  shadedjars  |   3m 11s |  patch has no errors when building our shaded downstream artifacts.  |
   | +1 :green_heart: |  hadoopcheck  |   5m 15s |  Patch does not cause any errors with Hadoop 2.8.5 2.9.2.  |
   | +1 :green_heart: |  javadoc  |   0m 50s |  the patch passed with JDK Azul Systems, Inc.-1.8.0_262-b19  |
   | +1 :green_heart: |  javadoc  |   1m 11s |  the patch passed with JDK Azul Systems, Inc.-1.7.0_272-b10  |
   | +1 :green_heart: |  findbugs  |   4m 49s |  the patch passed  |
   ||| _ Other Tests _ |
   | +1 :green_heart: |  unit  |   2m 43s |  hbase-client in the patch passed.  |
   | +1 :green_heart: |  unit  | 124m 36s |  hbase-server in the patch passed.  |
   | +1 :green_heart: |  asflicense  |   0m 44s |  The patch does not generate ASF License warnings.  |
   |  |   | 183m 11s |   |
   
   
   | Subsystem | Report/Notes |
   |----------:|:-------------|
   | Docker | ClientAPI=1.41 ServerAPI=1.41 base: https://ci-hadoop.apache.org/job/HBase/job/HBase-PreCommit-GitHub-PR/job/PR-2960/2/artifact/out/Dockerfile |
   | GITHUB PR | https://github.com/apache/hbase/pull/2960 |
   | JIRA Issue | HBASE-25583 |
   | Optional Tests | dupname asflicense javac javadoc unit spotbugs findbugs shadedjars hadoopcheck hbaseanti checkstyle compile |
   | uname | Linux 0df7c4bcaf1d 4.15.0-101-generic #102-Ubuntu SMP Mon May 11 10:07:26 UTC 2020 x86_64 x86_64 x86_64 GNU/Linux |
   | Build tool | maven |
   | Personality | /home/jenkins/jenkins-home/workspace/Base-PreCommit-GitHub-PR_PR-2960/out/precommit/personality/provided.sh |
   | git revision | branch-1 / 2d26c94 |
   | Default Java | Azul Systems, Inc.-1.7.0_272-b10 |
   | Multi-JDK versions | /usr/lib/jvm/zulu-8-amd64:Azul Systems, Inc.-1.8.0_262-b19 /usr/lib/jvm/zulu-7-amd64:Azul Systems, Inc.-1.7.0_272-b10 |
   | whitespace | https://ci-hadoop.apache.org/job/HBase/job/HBase-PreCommit-GitHub-PR/job/PR-2960/2/artifact/out/whitespace-eol.txt |
   |  Test Results | https://ci-hadoop.apache.org/job/HBase/job/HBase-PreCommit-GitHub-PR/job/PR-2960/2/testReport/ |
   | Max. process+thread count | 3478 (vs. ulimit of 10000) |
   | modules | C: hbase-client hbase-server U: . |
   | Console output | https://ci-hadoop.apache.org/job/HBase/job/HBase-PreCommit-GitHub-PR/job/PR-2960/2/console |
   | versions | git=1.9.1 maven=3.0.5 findbugs=3.0.1 |
   | Powered by | Apache Yetus 0.12.0 https://yetus.apache.org |
   
   
   This message was automatically generated.
   
   





[GitHub] [hbase] shahrs87 commented on a change in pull request #2960: HBASE-25582: Handle the running replication source gracefully with replication nodes deleted

Posted by GitBox <gi...@apache.org>.
shahrs87 commented on a change in pull request #2960:
URL: https://github.com/apache/hbase/pull/2960#discussion_r578521110



##########
File path: hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceWithoutReplicationZnodes.java
##########
@@ -0,0 +1,234 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import static org.apache.hadoop.hbase.replication.ReplicationSourceDummy.fakeExceptionMessage;
+
+import java.io.IOException;
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ClusterId;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationSourceDummyWithNoTermination;
+import org.apache.hadoop.hbase.replication.ReplicationSourceWithoutPeerException;
+import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
+import org.apache.hadoop.hbase.replication.regionserver.helper.DummyServer;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category(MediumTests.class)
+public class TestReplicationSourceWithoutReplicationZnodes {
+
+  private static final Log LOG =
+    LogFactory.getLog(TestReplicationSourceManager.class);
+
+  private static Configuration conf;
+
+  private static HBaseTestingUtility utility;
+
+  private static Replication replication;
+
+  private static ReplicationSourceManager manager;
+
+  private static ZooKeeperWatcher zkw;
+
+  private static HTableDescriptor htd;
+
+  private static HRegionInfo hri;
+
+  private static final byte[] r1 = Bytes.toBytes("r1");
+
+  private static final byte[] r2 = Bytes.toBytes("r2");
+
+  private static final byte[] f1 = Bytes.toBytes("f1");
+
+  private static final byte[] f2 = Bytes.toBytes("f2");
+
+  private static final TableName test =
+    TableName.valueOf("test");
+
+  private static final String slaveId = "1";
+
+  private static FileSystem fs;
+
+  private static Path oldLogDir;
+
+  private static Path logDir;
+
+  private static DummyServer server;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+
+    conf = HBaseConfiguration.create();
+    conf.set("replication.replicationsource.implementation",
+      ReplicationSourceDummyWithNoTermination.class.getCanonicalName());
+    conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY,
+      HConstants.REPLICATION_ENABLE_DEFAULT);
+    conf.setLong("replication.sleep.before.failover", 2000);
+    conf.setInt("replication.source.maxretriesmultiplier", 10);
+    utility = new HBaseTestingUtility(conf);
+    utility.startMiniZKCluster();
+
+    zkw = new ZooKeeperWatcher(conf, "test", null);
+    ZKUtil.createWithParents(zkw, "/hbase/replication");
+    ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1");
+    ZKUtil.setData(zkw, "/hbase/replication/peers/1",
+      Bytes.toBytes(conf.get(HConstants.ZOOKEEPER_QUORUM) + ":"
+        + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":/1"));
+    ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state");
+    ZKUtil.setData(zkw, "/hbase/replication/peers/1/peer-state",
+      ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
+    ZKUtil.createWithParents(zkw, "/hbase/replication/state");
+    ZKUtil.setData(zkw, "/hbase/replication/state", ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
+
+    ZKClusterId.setClusterId(zkw, new ClusterId());
+    FSUtils.setRootDir(utility.getConfiguration(), utility.getDataTestDir());
+    fs = FileSystem.get(conf);
+    oldLogDir = new Path(utility.getDataTestDir(),
+      HConstants.HREGION_OLDLOGDIR_NAME);
+    logDir = new Path(utility.getDataTestDir(),
+      HConstants.HREGION_LOGDIR_NAME);
+    server = new DummyServer(conf, "example.hostname.com", zkw);
+    replication = new Replication(server, fs, logDir, oldLogDir);
+    manager = replication.getReplicationManager();
+
+    manager.addSource(slaveId);
+
+    htd = new HTableDescriptor(test);
+    HColumnDescriptor col = new HColumnDescriptor(f1);
+    col.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
+    htd.addFamily(col);
+    col = new HColumnDescriptor(f2);
+    col.setScope(HConstants.REPLICATION_SCOPE_LOCAL);
+    htd.addFamily(col);
+
+    hri = new HRegionInfo(htd.getTableName(), r1, r2);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    try {
+      manager.join();
+    } catch (RuntimeException re) {
+      if (re.getMessage().equals(fakeExceptionMessage)) {
+        LOG.info("It is fine");
+      }
+    }
+    utility.shutdownMiniCluster();
+  }
+
+  @Rule
+  public TestName testName = new TestName();
+
+  private void cleanLogDir() throws IOException {
+    fs.delete(logDir, true);
+    fs.delete(oldLogDir, true);
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    LOG.info("Start " + testName.getMethodName());
+    cleanLogDir();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    LOG.info("End " + testName.getMethodName());
+    cleanLogDir();
+  }
+
+  /**
+   * When the peer is removed, hbase remove the peer znodes and there is zk watcher
+   * which terminates the replication sources. In case of zk watcher not getting invoked
+   * or a race condition between source deleting the log znode and zk watcher
+   * terminating the source, we might get the NoNode exception. In that case, the right
+   * thing is to terminate the replication source.
+   * @throws Exception
+   */
+  @Test
+  public void testReplicationSourceRunningWithoutPeerZnodes() throws Exception {
+    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
+    KeyValue kv = new KeyValue(r1, f1, r1);
+    WALEdit edit = new WALEdit();
+    edit.add(kv);
+
+    List<WALActionsListener> listeners = new ArrayList<>();
+    listeners.add(replication);
+    final WALFactory wals = new WALFactory(utility.getConfiguration(), listeners,
+      URLEncoder.encode("regionserver:60020", "UTF8"));
+    final WAL wal = wals.getWAL(hri.getEncodedNameAsBytes(), hri.getTable().getNamespace());
+    manager.init();
+
+    final long txid = wal.append(htd, hri,
+      new WALKey(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc),
+      edit, true);
+    wal.sync(txid);
+
+
+    wal.rollWriter();
+    ZKUtil.deleteNodeRecursively(zkw, "/hbase/replication/peers/1");
+    ZKUtil.deleteNodeRecursively(zkw, "/hbase/replication/rs/"+ server.getServerName() + "/1");
+
+    ReplicationException exceptionThrown = null;
+    try {
+      manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(), "1", 0, false,
+        false);
+    } catch (ReplicationException e) {
+      exceptionThrown = e;
+

Review comment:
       nit: extra line

##########
File path: hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceWithoutReplicationZnodes.java
##########
@@ -0,0 +1,234 @@
+  @Test
+  public void testReplicationSourceRunningWithoutPeerZnodes() throws Exception {
+    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
+    KeyValue kv = new KeyValue(r1, f1, r1);
+    WALEdit edit = new WALEdit();
+    edit.add(kv);
+
+    List<WALActionsListener> listeners = new ArrayList<>();
+    listeners.add(replication);
+    final WALFactory wals = new WALFactory(utility.getConfiguration(), listeners,
+      URLEncoder.encode("regionserver:60020", "UTF8"));
+    final WAL wal = wals.getWAL(hri.getEncodedNameAsBytes(), hri.getTable().getNamespace());
+    manager.init();
+
+    final long txid = wal.append(htd, hri,
+      new WALKey(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc),
+      edit, true);
+    wal.sync(txid);
+
+
+    wal.rollWriter();
+    ZKUtil.deleteNodeRecursively(zkw, "/hbase/replication/peers/1");
+    ZKUtil.deleteNodeRecursively(zkw, "/hbase/replication/rs/"+ server.getServerName() + "/1");
+
+    ReplicationException exceptionThrown = null;
+    try {
+      manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(), "1", 0, false,

Review comment:
       Did we test that `ReplicationSource` is terminated when we see `ReplicationSourceWithoutPeerException`?
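One hedged way to express the check being asked for, sketched with hypothetical stand-ins (`Source`, `PositionLogger`, `shipped`, `terminate`, `SourceWithoutPeerException`) rather than the real `ReplicationSource` and `ReplicationSourceManager`: when position logging fails with the peer-missing exception, the source terminates itself, and the test asserts on `isTerminated()` instead of only checking that the exception was thrown.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class TerminationCheckSketch {

  // Hypothetical stand-in for ReplicationSourceWithoutPeerException.
  static class SourceWithoutPeerException extends Exception {
    SourceWithoutPeerException(String msg) {
      super(msg);
    }
  }

  // Hypothetical stand-in for the manager's position-logging call.
  interface PositionLogger {
    void logPosition(String peerId, long position) throws SourceWithoutPeerException;
  }

  // Hypothetical, much-simplified replication source.
  static class Source {
    private final String peerId;
    private final PositionLogger logger;
    private final AtomicBoolean terminated = new AtomicBoolean(false);
    private volatile String terminationReason;

    Source(String peerId, PositionLogger logger) {
      this.peerId = peerId;
      this.logger = logger;
    }

    // Called after a batch is shipped; a removed peer ends this source only.
    void shipped(long position) {
      try {
        logger.logPosition(peerId, position);
      } catch (SourceWithoutPeerException e) {
        terminate("Peer removed: " + e.getMessage());
      }
    }

    void terminate(String reason) {
      terminationReason = reason;
      terminated.set(true);
    }

    boolean isTerminated() {
      return terminated.get();
    }

    String getTerminationReason() {
      return terminationReason;
    }
  }

  public static void main(String[] args) {
    // Simulate the test: the peer znodes are gone, so logging the position fails.
    Source src = new Source("1", (peer, pos) -> {
      throw new SourceWithoutPeerException("peer " + peer + " not found");
    });
    src.shipped(0L);
    // The check the review asks for: the source must end up terminated.
    if (!src.isTerminated()) {
      throw new AssertionError("source should have terminated");
    }
    System.out.println(src.getTerminationReason());
  }
}
```

In the real test the termination may happen on the source's own thread, so a polling wait with a timeout (for example HBase's `Waiter` test helper) is probably needed rather than an immediate check.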

##########
File path: hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationQueuesZKImpl.java
##########
@@ -132,17 +132,45 @@ public void addLog(String queueId, String filename) throws ReplicationException
   }
 
   @Override
-  public void removeLog(String queueId, String filename) {
+  public void removeLog(String queueId, String filename)
+      throws ReplicationSourceWithoutPeerException {
     try {
-      String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
-      znode = ZKUtil.joinZNode(znode, filename);
-      ZKUtil.deleteNode(this.zookeeper, znode);
-    } catch (KeeperException e) {
-      this.abortable.abort("Failed to remove wal from queue (queueId=" + queueId + ", filename="
-          + filename + ")", e);
+      try {
+        String znode = ZKUtil.joinZNode(this.myQueuesZnode, queueId);
+        znode = ZKUtil.joinZNode(znode, filename);
+        ZKUtil.deleteNode(this.zookeeper, znode);
+      } catch (KeeperException e) {

Review comment:
       We will swallow every `KeeperException` that is **not** a `NoNodeException`. In the inner catch block we can catch only `KeeperException.NoNodeException`, and in the else branch of the `if (!doesPeerExist(queueId))` condition we need to rethrow the exception so that the outer catch block handles it and aborts the RS.
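A minimal sketch of the structure this comment suggests, under stated assumptions: `KeeperException`, `NoNodeException`, `SourceWithoutPeerException`, the in-memory `znodes`/`peers` sets and the `abortReason` field are simplified stand-ins invented for illustration, not the real ZooKeeper or HBase classes. Only a `NoNodeException` for a queue whose peer is gone becomes the new exception; a `NoNodeException` for a live peer is rethrown, and every other `KeeperException` reaches the outer catch, where the RS would still be aborted.

```java
import java.util.HashSet;
import java.util.Set;

public class RemoveLogSketch {

  // Hypothetical stand-in for org.apache.zookeeper.KeeperException.
  static class KeeperException extends Exception {
    KeeperException(String msg) {
      super(msg);
    }
  }

  // Hypothetical stand-in for KeeperException.NoNodeException.
  static class NoNodeException extends KeeperException {
    NoNodeException(String path) {
      super("NoNode for " + path);
    }
  }

  // Hypothetical stand-in for the patch's ReplicationSourceWithoutPeerException.
  static class SourceWithoutPeerException extends Exception {
    SourceWithoutPeerException(String msg, Throwable cause) {
      super(msg, cause);
    }
  }

  // Hypothetical in-memory view of ZooKeeper: existing znodes and peer ids.
  final Set<String> znodes = new HashSet<>();
  final Set<String> peers = new HashSet<>();
  // Records why the RS would abort, instead of really aborting it.
  String abortReason = null;

  // Simulates a znode delete; paths under /broken fail with a generic error.
  void deleteNode(String path) throws KeeperException {
    if (path.startsWith("/broken")) {
      throw new KeeperException("ConnectionLoss for " + path);
    }
    if (!znodes.remove(path)) {
      throw new NoNodeException(path);
    }
  }

  boolean doesPeerExist(String queueId) {
    return peers.contains(queueId);
  }

  void removeLog(String queuesZnode, String queueId, String filename)
      throws SourceWithoutPeerException {
    String znode = queuesZnode + "/" + queueId + "/" + filename;
    try {
      try {
        deleteNode(znode);
      } catch (NoNodeException e) {
        if (!doesPeerExist(queueId)) {
          // Peer removed: signal the caller to terminate this source only.
          throw new SourceWithoutPeerException("Peer " + queueId + " no longer exists", e);
        }
        // Peer still exists, so the missing znode is unexpected: rethrow.
        throw e;
      }
    } catch (KeeperException e) {
      // Every other ZooKeeper failure still aborts the region server.
      abortReason = "Failed to remove wal from queue (queueId=" + queueId
          + ", filename=" + filename + "): " + e.getMessage();
    }
  }

  public static void main(String[] args) throws Exception {
    RemoveLogSketch zk = new RemoveLogSketch();
    zk.peers.add("1");
    zk.znodes.add("/rs/queues/1/wal.1");
    zk.removeLog("/rs/queues", "1", "wal.1"); // normal delete, no abort
    try {
      zk.removeLog("/rs/queues", "2", "wal.1"); // peer "2" was removed
    } catch (SourceWithoutPeerException e) {
      System.out.println("terminate source: " + e.getMessage());
    }
  }
}
```

Catching only `NoNodeException` in the inner block keeps the abort path for connection loss, session expiry and similar failures, which is what the original `removeLog` did for every `KeeperException`.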

##########
File path: hbase-client/src/main/java/org/apache/hadoop/hbase/replication/ReplicationSourceWithoutPeerException.java
##########
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication;
+
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+
+/**
+ * This exception is thrown when attempts to read an HFile fail due to corruption or truncation
+ * issues.
+ */
+@InterfaceAudience.Private

Review comment:
       Copy-paste error. :)
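A possible replacement for the copied class comment, sketched under assumptions: `ReplicationException` below is a stand-in with an assumed `(String)` constructor, and the wording describes the behaviour stated in the PR description (terminate the source when the peer no longer exists). That the new exception subclasses `ReplicationException` follows from the PR's test, which catches `ReplicationException` and checks `instanceof ReplicationSourceWithoutPeerException`.

```java
/** Stand-in for org.apache.hadoop.hbase.replication.ReplicationException (assumed constructor). */
class ReplicationException extends Exception {
  private static final long serialVersionUID = 1L;

  ReplicationException(String message) {
    super(message);
  }
}

/**
 * Thrown when a replication source tries to remove a WAL from its queue in ZooKeeper,
 * the queue znode no longer exists, and the replication peer has been removed. The
 * caller should terminate the replication source rather than abort the region server.
 */
class ReplicationSourceWithoutPeerException extends ReplicationException {
  private static final long serialVersionUID = 1L;

  ReplicationSourceWithoutPeerException(String message) {
    super(message);
  }
}

class ExceptionDocSketch {
  public static void main(String[] args) {
    try {
      throw new ReplicationSourceWithoutPeerException("peer 1 was removed");
    } catch (ReplicationException e) {
      // Same shape as the PR's test: catch the base type, then check the subtype.
      System.out.println(e instanceof ReplicationSourceWithoutPeerException); // prints "true"
    }
  }
}
```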

##########
File path: hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceWithoutReplicationZnodes.java
##########
@@ -0,0 +1,234 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.replication.regionserver;
+
+import static org.apache.hadoop.hbase.replication.ReplicationSourceDummy.fakeExceptionMessage;
+
+import java.io.IOException;
+import java.net.URLEncoder;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.ClusterId;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.regionserver.MultiVersionConcurrencyControl;
+import org.apache.hadoop.hbase.regionserver.wal.WALActionsListener;
+import org.apache.hadoop.hbase.regionserver.wal.WALEdit;
+import org.apache.hadoop.hbase.replication.ReplicationException;
+import org.apache.hadoop.hbase.replication.ReplicationSourceDummyWithNoTermination;
+import org.apache.hadoop.hbase.replication.ReplicationSourceWithoutPeerException;
+import org.apache.hadoop.hbase.replication.ReplicationStateZKBase;
+import org.apache.hadoop.hbase.replication.regionserver.helper.DummyServer;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.wal.WAL;
+import org.apache.hadoop.hbase.wal.WALFactory;
+import org.apache.hadoop.hbase.wal.WALKey;
+import org.apache.hadoop.hbase.zookeeper.ZKClusterId;
+import org.apache.hadoop.hbase.zookeeper.ZKUtil;
+import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+@Category(MediumTests.class)
+public class TestReplicationSourceWithoutReplicationZnodes {
+
+  private static final Log LOG =
+    LogFactory.getLog(TestReplicationSourceManager.class);
+
+  private static Configuration conf;
+
+  private static HBaseTestingUtility utility;
+
+  private static Replication replication;
+
+  private static ReplicationSourceManager manager;
+
+  private static ZooKeeperWatcher zkw;
+
+  private static HTableDescriptor htd;
+
+  private static HRegionInfo hri;
+
+  private static final byte[] r1 = Bytes.toBytes("r1");
+
+  private static final byte[] r2 = Bytes.toBytes("r2");
+
+  private static final byte[] f1 = Bytes.toBytes("f1");
+
+  private static final byte[] f2 = Bytes.toBytes("f2");
+
+  private static final TableName test =
+    TableName.valueOf("test");
+
+  private static final String slaveId = "1";
+
+  private static FileSystem fs;
+
+  private static Path oldLogDir;
+
+  private static Path logDir;
+
+  private static DummyServer server;
+
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+
+    conf = HBaseConfiguration.create();
+    conf.set("replication.replicationsource.implementation",
+      ReplicationSourceDummyWithNoTermination.class.getCanonicalName());
+    conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY,
+      HConstants.REPLICATION_ENABLE_DEFAULT);
+    conf.setLong("replication.sleep.before.failover", 2000);
+    conf.setInt("replication.source.maxretriesmultiplier", 10);
+    utility = new HBaseTestingUtility(conf);
+    utility.startMiniZKCluster();
+
+    zkw = new ZooKeeperWatcher(conf, "test", null);
+    ZKUtil.createWithParents(zkw, "/hbase/replication");
+    ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1");
+    ZKUtil.setData(zkw, "/hbase/replication/peers/1",
+      Bytes.toBytes(conf.get(HConstants.ZOOKEEPER_QUORUM) + ":"
+        + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":/1"));
+    ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state");
+    ZKUtil.setData(zkw, "/hbase/replication/peers/1/peer-state",
+      ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
+    ZKUtil.createWithParents(zkw, "/hbase/replication/state");
+    ZKUtil.setData(zkw, "/hbase/replication/state", ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
+
+    ZKClusterId.setClusterId(zkw, new ClusterId());
+    FSUtils.setRootDir(utility.getConfiguration(), utility.getDataTestDir());
+    fs = FileSystem.get(conf);
+    oldLogDir = new Path(utility.getDataTestDir(),
+      HConstants.HREGION_OLDLOGDIR_NAME);
+    logDir = new Path(utility.getDataTestDir(),
+      HConstants.HREGION_LOGDIR_NAME);
+    server = new DummyServer(conf, "example.hostname.com", zkw);
+    replication = new Replication(server, fs, logDir, oldLogDir);
+    manager = replication.getReplicationManager();
+
+    manager.addSource(slaveId);
+
+    htd = new HTableDescriptor(test);
+    HColumnDescriptor col = new HColumnDescriptor(f1);
+    col.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
+    htd.addFamily(col);
+    col = new HColumnDescriptor(f2);
+    col.setScope(HConstants.REPLICATION_SCOPE_LOCAL);
+    htd.addFamily(col);
+
+    hri = new HRegionInfo(htd.getTableName(), r1, r2);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    try {
+      manager.join();
+    } catch (RuntimeException re) {
+      if (re.getMessage().equals(fakeExceptionMessage)) {
+        LOG.info("It is fine");
+      }
+    }
+    utility.shutdownMiniCluster();
+  }
+
+  @Rule
+  public TestName testName = new TestName();
+
+  private void cleanLogDir() throws IOException {
+    fs.delete(logDir, true);
+    fs.delete(oldLogDir, true);
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    LOG.info("Start " + testName.getMethodName());
+    cleanLogDir();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    LOG.info("End " + testName.getMethodName());
+    cleanLogDir();
+  }
+
+  /**
+   * When the peer is removed, hbase remove the peer znodes and there is zk watcher
+   * which terminates the replication sources. In case of zk watcher not getting invoked
+   * or a race condition between source deleting the log znode and zk watcher
+   * terminating the source, we might get the NoNode exception. In that case, the right
+   * thing is to terminate the replication source.
+   * @throws Exception
+   */
+  @Test
+  public void testReplicationSourceRunningWithoutPeerZnodes() throws Exception {
+    MultiVersionConcurrencyControl mvcc = new MultiVersionConcurrencyControl();
+    KeyValue kv = new KeyValue(r1, f1, r1);
+    WALEdit edit = new WALEdit();
+    edit.add(kv);
+
+    List<WALActionsListener> listeners = new ArrayList<>();
+    listeners.add(replication);
+    final WALFactory wals = new WALFactory(utility.getConfiguration(), listeners,
+      URLEncoder.encode("regionserver:60020", "UTF8"));
+    final WAL wal = wals.getWAL(hri.getEncodedNameAsBytes(), hri.getTable().getNamespace());
+    manager.init();
+
+    final long txid = wal.append(htd, hri,
+      new WALKey(hri.getEncodedNameAsBytes(), test, System.currentTimeMillis(), mvcc),
+      edit, true);
+    wal.sync(txid);
+
+
+    wal.rollWriter();
+    ZKUtil.deleteNodeRecursively(zkw, "/hbase/replication/peers/1");
+    ZKUtil.deleteNodeRecursively(zkw, "/hbase/replication/rs/"+ server.getServerName() + "/1");
+
+    ReplicationException exceptionThrown = null;
+    try {
+      manager.logPositionAndCleanOldLogs(manager.getSources().get(0).getCurrentPath(), "1", 0, false,
+        false);
+    } catch (ReplicationException e) {
+      exceptionThrown = e;
+
+    }
+
+    Assert.assertTrue(exceptionThrown instanceof ReplicationSourceWithoutPeerException);

Review comment:
       I would like to see a test where `ReplicationQueuesZKImpl#removeLog` encounters a `KeeperException` other than `KeeperException.NoNodeException`, and which verifies that `RS#terminate` was called.
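A self-contained sketch of such a test, using fault injection: a deleter that always throws a preconfigured `KeeperException` subtype, and a recording stand-in for the region server's abort hook. All types here (`ConnectionLossException`, `FailingDeleter`, `RecordingAbortable`, `RemoveLogAbortTest`) are hypothetical stand-ins rather than the ZooKeeper or HBase classes, and `removeLog` mirrors the nested-catch structure suggested in the earlier review comment.

```java
// Hypothetical stand-ins for ZooKeeper's KeeperException hierarchy and HBase's abort hook.
class KeeperException extends Exception {
  KeeperException(String msg) {
    super(msg);
  }
}

class NoNodeException extends KeeperException {
  NoNodeException() {
    super("NoNode");
  }
}

class ConnectionLossException extends KeeperException {
  ConnectionLossException() {
    super("ConnectionLoss");
  }
}

class ReplicationSourceWithoutPeerException extends Exception {
  ReplicationSourceWithoutPeerException(String msg) {
    super(msg);
  }
}

/** Records abort() calls; stands in for the region server in a unit test. */
class RecordingAbortable {
  boolean aborted;

  void abort(String why, Throwable cause) {
    aborted = true;
  }
}

/** Deleter that always fails with a preconfigured exception (fault injection). */
class FailingDeleter {
  private final KeeperException failure;

  FailingDeleter(KeeperException failure) {
    this.failure = failure;
  }

  void deleteNode(String znode) throws KeeperException {
    throw failure;
  }
}

class RemoveLogAbortTest {
  /** Only a NoNode failure with the peer already gone may skip the abort. */
  static void removeLog(FailingDeleter zk, RecordingAbortable rs, boolean peerExists,
      String znode) throws ReplicationSourceWithoutPeerException {
    try {
      try {
        zk.deleteNode(znode);
      } catch (NoNodeException e) {
        if (!peerExists) {
          throw new ReplicationSourceWithoutPeerException(znode);
        }
        throw e;
      }
    } catch (KeeperException e) {
      rs.abort("Failed to remove wal " + znode, e);
    }
  }

  public static void main(String[] args) throws Exception {
    RecordingAbortable rs = new RecordingAbortable();
    // Even with the peer already gone, a non-NoNode failure must abort the RS.
    removeLog(new FailingDeleter(new ConnectionLossException()), rs, false, "/q/1/wal.1");
    System.out.println("aborted: " + rs.aborted); // prints "aborted: true"
  }
}
```

Running the same helper with `peerExists = false` for both failure types gives the two cases the review asks to contrast: `ConnectionLossException` aborts, `NoNodeException` surfaces as the graceful exception.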

##########
File path: hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceWithoutReplicationZnodes.java
##########
@@ -0,0 +1,234 @@
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+
+    conf = HBaseConfiguration.create();
+    conf.set("replication.replicationsource.implementation",

Review comment:
       I see that most of the setup method is duplicated from `TestReplicationSourceManager`. Could we abstract some methods out of `TestReplicationSourceManager`, make `TestReplicationSourceWithoutReplicationZnodes` extend it, and just override those abstract methods?
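A rough sketch of how the shared setup could be factored, under the assumption that the only per-test difference is the source implementation class. Because JUnit `@BeforeClass` methods are static, they cannot be overridden in the usual sense, so this sketch passes the varying part into a static helper instead of using an abstract method. `ReplicationManagerTestBase`, `setUpBase` and `ZnodesRemovedTestSketch` are hypothetical names, and a `Map` stands in for Hadoop's `Configuration` so the example runs on its own.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Hypothetical shared base owning the configuration every replication-manager test needs.
 * The varying part (the source implementation) is a parameter, since static setup
 * methods cannot be overridden.
 */
abstract class ReplicationManagerTestBase {
  protected static Map<String, String> conf;

  protected static void setUpBase(String sourceImplementation) {
    conf = new LinkedHashMap<>();
    conf.put("replication.replicationsource.implementation", sourceImplementation);
    conf.put("replication.sleep.before.failover", "2000");
    conf.put("replication.source.maxretriesmultiplier", "10");
    // The mini ZK cluster, peer znodes and Replication setup would follow here.
  }
}

class ZnodesRemovedTestSketch extends ReplicationManagerTestBase {
  // In a real JUnit 4 test this method would carry @BeforeClass.
  static void setUpBeforeClass() {
    setUpBase("org.apache.hadoop.hbase.replication.ReplicationSourceDummyWithNoTermination");
  }

  public static void main(String[] args) {
    setUpBeforeClass();
    System.out.println(conf.get("replication.replicationsource.implementation"));
  }
}
```

JUnit 4's `@BeforeClass` documentation states that superclass `@BeforeClass` methods run before those of the current class unless they are shadowed, so giving the subclass's setup method a different name from any base-class `@BeforeClass` method avoids silently skipping shared setup.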

##########
File path: hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceWithoutReplicationZnodes.java
##########
@@ -0,0 +1,234 @@
+@Category(MediumTests.class)
+public class TestReplicationSourceWithoutReplicationZnodes {
+
+  private static final Log LOG =
+    LogFactory.getLog(TestReplicationSourceManager.class);
+
+  private static Configuration conf;
+

Review comment:
       nit: remove the blank lines between the variable declarations.

##########
File path: hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceWithoutReplicationZnodes.java
##########
@@ -0,0 +1,234 @@
+@Category(MediumTests.class)
+public class TestReplicationSourceWithoutReplicationZnodes {
+
+  private static final Log LOG =
+    LogFactory.getLog(TestReplicationSourceManager.class);

Review comment:
       nit: change the logger class from `TestReplicationSourceManager` to `TestReplicationSourceWithoutReplicationZnodes`.
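One way to avoid this kind of copy-paste mistake is to derive the logger name from the enclosing class instead of naming it. The sketch below uses `java.util.logging` in place of the commons-logging `Log`/`LogFactory` used by the test, purely so it runs without extra dependencies; with commons-logging the analogous call would be `LogFactory.getLog(MethodHandles.lookup().lookupClass())`. `LoggerNameSketch` is a hypothetical name.

```java
import java.lang.invoke.MethodHandles;
import java.util.logging.Logger;

class LoggerNameSketch {
  // lookupClass() resolves to the class this line is compiled into, so the
  // declaration can be copied between test classes without naming the wrong class.
  private static final Logger LOG =
      Logger.getLogger(MethodHandles.lookup().lookupClass().getName());

  static String loggerName() {
    return LOG.getName();
  }

  public static void main(String[] args) {
    System.out.println(loggerName()); // prints "LoggerNameSketch"
  }
}
```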







[GitHub] [hbase] Apache-HBase commented on pull request #2960: HBASE-25583: Handle the running replication source gracefully with replication nodes deleted

Posted by GitBox <gi...@apache.org>.
Apache-HBase commented on pull request #2960:
URL: https://github.com/apache/hbase/pull/2960#issuecomment-782032154


   :broken_heart: **-1 overall**
   
   
   
   
   
   
   | Vote | Subsystem | Runtime | Comment |
   |:----:|----------:|--------:|:--------|
   | +0 :ok: |  reexec  |   6m 42s |  Docker mode activated.  |
   ||| _ Prechecks _ |
   | +1 :green_heart: |  dupname  |   0m  0s |  No case conflicting files found.  |
   | +1 :green_heart: |  hbaseanti  |   0m  0s |  Patch does not have any anti-patterns.  |
   | +1 :green_heart: |  @author  |   0m  0s |  The patch does not contain any @author tags.  |
   | +1 :green_heart: |  test4tests  |   0m  0s |  The patch appears to include 9 new or modified test files.  |
   ||| _ branch-1 Compile Tests _ |
   | +0 :ok: |  mvndep  |   2m 28s |  Maven dependency ordering for branch  |
   | +1 :green_heart: |  mvninstall  |   8m 11s |  branch-1 passed  |
   | +1 :green_heart: |  compile  |   1m  1s |  branch-1 passed with JDK Azul Systems, Inc.-1.8.0_262-b19  |
   | +1 :green_heart: |  compile  |   1m  9s |  branch-1 passed with JDK Azul Systems, Inc.-1.7.0_272-b10  |
   | +1 :green_heart: |  checkstyle  |   2m 14s |  branch-1 passed  |
   | +1 :green_heart: |  shadedjars  |   3m  6s |  branch has no errors when building our shaded downstream artifacts.  |
   | +1 :green_heart: |  javadoc  |   1m  3s |  branch-1 passed with JDK Azul Systems, Inc.-1.8.0_262-b19  |
   | +1 :green_heart: |  javadoc  |   1m  6s |  branch-1 passed with JDK Azul Systems, Inc.-1.7.0_272-b10  |
   | +0 :ok: |  spotbugs  |   2m 39s |  Used deprecated FindBugs config; considering switching to SpotBugs.  |
   | +1 :green_heart: |  findbugs  |   4m 21s |  branch-1 passed  |
   ||| _ Patch Compile Tests _ |
   | +0 :ok: |  mvndep  |   0m 18s |  Maven dependency ordering for patch  |
   | +1 :green_heart: |  mvninstall  |   2m  4s |  the patch passed  |
   | +1 :green_heart: |  compile  |   0m 59s |  the patch passed with JDK Azul Systems, Inc.-1.8.0_262-b19  |
   | +1 :green_heart: |  javac  |   0m 59s |  the patch passed  |
   | +1 :green_heart: |  compile  |   1m 10s |  the patch passed with JDK Azul Systems, Inc.-1.7.0_272-b10  |
   | +1 :green_heart: |  javac  |   1m 10s |  the patch passed  |
   | +1 :green_heart: |  checkstyle  |   0m 34s |  hbase-client: The patch generated 0 new + 3 unchanged - 4 fixed = 3 total (was 7)  |
   | +1 :green_heart: |  checkstyle  |   1m 31s |  hbase-server: The patch generated 0 new + 0 unchanged - 28 fixed = 0 total (was 28)  |
   | +1 :green_heart: |  whitespace  |   0m  0s |  The patch has no whitespace issues.  |
   | +1 :green_heart: |  shadedjars  |   2m 51s |  patch has no errors when building our shaded downstream artifacts.  |
   | +1 :green_heart: |  hadoopcheck  |   4m 44s |  Patch does not cause any errors with Hadoop 2.8.5 2.9.2.  |
   | +1 :green_heart: |  javadoc  |   0m 53s |  the patch passed with JDK Azul Systems, Inc.-1.8.0_262-b19  |
   | +1 :green_heart: |  javadoc  |   1m  8s |  the patch passed with JDK Azul Systems, Inc.-1.7.0_272-b10  |
   | +1 :green_heart: |  findbugs  |   4m 30s |  the patch passed  |
   ||| _ Other Tests _ |
   | +1 :green_heart: |  unit  |   2m 44s |  hbase-client in the patch passed.  |
   | -1 :x: |  unit  | 109m 27s |  hbase-server in the patch failed.  |
   | +1 :green_heart: |  asflicense  |   0m 53s |  The patch does not generate ASF License warnings.  |
   |  |   | 168m 59s |   |
   
   
   | Reason | Tests |
   |-------:|:------|
   | Failed junit tests | hadoop.hbase.regionserver.TestSplitTransactionOnCluster |
   
   
   | Subsystem | Report/Notes |
   |----------:|:-------------|
   | Docker | ClientAPI=1.41 ServerAPI=1.41 base: https://ci-hadoop.apache.org/job/HBase/job/HBase-PreCommit-GitHub-PR/job/PR-2960/6/artifact/out/Dockerfile |
   | GITHUB PR | https://github.com/apache/hbase/pull/2960 |
   | JIRA Issue | HBASE-25583 |
   | Optional Tests | dupname asflicense javac javadoc unit spotbugs findbugs shadedjars hadoopcheck hbaseanti checkstyle compile |
   | uname | Linux da003dc4e51f 4.15.0-60-generic #67-Ubuntu SMP Thu Aug 22 16:55:30 UTC 2019 x86_64 x86_64 x86_64 GNU/Linux |
   | Build tool | maven |
   | Personality | /home/jenkins/jenkins-home/workspace/Base-PreCommit-GitHub-PR_PR-2960/out/precommit/personality/provided.sh |
   | git revision | branch-1 / 2d26c94 |
   | Default Java | Azul Systems, Inc.-1.7.0_272-b10 |
   | Multi-JDK versions | /usr/lib/jvm/zulu-8-amd64:Azul Systems, Inc.-1.8.0_262-b19 /usr/lib/jvm/zulu-7-amd64:Azul Systems, Inc.-1.7.0_272-b10 |
   | unit | https://ci-hadoop.apache.org/job/HBase/job/HBase-PreCommit-GitHub-PR/job/PR-2960/6/artifact/out/patch-unit-hbase-server.txt |
   |  Test Results | https://ci-hadoop.apache.org/job/HBase/job/HBase-PreCommit-GitHub-PR/job/PR-2960/6/testReport/ |
   | Max. process+thread count | 4626 (vs. ulimit of 10000) |
   | modules | C: hbase-client hbase-server U: . |
   | Console output | https://ci-hadoop.apache.org/job/HBase/job/HBase-PreCommit-GitHub-PR/job/PR-2960/6/console |
   | versions | git=1.9.1 maven=3.0.5 findbugs=3.0.1 |
   | Powered by | Apache Yetus 0.12.0 https://yetus.apache.org |
   
   
   This message was automatically generated.
   
   





[GitHub] [hbase] wchevreuil commented on a change in pull request #2960: HBASE-25583: Handle the running replication source gracefully with replication nodes deleted

Posted by GitBox <gi...@apache.org>.
wchevreuil commented on a change in pull request #2960:
URL: https://github.com/apache/hbase/pull/2960#discussion_r579070075



##########
File path: hbase-server/src/test/java/org/apache/hadoop/hbase/replication/regionserver/TestReplicationSourceWithoutReplicationZnodes.java
##########
@@ -0,0 +1,234 @@
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+
+    conf = HBaseConfiguration.create();
+    conf.set("replication.replicationsource.implementation",
+      ReplicationSourceDummyWithNoTermination.class.getCanonicalName());
+    conf.setBoolean(HConstants.REPLICATION_ENABLE_KEY,
+      HConstants.REPLICATION_ENABLE_DEFAULT);
+    conf.setLong("replication.sleep.before.failover", 2000);
+    conf.setInt("replication.source.maxretriesmultiplier", 10);
+    utility = new HBaseTestingUtility(conf);
+    utility.startMiniZKCluster();
+
+    zkw = new ZooKeeperWatcher(conf, "test", null);
+    ZKUtil.createWithParents(zkw, "/hbase/replication");
+    ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1");
+    ZKUtil.setData(zkw, "/hbase/replication/peers/1",
+      Bytes.toBytes(conf.get(HConstants.ZOOKEEPER_QUORUM) + ":"
+        + conf.get(HConstants.ZOOKEEPER_CLIENT_PORT) + ":/1"));
+    ZKUtil.createWithParents(zkw, "/hbase/replication/peers/1/peer-state");
+    ZKUtil.setData(zkw, "/hbase/replication/peers/1/peer-state",
+      ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
+    ZKUtil.createWithParents(zkw, "/hbase/replication/state");
+    ZKUtil.setData(zkw, "/hbase/replication/state", ReplicationStateZKBase.ENABLED_ZNODE_BYTES);
+
+    ZKClusterId.setClusterId(zkw, new ClusterId());
+    FSUtils.setRootDir(utility.getConfiguration(), utility.getDataTestDir());
+    fs = FileSystem.get(conf);
+    oldLogDir = new Path(utility.getDataTestDir(),
+      HConstants.HREGION_OLDLOGDIR_NAME);
+    logDir = new Path(utility.getDataTestDir(),
+      HConstants.HREGION_LOGDIR_NAME);
+    server = new DummyServer(conf, "example.hostname.com", zkw);
+    replication = new Replication(server, fs, logDir, oldLogDir);
+    manager = replication.getReplicationManager();
+
+    manager.addSource(slaveId);
+
+    htd = new HTableDescriptor(test);
+    HColumnDescriptor col = new HColumnDescriptor(f1);
+    col.setScope(HConstants.REPLICATION_SCOPE_GLOBAL);
+    htd.addFamily(col);
+    col = new HColumnDescriptor(f2);
+    col.setScope(HConstants.REPLICATION_SCOPE_LOCAL);
+    htd.addFamily(col);
+
+    hri = new HRegionInfo(htd.getTableName(), r1, r2);
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    try {
+      manager.join();
+    } catch (RuntimeException re) {
+      if (re.getMessage().equals(fakeExceptionMessage)) {
+        LOG.info("It is fine");
+      }
+    }
+    utility.shutdownMiniCluster();
+  }
+
+  @Rule
+  public TestName testName = new TestName();
+
+  private void cleanLogDir() throws IOException {
+    fs.delete(logDir, true);
+    fs.delete(oldLogDir, true);
+  }
+
+  @Before
+  public void setUp() throws Exception {
+    LOG.info("Start " + testName.getMethodName());
+    cleanLogDir();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    LOG.info("End " + testName.getMethodName());
+    cleanLogDir();
+  }
+
+  /**
+   * When a peer is removed, HBase deletes the peer znodes, and a ZooKeeper watcher
+   * then terminates the corresponding replication sources. If the watcher is never
+   * invoked, or if there is a race between the source deleting its log znode and the
+   * watcher terminating the source, the source can hit a NoNodeException. In that case,
+   * the right

Review comment:
       Ok, so the problem can happen because `ReplicationSourceManager.removePeer` is only invoked after the peer znode has already been deleted in ZK, whilst the `SourceShipperThread` might be trying to update the znode in between. Thanks for explaining it further, @sandeepvinayak!
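The race discussed above can be modelled without a cluster. What follows is a minimal sketch, not the code from the patch: every name in it (`PeerRemovalRaceSketch`, `FakeReplicationZk`, `recordPosition`, the `rs1/1/wal.1` key) is hypothetical, and an in-memory map stands in for ZooKeeper. It illustrates the handling proposed in the PR description: when updating a queue znode fails because the node is gone, check whether the peer still exists and, if it does not, terminate only that replication source instead of the region server. Falling back to an abort when the peer still exists is an assumption of the sketch.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch: none of these types exist in HBase; they only model the race.
class PeerRemovalRaceSketch {

  /** Stand-in for ZooKeeper's KeeperException.NoNodeException. */
  static class NoNodeException extends Exception {
    NoNodeException(String path) {
      super("NoNode for " + path);
    }
  }

  /** In-memory model of the peer znodes and the per-queue WAL position znodes. */
  static class FakeReplicationZk {
    final Set<String> peers = ConcurrentHashMap.newKeySet();
    final Map<String, Long> queuePositions = new ConcurrentHashMap<>();

    void setLogPosition(String queueZnode, long position) throws NoNodeException {
      // replace() only succeeds if the key is still present, like setData on an existing znode.
      if (queuePositions.replace(queueZnode, position) == null) {
        throw new NoNodeException(queueZnode);
      }
    }

    boolean peerExists(String peerId) {
      return peers.contains(peerId);
    }
  }

  enum Outcome { POSITION_UPDATED, SOURCE_TERMINATED, SERVER_ABORTED }

  /** Models the shipper recording the new WAL position after shipping a batch. */
  static Outcome recordPosition(FakeReplicationZk zk, String peerId, String queueZnode,
      long position) {
    try {
      zk.setLogPosition(queueZnode, position);
      return Outcome.POSITION_UPDATED;
    } catch (NoNodeException e) {
      if (!zk.peerExists(peerId)) {
        // Peer was removed but the watcher has not stopped this source yet:
        // terminate only this replication source, not the whole region server.
        return Outcome.SOURCE_TERMINATED;
      }
      // Peer still exists, so the missing znode is unexpected: fail fast (assumption).
      return Outcome.SERVER_ABORTED;
    }
  }
}
```

With peer `1` still registered, a missing queue entry yields `SERVER_ABORTED`; once the peer itself is gone, the same failure yields `SOURCE_TERMINATED`, which is the outcome the patch is after.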



