You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@lucene.apache.org by pa...@apache.org on 2022/10/19 00:47:35 UTC
[lucene] branch branch_9x updated: PrimaryNode: add configurable timeout to waitForAllRemotesToClose (#11822)
This is an automated email from the ASF dual-hosted git repository.
patrickz pushed a commit to branch branch_9x
in repository https://gitbox.apache.org/repos/asf/lucene.git
The following commit(s) were added to refs/heads/branch_9x by this push:
new e861f37baa0 PrimaryNode: add configurable timeout to waitForAllRemotesToClose (#11822)
e861f37baa0 is described below
commit e861f37baa03c4001d15fa4549736ce0a840b07b
Author: Steven Schlansker <st...@gmail.com>
AuthorDate: Tue Oct 18 17:21:01 2022 -0700
PrimaryNode: add configurable timeout to waitForAllRemotesToClose (#11822)
---
lucene/CHANGES.txt | 2 +
.../apache/lucene/replicator/nrt/PrimaryNode.java | 29 +++++++++-
.../apache/lucene/replicator/nrt/NodeProcess.java | 16 ++++++
.../lucene/replicator/nrt/SimplePrimaryNode.java | 13 +++++
.../lucene/replicator/nrt/TestNRTReplication.java | 64 ++++++++++++++++++++++
5 files changed, 121 insertions(+), 3 deletions(-)
diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt
index 33edc8782d4..9b92c75f054 100644
--- a/lucene/CHANGES.txt
+++ b/lucene/CHANGES.txt
@@ -18,6 +18,8 @@ API Changes
* GITHUB#11761: TieredMergePolicy now allowed a maximum allowable deletes percentage of down to 5%, and the default
maximum allowable deletes percentage is changed from 33% to 20%. (Marc D'Mello)
+* GITHUB#11822: Configure replicator PrimaryNode replica shutdown timeout. (Steven Schlansker)
+
Improvements
---------------------
* GITHUB#11785: Improve Tessellator performance by delaying calls to the method
diff --git a/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/PrimaryNode.java b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/PrimaryNode.java
index 8c6ac926e45..d9605a9018d 100644
--- a/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/PrimaryNode.java
+++ b/lucene/replicator/src/java/org/apache/lucene/replicator/nrt/PrimaryNode.java
@@ -61,6 +61,7 @@ public abstract class PrimaryNode extends Node {
private CopyState copyState;
protected final long primaryGen;
+ private int remoteCloseTimeoutMs = -1;
/**
* Contains merged segments that have been copied to all running replicas (as of when that merge
@@ -196,6 +197,24 @@ public abstract class PrimaryNode extends Node {
throw new AssertionError("missing VERSION_KEY");
}
+ /**
+ * Returns how many milliseconds shutdown waits for remote replicas to close. {@code -1} means
+ * wait forever and {@code 0} means do not wait at all.
+ *
+ * @return the remote close timeout in milliseconds
+ */
+ public int getRemoteCloseTimeoutMs() {
+ return this.remoteCloseTimeoutMs;
+ }
+
+ /**
+ * Set the number of milliseconds to wait during shutdown for remote replicas to close. {@code -1}
+ * (the default) means forever, and {@code 0} means don't wait at all.
+ *
+ * @param remoteCloseTimeoutMs the timeout in milliseconds; must be {@code >= -1}
+ * @throws IllegalArgumentException if {@code remoteCloseTimeoutMs < -1}
+ */
+ public void setRemoteCloseTimeoutMs(int remoteCloseTimeoutMs) {
+ if (remoteCloseTimeoutMs < -1) {
+ throw new IllegalArgumentException(
+ "bad timeout " + remoteCloseTimeoutMs + "; must be >= -1");
+ }
+ this.remoteCloseTimeoutMs = remoteCloseTimeoutMs;
+ }
+
@Override
public void commit() throws IOException {
Map<String, String> commitData = new HashMap<>();
@@ -318,9 +337,13 @@ public abstract class PrimaryNode extends Node {
}
private synchronized void waitForAllRemotesToClose() throws IOException {
-
- // Wait for replicas to finish or crash:
- while (true) {
+ if (remoteCloseTimeoutMs == 0) {
+ return;
+ }
+ long waitStartNs = System.nanoTime();
+ // Wait for replicas to finish or crash or timeout:
+ while (remoteCloseTimeoutMs < 0
+ || (System.nanoTime() - waitStartNs) / 1_000_000 < remoteCloseTimeoutMs) {
int count = copyingCount.get();
if (count == 0) {
return;
diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/NodeProcess.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/NodeProcess.java
index 0392fa4669c..b21456b0e31 100644
--- a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/NodeProcess.java
+++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/NodeProcess.java
@@ -189,6 +189,22 @@ class NodeProcess implements Closeable {
}
}
+ /**
+ * Simulates a replica that holds a copy state open forever: the primary is asked to obtain a
+ * CopyState, which it then simply leaks.
+ */
+ public void leakCopyState() throws IOException {
+ try (Connection conn = new Connection(tcpPort)) {
+ conn.out.writeByte(SimplePrimaryNode.CMD_LEAK_COPY_STATE);
+ conn.flush();
+ }
+ }
+
+ /**
+ * Sends {@code CMD_SET_CLOSE_WAIT_MS} followed by {@code timeoutMs}, so the remote primary
+ * applies it as its remote close timeout.
+ */
+ public void setRemoteCloseTimeoutMs(int timeoutMs) throws IOException {
+ try (Connection conn = new Connection(tcpPort)) {
+ conn.out.writeByte(SimplePrimaryNode.CMD_SET_CLOSE_WAIT_MS);
+ conn.out.writeInt(timeoutMs);
+ conn.flush();
+ }
+ }
+
public void addOrUpdateDocument(Connection c, Document doc, boolean isUpdate) throws IOException {
if (isPrimary == false) {
throw new IllegalStateException("only primary can index");
diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimplePrimaryNode.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimplePrimaryNode.java
index d0068018371..2617ecfda6d 100644
--- a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimplePrimaryNode.java
+++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/SimplePrimaryNode.java
@@ -650,6 +650,10 @@ class SimplePrimaryNode extends PrimaryNode {
// merges:
static final byte CMD_NEW_REPLICA = 20;
+ // Leak a CopyState to simulate failure
+ static final byte CMD_LEAK_COPY_STATE = 24;
+ static final byte CMD_SET_CLOSE_WAIT_MS = 25;
+
/** Handles incoming request to the naive TCP server wrapping this node */
void handleOneConnection(
Random random,
@@ -821,6 +825,15 @@ class SimplePrimaryNode extends PrimaryNode {
}
break;
+ case CMD_LEAK_COPY_STATE:
+ message("leaking a CopyState");
+ getCopyState();
+ continue outer;
+
+ case CMD_SET_CLOSE_WAIT_MS:
+ setRemoteCloseTimeoutMs(in.readInt());
+ continue outer;
+
default:
throw new IllegalArgumentException("unrecognized cmd=" + cmd + " via socket=" + socket);
}
diff --git a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestNRTReplication.java b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestNRTReplication.java
index 27926c40b82..e01cf3e3ae6 100644
--- a/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestNRTReplication.java
+++ b/lucene/replicator/src/test/org/apache/lucene/replicator/nrt/TestNRTReplication.java
@@ -805,6 +805,70 @@ public class TestNRTReplication extends LuceneTestCase {
primary.close();
}
+ @Nightly
+ public void testPrimaryCloseWhileCopyingNoWait() throws Exception {
+ // 0 means the primary does not wait at all for remote replicas during shutdown:
+ doTestPrimaryCloseWithLeakedCopyState(0);
+ }
+
+ @Nightly
+ public void testPrimaryCloseWhileCopyingShortWait() throws Exception {
+ // The primary waits up to 1 second for remote replicas during shutdown:
+ doTestPrimaryCloseWithLeakedCopyState(1000);
+ }
+
+ /**
+ * Starts a primary and one replica, indexes one document and waits until the replica has it.
+ * Then it sets the primary's remote close timeout, leaks a CopyState on the primary (simulating
+ * a replica that never finishes copying), and closes both nodes. The test relies on the
+ * configured timeout to let {@code primary.close()} finish.
+ *
+ * @param remoteCloseTimeoutMs timeout passed to {@code NodeProcess.setRemoteCloseTimeoutMs}
+ */
+ private void doTestPrimaryCloseWithLeakedCopyState(int remoteCloseTimeoutMs) throws Exception {
+ Path path1 = createTempDir("A");
+ NodeProcess primary = startNode(-1, 0, path1, -1, true);
+
+ Path path2 = createTempDir("B");
+ NodeProcess replica = startNode(primary.tcpPort, 1, path2, -1, true);
+
+ assertWriteLockHeld(path2);
+
+ sendReplicasToPrimary(primary, replica);
+
+ // LineFileDocs is Closeable; close it together with the indexing connection:
+ try (LineFileDocs docs = new LineFileDocs(random());
+ Connection c = new Connection(primary.tcpPort)) {
+ c.out.writeByte(SimplePrimaryNode.CMD_INDEXING);
+ Document doc = docs.nextDoc();
+ primary.addOrUpdateDocument(c, doc, false);
+ }
+
+ // Refresh primary, which also pushes to replica:
+ long primaryVersion1 = primary.flush(0);
+ assertTrue(primaryVersion1 > 0);
+
+ // Wait for replica to sync up:
+ waitForVersionAndHits(replica, primaryVersion1, 1);
+
+ primary.setRemoteCloseTimeoutMs(remoteCloseTimeoutMs);
+ primary.leakCopyState();
+ primary.close();
+ replica.close();
+ }
+
@Nightly
public void testFullClusterCrash() throws Exception {