You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by pa...@apache.org on 2022/10/19 00:47:35 UTC

[lucene] branch branch_9x updated: PrimaryNode: add configurable timeout to waitForAllRemotesToClose (#11822)

This is an automated email from the ASF dual-hosted git repository.

patrickz pushed a commit to branch branch_9x
in repository https://gitbox.apache.org/repos/asf/lucene.git


The following commit(s) were added to refs/heads/branch_9x by this push:
     new e861f37baa0 PrimaryNode: add configurable timeout to waitForAllRemotesToClose (#11822)
e861f37baa0 is described below

commit e861f37baa03c4001d15fa4549736ce0a840b07b
Author: Steven Schlansker <st...@gmail.com>
AuthorDate: Tue Oct 18 17:21:01 2022 -0700

    PrimaryNode: add configurable timeout to waitForAllRemotesToClose (#11822)
---
 lucene/CHANGES.txt                                 |  2 +
 .../apache/lucene/replicator/nrt/PrimaryNode.java  | 29 +++++++++-
 .../apache/lucene/replicator/nrt/NodeProcess.java  | 16 ++++++
 .../lucene/replicator/nrt/SimplePrimaryNode.java   | 13 +++++
 .../lucene/replicator/nrt/TestNRTReplication.java  | 64 ++++++++++++++++++++++
 5 files changed, 121 insertions(+), 3 deletions(-)

diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt
index 33edc8782d4..9b92c75f054 100644
--- a/lucene/CHANGES.txt
+++ b/lucene/CHANGES.txt
@@ -18,6 +18,8 @@ API Changes
 * GITHUB#11761: TieredMergePolicy now allowed a maximum allowable deletes percentage of down to 5%, and the default
   maximum allowable deletes percentage is changed from 33% to 20%. (Marc D'Mello)
 
+* GITHUB#11822: Configure replicator PrimaryNode replica shutdown timeout. (Steven Schlansker)
+
 Improvements
 ---------------------
 * GITHUB#11785: Improve Tessellator performance by delaying calls to the method
diff --git a/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/PrimaryNode.java b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/PrimaryNode.java
index 8c6ac926e45..d9605a9018d 100644
--- a/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/PrimaryNode.java
+++ b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/PrimaryNode.java
@@ -61,6 +61,7 @@ public abstract class PrimaryNode extends Node {
   private CopyState copyState;
 
   protected final long primaryGen;
+  private int remoteCloseTimeoutMs = -1; // -1 waits forever, 0 does not wait
 
   /**
    * Contains merged segments that have been copied to all running replicas (as of when that merge
@@ -196,6 +197,24 @@ public abstract class PrimaryNode extends Node {
     throw new AssertionError("missing VERSION_KEY");
   }
 
+  /**
+   * Returns the shutdown wait for remote replicas, in ms ({@code -1} forever, {@code 0} none).
+   */
+  public int getRemoteCloseTimeoutMs() {
+    return this.remoteCloseTimeoutMs;
+  }
+
+  /**
+   * Sets how long, in milliseconds, shutdown waits for remote replicas to close. {@code -1} (the
+   * default) means forever, {@code 0} means don't wait at all; values below -1 are rejected.
+   */
+  public void setRemoteCloseTimeoutMs(int remoteCloseTimeoutMs) {
+    if (remoteCloseTimeoutMs < -1) {
+      throw new IllegalArgumentException("bad timeout " + remoteCloseTimeoutMs);
+    }
+    this.remoteCloseTimeoutMs = remoteCloseTimeoutMs;
+  }
+
   @Override
   public void commit() throws IOException {
     Map<String, String> commitData = new HashMap<>();
@@ -318,9 +337,13 @@ public abstract class PrimaryNode extends Node {
   }
 
   private synchronized void waitForAllRemotesToClose() throws IOException {
-
-    // Wait for replicas to finish or crash:
-    while (true) {
+    if (remoteCloseTimeoutMs == 0) {
+      return;
+    }
+    long waitStartNs = System.nanoTime();
+    // Wait for replicas to finish or crash or timeout:
+    while (remoteCloseTimeoutMs < 0
+        || (System.nanoTime() - waitStartNs) / 1_000_000 < remoteCloseTimeoutMs) {
       int count = copyingCount.get();
       if (count == 0) {
         return;
diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/NodeProcess.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/NodeProcess.java
index 0392fa4669c..b21456b0e31 100644
--- a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/NodeProcess.java
+++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/NodeProcess.java
@@ -189,6 +189,22 @@ class NodeProcess implements Closeable {
     }
   }
 
+  // Simulates a stuck replica: the primary takes a CopyState and never releases it.
+  public void leakCopyState() throws IOException {
+    try (Connection conn = new Connection(tcpPort)) {
+      conn.out.writeByte(SimplePrimaryNode.CMD_LEAK_COPY_STATE);
+      conn.flush(); // fire-and-forget: no reply is read back
+    }
+  }
+
+  public void setRemoteCloseTimeoutMs(int timeoutMs) throws IOException { // remote setter on primary
+    try (Connection conn = new Connection(tcpPort)) {
+      conn.out.writeByte(SimplePrimaryNode.CMD_SET_CLOSE_WAIT_MS);
+      conn.out.writeInt(timeoutMs);
+      conn.flush(); // fire-and-forget: no reply is read back
+    }
+  }
+
   public void addOrUpdateDocument(Connection c, Document doc, boolean isUpdate) throws IOException {
     if (isPrimary == false) {
       throw new IllegalStateException("only primary can index");
diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimplePrimaryNode.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimplePrimaryNode.java
index d0068018371..2617ecfda6d 100644
--- a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimplePrimaryNode.java
+++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimplePrimaryNode.java
@@ -650,6 +650,10 @@ class SimplePrimaryNode extends PrimaryNode {
   // merges:
   static final byte CMD_NEW_REPLICA = 20;
 
+  // Test-only: primary takes a CopyState and never releases it, simulating a stuck replica
+  static final byte CMD_LEAK_COPY_STATE = 24;
+  static final byte CMD_SET_CLOSE_WAIT_MS = 25; // followed by an int: setRemoteCloseTimeoutMs value
+
   /** Handles incoming request to the naive TCP server wrapping this node */
   void handleOneConnection(
       Random random,
@@ -821,6 +825,15 @@ class SimplePrimaryNode extends PrimaryNode {
           }
           break;
 
+        case CMD_LEAK_COPY_STATE:
+          message("leaking a CopyState");
+          getCopyState();
+          continue outer;
+
+        case CMD_SET_CLOSE_WAIT_MS:
+          setRemoteCloseTimeoutMs(in.readInt());
+          continue outer;
+
         default:
           throw new IllegalArgumentException("unrecognized cmd=" + cmd + " via socket=" + socket);
       }
diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestNRTReplication.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestNRTReplication.java
index 27926c40b82..e01cf3e3ae6 100644
--- a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestNRTReplication.java
+++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestNRTReplication.java
@@ -805,6 +805,70 @@ public class TestNRTReplication extends LuceneTestCase {
     primary.close();
   }
 
+  @Nightly
+  public void testPrimaryCloseWhileCopyingNoWait() throws Exception {
+    Path path1 = createTempDir("A");
+    NodeProcess primary = startNode(-1, 0, path1, -1, true);
+
+    Path path2 = createTempDir("B");
+    NodeProcess replica = startNode(primary.tcpPort, 1, path2, -1, true);
+
+    assertWriteLockHeld(path2);
+
+    sendReplicasToPrimary(primary, replica);
+
+    LineFileDocs docs = new LineFileDocs(random());
+    try (Connection c = new Connection(primary.tcpPort)) {
+      c.out.writeByte(SimplePrimaryNode.CMD_INDEXING);
+      Document doc = docs.nextDoc();
+      primary.addOrUpdateDocument(c, doc, false);
+    }
+
+    // Refresh primary, which also pushes to replica:
+    long primaryVersion1 = primary.flush(0);
+    assertTrue(primaryVersion1 > 0);
+
+    // Wait for replica to sync up:
+    waitForVersionAndHits(replica, primaryVersion1, 1);
+
+    primary.setRemoteCloseTimeoutMs(0);
+    primary.leakCopyState();
+    primary.close();
+    replica.close();
+  }
+
+  @Nightly
+  public void testPrimaryCloseWhileCopyingShortWait() throws Exception {
+    Path path1 = createTempDir("A");
+    NodeProcess primary = startNode(-1, 0, path1, -1, true);
+
+    Path path2 = createTempDir("B");
+    NodeProcess replica = startNode(primary.tcpPort, 1, path2, -1, true);
+
+    assertWriteLockHeld(path2);
+
+    sendReplicasToPrimary(primary, replica);
+
+    LineFileDocs docs = new LineFileDocs(random());
+    try (Connection c = new Connection(primary.tcpPort)) {
+      c.out.writeByte(SimplePrimaryNode.CMD_INDEXING);
+      Document doc = docs.nextDoc();
+      primary.addOrUpdateDocument(c, doc, false);
+    }
+
+    // Refresh primary, which also pushes to replica:
+    long primaryVersion1 = primary.flush(0);
+    assertTrue(primaryVersion1 > 0);
+
+    // Wait for replica to sync up:
+    waitForVersionAndHits(replica, primaryVersion1, 1);
+
+    primary.setRemoteCloseTimeoutMs(1000);
+    primary.leakCopyState();
+    primary.close();
+    replica.close();
+  }
+
   @Nightly
   public void testFullClusterCrash() throws Exception {