Posted to jira@kafka.apache.org by "Hangleton (via GitHub)" <gi...@apache.org> on 2023/04/13 09:48:33 UTC

[GitHub] [kafka] Hangleton opened a new pull request, #13558: [KAFKA-14845] Implement test case simulating broker registration failure with Zookeeper when a session id is lost

Hangleton opened a new pull request, #13558:
URL: https://github.com/apache/kafka/pull/13558

   This PR implements the test case for [KAFKA-14845](https://issues.apache.org/jira/browse/KAFKA-14845). This test simulates the loss of a session ID with Zookeeper (for instance due to a network partition) while a broker is in the process of creating its ephemeral znode under /brokers/ids/. This can make broker registration fail with a `NODEEXISTS` error, which is not handled by the fix for [KAFKA-6584](https://issues.apache.org/jira/browse/KAFKA-6584) because, in this case, the broker does not know the ID of the session that owns the conflicting ephemeral znode.
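   To make the failure mode concrete, here is a simplified model of how a `NODEEXISTS` error can be resolved by comparing the ephemeral owner of the existing znode with the client's current session ID, which is the idea behind the KAFKA-6584 fix. This is not Kafka's actual `CheckedEphemeral` code: `EphemeralOwnerCheck` and `Outcome` are hypothetical names used only for illustration.

   ```java
   import java.util.Optional;

   public class EphemeralOwnerCheck {
       /** Hypothetical outcomes of a checked ephemeral znode creation. */
       enum Outcome { CREATED, ALREADY_OWNED, CONFLICT }

       /**
        * @param existingOwner    session ID owning the existing znode, or empty if there is none
        * @param currentSessionId session ID currently held by the client
        */
       static Outcome resolve(Optional<Long> existingOwner, long currentSessionId) {
           if (!existingOwner.isPresent()) {
               return Outcome.CREATED;        // no conflicting znode: the create succeeds
           }
           if (existingOwner.get() == currentSessionId) {
               return Outcome.ALREADY_OWNED;  // our own earlier create went through: treated as success
           }
           return Outcome.CONFLICT;           // owned by a session the client does not know about
       }

       public static void main(String[] args) {
           long lostSession = 0x1L;     // session used for the first create, then lost
           long currentSession = 0x2L;  // new session obtained after reconnecting
           // The znode still belongs to the lost session until that session expires.
           System.out.println(resolve(Optional.of(lostSession), currentSession));
       }
   }
   ```

   Running `main` prints `CONFLICT`: from the broker's point of view the znode is owned by an unknown session, even though that session was its own before it was lost.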
   
   The corresponding timeline of events exercised by this test is reproduced below.
   
   A fix will be proposed in a separate PR.
   
   ![Timeline of events for KAFKA-14845](https://user-images.githubusercontent.com/638312/231722168-5bad3910-70e0-4a05-a4d1-d4764e51f3ff.png)
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] Hangleton commented on a diff in pull request #13558: KAFKA-14845: Fix broker registration with Zookeeper when the previous ephemeral znode was not properly recorded by the broker

Posted by "Hangleton (via GitHub)" <gi...@apache.org>.
Hangleton commented on code in PR #13558:
URL: https://github.com/apache/kafka/pull/13558#discussion_r1198005710


##########
core/src/main/scala/kafka/zk/KafkaZkClient.scala:
##########
@@ -2113,7 +2113,12 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
   }
 
   private class CheckedEphemeral(path: String, data: Array[Byte]) extends Logging {
+    private var attempt = 0
+    private val maxAttempt = 5
+    private val backoffMs = 1000

Review Comment:
   This could indeed be an important element to consider when configuring the retry strategy. We probably don't want to retry past the session timeout, because that exposes us to the possibility that the session used during the first call is no longer valid.
   
   Also, since the Zk client in Kafka does not use request timeouts, the session timeout can be used to calibrate the expected response times. Another timeout to consider is the read timeout, which is the maximum time allowed without activity on a connection before that connection is closed. The Zookeeper protocol includes heartbeats (pings), which the client sends whenever the connection has been idle for half of the read timeout. The read timeout is set to two thirds of the negotiated session timeout; it is the connect timeout that is the session timeout divided by the number of hosts in the Zookeeper connection string.
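   For reference, the timeout arithmetic can be sketched as follows. This assumes the behaviour of ZooKeeper's client connection class (`ClientCnxn`) as we understand it: the read timeout is two thirds of the negotiated session timeout, a ping is due after roughly half of the read timeout without sending anything, and the per-host connect timeout is the session timeout divided by the number of hosts in the connect string. The class and method names are hypothetical, and 18000 ms is used only because it is Kafka's default `zookeeper.session.timeout.ms`.

   ```java
   public class ZkClientTimeouts {
       /** Read timeout: two thirds of the negotiated session timeout. */
       static int readTimeoutMs(int sessionTimeoutMs) {
           return sessionTimeoutMs * 2 / 3;
       }

       /** A ping is due once the client has been idle for half of the read timeout. */
       static int pingIntervalMs(int sessionTimeoutMs) {
           return readTimeoutMs(sessionTimeoutMs) / 2;
       }

       /** Connect timeout per host: the session timeout split across the connect-string hosts. */
       static int connectTimeoutMs(int sessionTimeoutMs, int hostCount) {
           return sessionTimeoutMs / hostCount;
       }

       public static void main(String[] args) {
           int sessionTimeoutMs = 18000; // Kafka's default zookeeper.session.timeout.ms
           int hosts = 3;                // e.g. a three-node ensemble in the connect string
           System.out.println("read timeout:    " + readTimeoutMs(sessionTimeoutMs) + " ms");
           System.out.println("ping interval:   " + pingIntervalMs(sessionTimeoutMs) + " ms");
           System.out.println("connect timeout: " + connectTimeoutMs(sessionTimeoutMs, hosts) + " ms");
       }
   }
   ```

   With these values this gives a 12000 ms read timeout, a ping after about 6000 ms of idleness, and a 6000 ms connect timeout per host.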
   
   I haven't looked at how Apache Curator handles retries, but that could be an interesting starting point.
   
   So the retry strategy here requires a bit more thought.
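   One possible shape for such a strategy, sketched under the assumption that the overall retry budget should not exceed the session timeout: keep the fixed backoff from the diff, but stop scheduling new attempts once the next one would start past the deadline. `SessionBoundedRetry` and its parameters are hypothetical; the clock and sleep functions are injected only so the behaviour can be exercised without real waiting.

   ```java
   import java.util.function.LongConsumer;
   import java.util.function.LongSupplier;
   import java.util.function.Predicate;
   import java.util.function.Supplier;

   public class SessionBoundedRetry {
       /**
        * Runs the operation, retrying with a fixed backoff while the result is retriable,
        * as long as the next attempt would start before the session-timeout deadline.
        */
       static <T> T run(Supplier<T> operation, Predicate<T> retriable,
                        long sessionTimeoutMs, long backoffMs,
                        LongSupplier nowMs, LongConsumer sleepMs) {
           long deadlineMs = nowMs.getAsLong() + sessionTimeoutMs;
           T result = operation.get();
           // Time spent inside each attempt counts against the budget, since the clock is re-read.
           while (retriable.test(result) && nowMs.getAsLong() + backoffMs < deadlineMs) {
               sleepMs.accept(backoffMs);
               result = operation.get();
           }
           return result;
       }

       public static void main(String[] args) {
           long[] fakeClock = {0};  // simulated time, advanced only by "sleeping"
           int[] attempts = {0};
           String result = run(
               () -> { attempts[0]++; return "NODEEXISTS"; },  // always fails
               r -> r.equals("NODEEXISTS"),
               5000, 1000,
               () -> fakeClock[0],
               ms -> fakeClock[0] += ms);
           System.out.println(result + " after " + attempts[0] + " attempts, " + fakeClock[0] + " ms");
       }
   }
   ```

   With a 5000 ms budget and a 1000 ms backoff this makes 5 attempts over 4000 ms of simulated time. Deriving the bound from the session timeout, rather than from a fixed attempt count, keeps the two consistent if the timeout is reconfigured; it still cannot bound a single attempt that hangs, which is where the lack of request timeouts matters.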





[GitHub] [kafka] Hangleton commented on a diff in pull request #13558: KAFKA-14845: Fix broker registration with Zookeeper when the previous ephemeral znode was not properly recorded by the broker

Posted by "Hangleton (via GitHub)" <gi...@apache.org>.
Hangleton commented on code in PR #13558:
URL: https://github.com/apache/kafka/pull/13558#discussion_r1205648643


##########
checkstyle/suppressions.xml:
##########
@@ -41,6 +41,8 @@
     <suppress checks="NPathComplexity" files="(ClusterTestExtensions|KafkaApisBuilder).java"/>
     <suppress checks="MethodLength"
               files="(KafkaClusterTestKit).java"/>
+    <suppress checks="ImportControl"
+              files="(ZkBrokerRegistrationTest|DelegatingRequest|InstrumentedRequestProcessor|InstrumentedZooKeeperServer|ReceiptEvent|ZkBrokerRegistrationStubs).java"/>

Review Comment:
   Apologies if I am missing the obvious, but am I right that this is not only an ordering problem, and that the packages of the imported classes also need to be allowed in [import-control-core.xml](https://github.com/apache/kafka/blob/trunk/checkstyle/import-control-core.xml)? I will modify that file and remove the suppression rule.





[GitHub] [kafka] soarez commented on a diff in pull request #13558: KAFKA-14845: Fix broker registration with Zookeeper when the previous ephemeral znode was not properly recorded by the broker

Posted by "soarez (via GitHub)" <gi...@apache.org>.
soarez commented on code in PR #13558:
URL: https://github.com/apache/kafka/pull/13558#discussion_r1209954592


##########
checkstyle/suppressions.xml:
##########
@@ -41,6 +41,8 @@
     <suppress checks="NPathComplexity" files="(ClusterTestExtensions|KafkaApisBuilder).java"/>
     <suppress checks="MethodLength"
               files="(KafkaClusterTestKit).java"/>
+    <suppress checks="ImportControl"
+              files="(ZkBrokerRegistrationTest|DelegatingRequest|InstrumentedRequestProcessor|InstrumentedZooKeeperServer|ReceiptEvent|ZkBrokerRegistrationStubs).java"/>

Review Comment:
   Makes sense. Thanks!





[GitHub] [kafka] Hangleton commented on pull request #13558: KAFKA-14845: Fix broker registration with Zookeeper when the previous ephemeral znode was not properly recorded by the broker

Posted by "Hangleton (via GitHub)" <gi...@apache.org>.
Hangleton commented on PR #13558:
URL: https://github.com/apache/kafka/pull/13558#issuecomment-1552906809

   > Thank you for this PR.
   > 
   > Despite the detailed information in the PR description and JIRA (thank you!), I'm still unsure as to how this happens, and whether it's a ZooKeeper bug we're mitigating against, or if there's something wrong about how we're using it. However, the proposed solution seems pretty innocuous.
   > 
   > I also wonder if some of the new test classes — `DelegatingRequest`, `InstrumentedRequestProcessor` and `InstrumentedZooKeeperServer` — would be better replaced by the use of mockito's `spy()`.
   
   Hi Igor, thanks for reviewing the PR. I will get back to you by EOD with an update and a follow-up on your feedback. Thanks.




[GitHub] [kafka] Hangleton commented on a diff in pull request #13558: KAFKA-14845: Fix broker registration with Zookeeper when the previous ephemeral znode was not properly recorded by the broker

Posted by "Hangleton (via GitHub)" <gi...@apache.org>.
Hangleton commented on code in PR #13558:
URL: https://github.com/apache/kafka/pull/13558#discussion_r1198005710


##########
core/src/main/scala/kafka/zk/KafkaZkClient.scala:
##########
@@ -2113,7 +2113,12 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
   }
 
   private class CheckedEphemeral(path: String, data: Array[Byte]) extends Logging {
+    private var attempt = 0
+    private val maxAttempt = 5
+    private val backoffMs = 1000

Review Comment:
   This is indeed an important element to consider when configuring the retry strategy. We probably don't want to retry past the session timeout, because that exposes us to the possibility that the session used during the first call is no longer valid.
   
   Also, since the Zk client in Kafka does not use request timeouts, the session timeout can be used to calibrate the expected response times. Another timeout to consider is the read timeout, which is the maximum time allowed without activity on a connection before that connection is closed. The Zookeeper protocol includes heartbeats (pings), which the client sends whenever the connection has been idle for half of the read timeout. The read timeout is set to two thirds of the negotiated session timeout; it is the connect timeout that is the session timeout divided by the number of hosts in the Zookeeper connection string.
   
   I haven't looked at how Apache Curator handles retries, but that could be an interesting starting point.
   
   Overall, the retry strategy here requires a bit more thought.
   
   Thanks for raising the point.





[GitHub] [kafka] Hangleton commented on pull request #13558: KAFKA-14845: Fix broker registration with Zookeeper when the previous ephemeral znode was not properly recorded by the broker

Posted by "Hangleton (via GitHub)" <gi...@apache.org>.
Hangleton commented on PR #13558:
URL: https://github.com/apache/kafka/pull/13558#issuecomment-1564157619

   Hi Igor, thanks for the review. I added the changes you reminded me about above. I am running this integration test a few more times to make sure there are no intermittent failures. Thanks.






[GitHub] [kafka] Hangleton commented on a diff in pull request #13558: KAFKA-14845: Fix broker registration with Zookeeper when the previous ephemeral znode was not properly recorded by the broker

Posted by "Hangleton (via GitHub)" <gi...@apache.org>.
Hangleton commented on code in PR #13558:
URL: https://github.com/apache/kafka/pull/13558#discussion_r1200432343


##########
core/src/main/scala/kafka/zk/KafkaZkClient.scala:
##########
@@ -2113,7 +2113,12 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
   }
 
   private class CheckedEphemeral(path: String, data: Array[Byte]) extends Logging {
+    private var attempt = 0
+    private val maxAttempt = 5
+    private val backoffMs = 1000

Review Comment:
   This makes sense, sure!





[GitHub] [kafka] Hangleton commented on a diff in pull request #13558: KAFKA-14845: Fix broker registration with Zookeeper when the previous ephemeral znode was not properly recorded by the broker

Posted by "Hangleton (via GitHub)" <gi...@apache.org>.
Hangleton commented on code in PR #13558:
URL: https://github.com/apache/kafka/pull/13558#discussion_r1199068761


##########
core/src/test/java/kafka/zk/ZkBrokerRegistrationTest.java:
##########
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.zk;
+
+import io.netty.channel.group.ChannelGroup;
+import io.netty.util.concurrent.EventExecutor;
+import kafka.cluster.Broker;
+import kafka.server.KafkaConfig;
+import kafka.zookeeper.ZooKeeperClient;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.utils.Time;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.client.ZKClientConfig;
+import org.apache.zookeeper.server.PrepRequestProcessor;
+import org.apache.zookeeper.server.RequestProcessor;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.SyncRequestProcessor;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.net.ServerSocket;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+
+import static java.util.Arrays.asList;
+import static org.apache.kafka.common.security.auth.SecurityProtocol.PLAINTEXT;
+import static org.apache.zookeeper.client.ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET;
+
+/**
+ * This test simulates the loss of a session ID with Zookeeper (for instance due to a network partition)
+ * while a broker is in the process of creating its ephemeral znode under /brokers/ids/. This can
+ * result in broker registration failing due to a NODEEXISTS error, which is not handled by the
+ * fix for KAFKA-6584 because, in this case, the ID of the session owning the conflicting ephemeral
+ * znode is not known by the broker. See KAFKA-14845 for an example of the timeline of events
+ * required to reproduce this use case.
+ */
+public class ZkBrokerRegistrationTest {
+    private static final Logger log = LoggerFactory.getLogger(ZkBrokerRegistrationTest.class);
+
+    private KafkaConfig kafkaConfig;
+    private BrokerInfo brokerInfo;
+    private int zkPort;
+
+    private class SandboxedZookeeper extends Thread {
+        private final CountDownLatch zookeeperStopLatch;
+        private final CountDownLatch zookeeperStartLatch = new CountDownLatch(1);
+        private final ZkTestContext spec;
+        private InstrumentedRequestProcessor processor;
+
+        SandboxedZookeeper(CountDownLatch zookeeperStopLatch, ZkTestContext spec) {
+            this.zookeeperStopLatch = zookeeperStopLatch;
+            this.spec = spec;
+        }
+
+        public void run() {
+            ServerCnxnFactory cnxnFactory = null;
+
+            try {
+                Path dataDir = Files.createTempDirectory("zk");
+
+                Properties zkProperties = new Properties();
+                zkProperties.put("dataDir", dataDir.toFile().getPath());
+                zkProperties.put("clientPort", String.valueOf(zkPort));
+                zkProperties.put("serverCnxnFactory", "org.apache.zookeeper.server.NettyServerCnxnFactory");
+
+                QuorumPeerConfig config = new QuorumPeerConfig();
+                config.parseProperties(zkProperties);
+                FileTxnSnapLog txnLog = new FileTxnSnapLog(config.getDataLogDir(), config.getDataDir());
+
+                ZooKeeperServer zookeeper = new InstrumentedZooKeeperServer(
+                    null,
+                    txnLog,
+                    config.getTickTime(),
+                    config.getMinSessionTimeout(),
+                    config.getMaxSessionTimeout(),
+                    config.getClientPortListenBacklog(),
+                    null,
+                    config.getInitialConfig(),
+                    spec) {
+
+                    @Override
+                    protected void setupRequestProcessors() {
+                        processor = new InstrumentedRequestProcessor(this, spec);
+                        RequestProcessor syncProcessor = new SyncRequestProcessor(this, processor);
+                        ((SyncRequestProcessor) syncProcessor).start();
+                        firstProcessor = new PrepRequestProcessor(this, syncProcessor);
+                        ((PrepRequestProcessor) firstProcessor).start();
+                    }
+                };
+
+                cnxnFactory = ServerCnxnFactory.createFactory();
+                cnxnFactory.configure(
+                    config.getClientPortAddress(),
+                    config.getMaxClientCnxns(),
+                    config.getClientPortListenBacklog(),
+                    false);
+                cnxnFactory.startup(zookeeper);
+
+                zookeeperStartLatch.countDown();
+                zookeeperStopLatch.await();

Review Comment:
   Ah, I see, sorry, I misunderstood. Yes, that thread appears to be unnecessary; I cannot remember why I added it. Thanks for the callout.





[GitHub] [kafka] soarez commented on a diff in pull request #13558: KAFKA-14845: Fix broker registration with Zookeeper when the previous ephemeral znode was not properly recorded by the broker

Posted by "soarez (via GitHub)" <gi...@apache.org>.
soarez commented on code in PR #13558:
URL: https://github.com/apache/kafka/pull/13558#discussion_r1197495849


##########
core/src/test/resources/log4j.properties:
##########
@@ -20,7 +20,8 @@ log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
 
 log4j.logger.kafka=WARN
 log4j.logger.org.apache.kafka=WARN
-
+log4j.logger.kafka.zk=INFO
+log4j.logger.org.apache.zookeeper.server=INFO

Review Comment:
   Do we really want to enable this for all tests? Should we instead leave a comment on the new test that suggests enabling these for troubleshooting as needed?



##########
core/src/test/java/kafka/zk/ZkBrokerRegistrationTest.java:
##########
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.zk;
+
+import io.netty.channel.group.ChannelGroup;
+import io.netty.util.concurrent.EventExecutor;
+import kafka.cluster.Broker;
+import kafka.server.KafkaConfig;
+import kafka.zookeeper.ZooKeeperClient;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.utils.Time;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.client.ZKClientConfig;
+import org.apache.zookeeper.server.PrepRequestProcessor;
+import org.apache.zookeeper.server.RequestProcessor;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.SyncRequestProcessor;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.net.ServerSocket;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+
+import static java.util.Arrays.asList;
+import static org.apache.kafka.common.security.auth.SecurityProtocol.PLAINTEXT;
+import static org.apache.zookeeper.client.ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET;
+
+/**
+ * This test simulates the loss of a session ID with Zookeeper (for instance due to a network partition)
+ * while a broker is in the process of creating its ephemeral znode under /brokers/ids/. This can
+ * result in broker registration failing due to a NODEEXISTS error, which is not handled by the
+ * fix for KAFKA-6584 because, in this case, the ID of the session owning the conflicting ephemeral
+ * znode is not known by the broker. See KAFKA-14845 for an example of the timeline of events
+ * required to reproduce this use case.
+ */
+public class ZkBrokerRegistrationTest {
+    private static final Logger log = LoggerFactory.getLogger(ZkBrokerRegistrationTest.class);
+
+    private KafkaConfig kafkaConfig;
+    private BrokerInfo brokerInfo;
+    private int zkPort;
+
+    private class SandboxedZookeeper extends Thread {
+        private final CountDownLatch zookeeperStopLatch;
+        private final CountDownLatch zookeeperStartLatch = new CountDownLatch(1);
+        private final ZkTestContext spec;
+        private InstrumentedRequestProcessor processor;
+
+        SandboxedZookeeper(CountDownLatch zookeeperStopLatch, ZkTestContext spec) {
+            this.zookeeperStopLatch = zookeeperStopLatch;
+            this.spec = spec;
+        }
+
+        public void run() {
+            ServerCnxnFactory cnxnFactory = null;
+
+            try {
+                Path dataDir = Files.createTempDirectory("zk");
+
+                Properties zkProperties = new Properties();
+                zkProperties.put("dataDir", dataDir.toFile().getPath());
+                zkProperties.put("clientPort", String.valueOf(zkPort));
+                zkProperties.put("serverCnxnFactory", "org.apache.zookeeper.server.NettyServerCnxnFactory");
+
+                QuorumPeerConfig config = new QuorumPeerConfig();
+                config.parseProperties(zkProperties);
+                FileTxnSnapLog txnLog = new FileTxnSnapLog(config.getDataLogDir(), config.getDataDir());
+
+                ZooKeeperServer zookeeper = new InstrumentedZooKeeperServer(
+                    null,
+                    txnLog,
+                    config.getTickTime(),
+                    config.getMinSessionTimeout(),
+                    config.getMaxSessionTimeout(),
+                    config.getClientPortListenBacklog(),
+                    null,
+                    config.getInitialConfig(),
+                    spec) {
+
+                    @Override
+                    protected void setupRequestProcessors() {
+                        processor = new InstrumentedRequestProcessor(this, spec);
+                        RequestProcessor syncProcessor = new SyncRequestProcessor(this, processor);
+                        ((SyncRequestProcessor) syncProcessor).start();
+                        firstProcessor = new PrepRequestProcessor(this, syncProcessor);
+                        ((PrepRequestProcessor) firstProcessor).start();
+                    }
+                };
+
+                cnxnFactory = ServerCnxnFactory.createFactory();
+                cnxnFactory.configure(
+                    config.getClientPortAddress(),
+                    config.getMaxClientCnxns(),
+                    config.getClientPortListenBacklog(),
+                    false);
+                cnxnFactory.startup(zookeeper);
+
+                zookeeperStartLatch.countDown();
+                zookeeperStopLatch.await();

Review Comment:
   It's a bit odd to keep an extra thread just waiting for a signal to shut down ZK. Could we not have `start()` and `stop()` methods instead?
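   A minimal sketch of that shape, assuming the test fixture owns the server directly: `start()` brings the server up on the caller's thread, so no latch or dedicated thread is needed, and `stop()` is idempotent so it can safely be called from `@AfterEach`. `ZookeeperFixture` and its `Server` interface are hypothetical stand-ins; in the test, `Server` would wrap calls such as `cnxnFactory.startup(zookeeper)` and `cnxnFactory.shutdown()`.

   ```java
   public class ZookeeperFixture implements AutoCloseable {
       /** Hypothetical stand-in for the ZooKeeperServer plus ServerCnxnFactory pair. */
       interface Server {
           void startup() throws Exception;
           void shutdown();
       }

       private final Server server;
       private boolean running;

       ZookeeperFixture(Server server) {
           this.server = server;
       }

       /** Starts the server synchronously; returns once startup() has returned. */
       public synchronized void start() throws Exception {
           if (!running) {
               server.startup();
               running = true;
           }
       }

       /** Stops the server; safe to call more than once. */
       public synchronized void stop() {
           if (running) {
               running = false;
               server.shutdown();
           }
       }

       public synchronized boolean isRunning() {
           return running;
       }

       @Override
       public void close() {
           stop();
       }
   }
   ```

   In the test class, `@BeforeEach` could call `start()` and `@AfterEach` could call `stop()`, which also removes the need for `zookeeperStartLatch` and `zookeeperStopLatch`.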



##########
core/src/test/java/kafka/zk/ReceiptEvent.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.zk;
+
+import org.apache.zookeeper.server.MutedServerCxn;
+import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.ServerCnxn;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Event corresponding to the invocation of the Zookeeper request processor for a request.
+ */
+public class ReceiptEvent {
+    private final int opCode;
+    private final Lock lock = new ReentrantLock();
+    private final Condition received = lock.newCondition();
+    private final Condition processed = lock.newCondition();
+    private State state = State.notReceived;
+    private boolean sendResponse;
+
+    public ReceiptEvent(int opCode, boolean sendResponse) {
+        this.opCode = opCode;
+        this.sendResponse = sendResponse;
+    }
+
+    public boolean matches(Request request) {
+        return request.type == opCode;
+    }
+
+    public void serverReceived() {
+        lock.lock();
+        try {
+            state = Collections.max(Arrays.asList(state, State.received));
+            received.signalAll();

Review Comment:
   This condition doesn't seem to be used anywhere?



##########
core/src/test/java/kafka/zk/ReceiptEvent.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.zk;
+
+import org.apache.zookeeper.server.MutedServerCxn;
+import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.ServerCnxn;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Event corresponding to the invocation of the Zookeeper request processor for a request.
+ */
+public class ReceiptEvent {
+    private final int opCode;
+    private final Lock lock = new ReentrantLock();
+    private final Condition received = lock.newCondition();
+    private final Condition processed = lock.newCondition();

Review Comment:
   Could this maybe be lighter on locking by using an `AtomicReference<State>` with `compareAndSet()` in `serverProcessed()`, and conditionally completing a `CompletableFuture<Void> processed` that `awaitProcessed()` can `.get()` on?
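   A rough sketch of that suggestion using only the JDK: state transitions go through an `AtomicReference<State>` updated with `compareAndSet()`, and a `CompletableFuture<Void>` is completed exactly once when the event is processed, so `awaitProcessed()` can simply block on it. The class name and `State` values mirror the diff but are assumptions, and the `opCode` matching and `sendResponse` handling of the original `ReceiptEvent` are left out.

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.ExecutionException;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.TimeoutException;
   import java.util.concurrent.atomic.AtomicReference;

   public class LockFreeReceiptEvent {
       /** Ordered states: an event only ever moves forward. */
       enum State { notReceived, received, processed }

       private final AtomicReference<State> state = new AtomicReference<>(State.notReceived);
       private final CompletableFuture<Void> processed = new CompletableFuture<>();

       /** Records receipt, unless the event has already moved past notReceived. */
       public void serverReceived() {
           state.compareAndSet(State.notReceived, State.received);
       }

       /** Moves to processed from any earlier state and releases waiters exactly once. */
       public void serverProcessed() {
           State current = state.get();
           while (current != State.processed) {
               if (state.compareAndSet(current, State.processed)) {
                   processed.complete(null);
                   return;
               }
               current = state.get();
           }
       }

       /** Blocks until serverProcessed() has been called, or the timeout elapses. */
       public void awaitProcessed(long timeoutMs)
               throws InterruptedException, ExecutionException, TimeoutException {
           processed.get(timeoutMs, TimeUnit.MILLISECONDS);
       }

       public State state() {
           return state.get();
       }

       public boolean isProcessed() {
           return processed.isDone();
       }

       public static void main(String[] args) throws Exception {
           LockFreeReceiptEvent event = new LockFreeReceiptEvent();
           event.serverReceived();
           Thread processor = new Thread(event::serverProcessed); // simulated request processor
           processor.start();
           event.awaitProcessed(5000);
           System.out.println(event.state());
       }
   }
   ```

   Because the future can only be completed once, no explicit lock or `Condition` is needed, and an unused `received` condition simply disappears.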



##########
checkstyle/suppressions.xml:
##########
@@ -41,6 +41,8 @@
     <suppress checks="NPathComplexity" files="(ClusterTestExtensions|KafkaApisBuilder).java"/>
     <suppress checks="MethodLength"
               files="(KafkaClusterTestKit).java"/>
+    <suppress checks="ImportControl"
+              files="(ZkBrokerRegistrationTest|DelegatingRequest|InstrumentedRequestProcessor|InstrumentedZooKeeperServer|ReceiptEvent|MutedServerCxn).java"/>

Review Comment:
   Why add this exception instead of sorting the import statements as per checkstyle rules?



##########
core/src/test/java/kafka/zk/ZkBrokerRegistrationTest.java:
##########
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.zk;
+
+import io.netty.channel.group.ChannelGroup;
+import io.netty.util.concurrent.EventExecutor;
+import kafka.cluster.Broker;
+import kafka.server.KafkaConfig;
+import kafka.zookeeper.ZooKeeperClient;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.utils.Time;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.client.ZKClientConfig;
+import org.apache.zookeeper.server.PrepRequestProcessor;
+import org.apache.zookeeper.server.RequestProcessor;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.SyncRequestProcessor;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.net.ServerSocket;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+
+import static java.util.Arrays.asList;
+import static org.apache.kafka.common.security.auth.SecurityProtocol.PLAINTEXT;
+import static org.apache.zookeeper.client.ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET;
+
+/**
+ * This test simulates the loss of a session ID with Zookeeper (for instance due to network partition)
+ * while a broker is in the process of creating its ephemeral znode under /brokers/ids/. This can
+ * result in broker registration failing due to a NODEEXISTS error which is not handled by the
+ * fix KAFKA-6584 because in this case, the ID of the session of the conflicting ephemeral znode is
+ * not known by the broker. See KAFKA-14845 for an example of the timeline of events required to reproduce
+ * the use case.
+ */
+public class ZkBrokerRegistrationTest {
+    private static final Logger log = LoggerFactory.getLogger(ZkBrokerRegistrationTest.class);
+
+    private KafkaConfig kafkaConfig;
+    private BrokerInfo brokerInfo;
+    private int zkPort;
+
+    private class SandboxedZookeeper extends Thread {
+        private final CountDownLatch zookeeperStopLatch;
+        private final CountDownLatch zookeeperStartLatch = new CountDownLatch(1);
+        private final ZkTestContext spec;
+        private InstrumentedRequestProcessor processor;

Review Comment:
   This field doesn't seem to be used elsewhere; it can be a local variable.



##########
core/src/main/scala/kafka/zk/KafkaZkClient.scala:
##########
@@ -2113,7 +2113,12 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
   }
 
   private class CheckedEphemeral(path: String, data: Array[Byte]) extends Logging {
+    private var attempt = 0
+    private val maxAttempt = 5
+    private val backoffMs = 1000

Review Comment:
   Should this be a function of the `"zookeeper.session.timeout.ms"` config property?



##########
core/src/test/java/kafka/zk/ZkBrokerRegistrationTest.java:
##########
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.zk;
+
+import io.netty.channel.group.ChannelGroup;
+import io.netty.util.concurrent.EventExecutor;
+import kafka.cluster.Broker;
+import kafka.server.KafkaConfig;
+import kafka.zookeeper.ZooKeeperClient;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.utils.Time;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.client.ZKClientConfig;
+import org.apache.zookeeper.server.PrepRequestProcessor;
+import org.apache.zookeeper.server.RequestProcessor;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.SyncRequestProcessor;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.net.ServerSocket;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+
+import static java.util.Arrays.asList;
+import static org.apache.kafka.common.security.auth.SecurityProtocol.PLAINTEXT;
+import static org.apache.zookeeper.client.ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET;
+
+/**
+ * This test simulates the loss of a session ID with Zookeeper (for instance due to network partition)
+ * while a broker is in the process of creating its ephemeral znode under /brokers/ids/. This can
+ * result in broker registration failing due to a NODEEXISTS error which is not handled by the
+ * fix KAFKA-6584 because in this case, the ID of the session of the conflicting ephemeral znode is
+ * not known by the broker. See KAFKA-14845 for an example of the timeline of events required to reproduce
+ * the use case.
+ */
+public class ZkBrokerRegistrationTest {
+    private static final Logger log = LoggerFactory.getLogger(ZkBrokerRegistrationTest.class);
+
+    private KafkaConfig kafkaConfig;
+    private BrokerInfo brokerInfo;
+    private int zkPort;
+
+    private class SandboxedZookeeper extends Thread {
+        private final CountDownLatch zookeeperStopLatch;
+        private final CountDownLatch zookeeperStartLatch = new CountDownLatch(1);
+        private final ZkTestContext spec;
+        private InstrumentedRequestProcessor processor;
+
+        SandboxedZookeeper(CountDownLatch zookeeperStopLatch, ZkTestContext spec) {
+            this.zookeeperStopLatch = zookeeperStopLatch;
+            this.spec = spec;
+        }
+
+        public void run() {
+            ServerCnxnFactory cnxnFactory = null;
+
+            try {
+                Path dataDir = Files.createTempDirectory("zk");
+
+                Properties zkProperties = new Properties();
+                zkProperties.put("dataDir", dataDir.toFile().getPath());
+                zkProperties.put("clientPort", String.valueOf(zkPort));
+                zkProperties.put("serverCnxnFactory", "org.apache.zookeeper.server.NettyServerCnxnFactory");
+
+                QuorumPeerConfig config = new QuorumPeerConfig();
+                config.parseProperties(zkProperties);
+                FileTxnSnapLog txnLog = new FileTxnSnapLog(config.getDataLogDir(), config.getDataDir());
+
+                ZooKeeperServer zookeeper = new InstrumentedZooKeeperServer(
+                    null,
+                    txnLog,
+                    config.getTickTime(),
+                    config.getMinSessionTimeout(),
+                    config.getMaxSessionTimeout(),
+                    config.getClientPortListenBacklog(),
+                    null,
+                    config.getInitialConfig(),
+                    spec) {
+
+                    @Override
+                    protected void setupRequestProcessors() {
+                        processor = new InstrumentedRequestProcessor(this, spec);
+                        RequestProcessor syncProcessor = new SyncRequestProcessor(this, processor);
+                        ((SyncRequestProcessor) syncProcessor).start();
+                        firstProcessor = new PrepRequestProcessor(this, syncProcessor);
+                        ((PrepRequestProcessor) firstProcessor).start();
+                    }

Review Comment:
   Can't this be part of `InstrumentedZooKeeperServer`, since it's a new class anyway? 



##########
core/src/test/java/kafka/zk/ZkBrokerRegistrationTest.java:
##########
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.zk;
+
+import io.netty.channel.group.ChannelGroup;
+import io.netty.util.concurrent.EventExecutor;
+import kafka.cluster.Broker;
+import kafka.server.KafkaConfig;
+import kafka.zookeeper.ZooKeeperClient;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.utils.Time;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.client.ZKClientConfig;
+import org.apache.zookeeper.server.PrepRequestProcessor;
+import org.apache.zookeeper.server.RequestProcessor;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.SyncRequestProcessor;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.net.ServerSocket;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+
+import static java.util.Arrays.asList;
+import static org.apache.kafka.common.security.auth.SecurityProtocol.PLAINTEXT;
+import static org.apache.zookeeper.client.ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET;
+
+/**
+ * This test simulates the loss of a session ID with Zookeeper (for instance due to network partition)
+ * while a broker is in the process of creating its ephemeral znode under /brokers/ids/. This can
+ * result in broker registration failing due to a NODEEXISTS error which is not handled by the
+ * fix KAFKA-6584 because in this case, the ID of the session of the conflicting ephemeral znode is
+ * not known by the broker. See KAFKA-14845 for an example of the timeline of events required to reproduce
+ * the use case.
+ */
+public class ZkBrokerRegistrationTest {

Review Comment:
   Since we're finding a free port and starting ZooKeeper, shouldn't we tag this as an integration test using `@Tag("integration")`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [kafka] Hangleton commented on pull request #13558: KAFKA-14845: Fix broker registration with Zookeeper when the previous ephemeral znode was not properly recorded by the broker

Posted by "Hangleton (via GitHub)" <gi...@apache.org>.
Hangleton commented on PR #13558:
URL: https://github.com/apache/kafka/pull/13558#issuecomment-1563016533

   Thanks, Igor, for the review. I'm addressing these comments now; sorry I didn't do so earlier.




[GitHub] [kafka] soarez commented on a diff in pull request #13558: KAFKA-14845: Fix broker registration with Zookeeper when the previous ephemeral znode was not properly recorded by the broker

Posted by "soarez (via GitHub)" <gi...@apache.org>.
soarez commented on code in PR #13558:
URL: https://github.com/apache/kafka/pull/13558#discussion_r1205587123


##########
checkstyle/suppressions.xml:
##########
@@ -41,6 +41,8 @@
     <suppress checks="NPathComplexity" files="(ClusterTestExtensions|KafkaApisBuilder).java"/>
     <suppress checks="MethodLength"
               files="(KafkaClusterTestKit).java"/>
+    <suppress checks="ImportControl"
+              files="(ZkBrokerRegistrationTest|DelegatingRequest|InstrumentedRequestProcessor|InstrumentedZooKeeperServer|ReceiptEvent|ZkBrokerRegistrationStubs).java"/>

Review Comment:
   Is this intentional? Can't we sort the import statements to satisfy this Checkstyle rule?



##########
core/src/main/scala/kafka/zk/KafkaZkClient.scala:
##########
@@ -2113,7 +2113,12 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
   }
 
   private class CheckedEphemeral(path: String, data: Array[Byte]) extends Logging {
+    private var attempt = 0
+    private val maxAttempt = 5
+    private val backoffMs = 1000

Review Comment:
   Should the backoff value be calculated here then? The session timeout is available in `apply()` where `KafkaZkClient` is created. 





[GitHub] [kafka] soarez commented on pull request #13558: KAFKA-14845: Fix broker registration with Zookeeper when the previous ephemeral znode was not properly recorded by the broker

Posted by "soarez (via GitHub)" <gi...@apache.org>.
soarez commented on PR #13558:
URL: https://github.com/apache/kafka/pull/13558#issuecomment-1554360896

   Thanks for explaining! I agree it makes sense for the client to account for this behavior.




[GitHub] [kafka] github-actions[bot] commented on pull request #13558: KAFKA-14845: Fix broker registration with Zookeeper when the previous ephemeral znode was not properly recorded by the broker

Posted by "github-actions[bot] (via GitHub)" <gi...@apache.org>.
github-actions[bot] commented on PR #13558:
URL: https://github.com/apache/kafka/pull/13558#issuecomment-1696716224

   This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please ask a committer for review. If the PR has merge conflicts, please update it with the latest from trunk (or the appropriate release branch).

   If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.




[GitHub] [kafka] Hangleton commented on a diff in pull request #13558: KAFKA-14845: Fix broker registration with Zookeeper when the previous ephemeral znode was not properly recorded by the broker

Posted by "Hangleton (via GitHub)" <gi...@apache.org>.
Hangleton commented on code in PR #13558:
URL: https://github.com/apache/kafka/pull/13558#discussion_r1197760360


##########
core/src/test/resources/log4j.properties:
##########
@@ -20,7 +20,8 @@ log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
 
 log4j.logger.kafka=WARN
 log4j.logger.org.apache.kafka=WARN
-
+log4j.logger.kafka.zk=INFO
+log4j.logger.org.apache.zookeeper.server=INFO

Review Comment:
   You are right, we shouldn't change these log levels; that is a rogue change.





[GitHub] [kafka] Hangleton commented on pull request #13558: KAFKA-14845: Fix broker registration with Zookeeper when the previous ephemeral znode was not properly recorded by the broker

Posted by "Hangleton (via GitHub)" <gi...@apache.org>.
Hangleton commented on PR #13558:
URL: https://github.com/apache/kafka/pull/13558#issuecomment-1553446542

   Hi, Igor,
   
   Thanks for spending the time reviewing the PR and providing detailed comments. At a high level, the problem this PR attempts to solve comes from two behaviours observed in Zookeeper:
   
   1. The deletion of an ephemeral node after expiration of the owning session is eventually consistent.
   2. CRUD operations on data via client requests are possible even if a new session has not been "acknowledged" by a client. This means that a session can be created on the server, and a znode creation request can be processed under that session, before the client has even received the response to the session creation request.
   
   Because of (1), there is a small time window during which the ephemeral znode of an expired session can still be read.
   Because of (2), there is a possibility for the session owning an ephemeral znode to be unknown by the client which created it.
   
   The test in this PR forces (1) and (2) to happen in order to reproduce a case we have seen in production. Now, is this a bug in Zookeeper? I would think it isn't. I could not find evidence of strong consistency guarantees for session expiration and the deletion of associated ephemerals, and the Zookeeper server code clearly shows there is no such guarantee. So, Zookeeper is most likely working as designed here. (2) also appears to be working as designed in the asynchronous Zookeeper client, and Kafka even has a configuration property to set the depth of the request pipeline [[1]](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=74684881). Note that I've only tested the implementation of the Zookeeper client which integrates with Netty, not the Java NIO one, and cannot confirm that the same behaviour is observed in this test with the latter.
   
   The client integrating with Zookeeper in Kafka should take into account the eventually consistent behaviour of (1), and rely on retries to allow Zk to converge.
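A minimal sketch of the retry idea, under stated assumptions: `EphemeralRegistration`, `Code` and `createWithRetries` are hypothetical names, and the `Supplier<Code>` stands in for a create-ephemeral request that either succeeds or fails with a NODEEXISTS-style conflict. This is not the actual `CheckedEphemeral` code (which also compares session owners, per KAFKA-6584); it only shows a bounded retry that gives ZooKeeper time to delete a stale ephemeral znode left behind by an expired session.

```java
import java.util.function.Supplier;

// Hypothetical sketch, not Kafka's CheckedEphemeral: bounded retries on a
// NODEEXISTS-style conflict while a stale ephemeral znode is being deleted.
final class EphemeralRegistration {
    // Stand-ins for ZooKeeper result codes (OK / NODEEXISTS).
    enum Code { OK, NODE_EXISTS }

    private EphemeralRegistration() {
    }

    static Code createWithRetries(Supplier<Code> create, int maxAttempts, long backoffMs) {
        Code result = create.get();
        // Retry only on a conflict, and at most maxAttempts calls in total.
        for (int attempt = 1; result == Code.NODE_EXISTS && attempt < maxAttempts; attempt++) {
            try {
                Thread.sleep(backoffMs); // give the server time to expire the old session
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status and give up
                return result;
            }
            result = create.get();
        }
        return result;
    }
}
```

Any outcome other than the conflict is returned immediately; an interrupt stops retrying and returns the last result while preserving the thread's interrupt status.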
   
   [1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=74684881




[GitHub] [kafka] soarez commented on a diff in pull request #13558: KAFKA-14845: Fix broker registration with Zookeeper when the previous ephemeral znode was not properly recorded by the broker

Posted by "soarez (via GitHub)" <gi...@apache.org>.
soarez commented on code in PR #13558:
URL: https://github.com/apache/kafka/pull/13558#discussion_r1198793889


##########
core/src/test/java/kafka/zk/ZkBrokerRegistrationTest.java:
##########
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.zk;
+
+import io.netty.channel.group.ChannelGroup;
+import io.netty.util.concurrent.EventExecutor;
+import kafka.cluster.Broker;
+import kafka.server.KafkaConfig;
+import kafka.zookeeper.ZooKeeperClient;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.utils.Time;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.client.ZKClientConfig;
+import org.apache.zookeeper.server.PrepRequestProcessor;
+import org.apache.zookeeper.server.RequestProcessor;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.SyncRequestProcessor;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.net.ServerSocket;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+
+import static java.util.Arrays.asList;
+import static org.apache.kafka.common.security.auth.SecurityProtocol.PLAINTEXT;
+import static org.apache.zookeeper.client.ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET;
+
+/**
+ * This test simulates the loss of a session ID with Zookeeper (for instance due to network partition)
+ * while a broker is in the process of creating its ephemeral znode under /brokers/ids/. This can
+ * result in broker registration failing due to a NODEEXISTS error which is not handled by the
+ * fix KAFKA-6584 because in this case, the ID of the session of the conflicting ephemeral znode is
+ * not known by the broker. See KAFKA-14845 for an example of the timeline of events required to reproduce
+ * the use case.
+ */
+public class ZkBrokerRegistrationTest {
+    private static final Logger log = LoggerFactory.getLogger(ZkBrokerRegistrationTest.class);
+
+    private KafkaConfig kafkaConfig;
+    private BrokerInfo brokerInfo;
+    private int zkPort;
+
+    private class SandboxedZookeeper extends Thread {
+        private final CountDownLatch zookeeperStopLatch;
+        private final CountDownLatch zookeeperStartLatch = new CountDownLatch(1);
+        private final ZkTestContext spec;
+        private InstrumentedRequestProcessor processor;
+
+        SandboxedZookeeper(CountDownLatch zookeeperStopLatch, ZkTestContext spec) {
+            this.zookeeperStopLatch = zookeeperStopLatch;
+            this.spec = spec;
+        }
+
+        public void run() {
+            ServerCnxnFactory cnxnFactory = null;
+
+            try {
+                Path dataDir = Files.createTempDirectory("zk");
+
+                Properties zkProperties = new Properties();
+                zkProperties.put("dataDir", dataDir.toFile().getPath());
+                zkProperties.put("clientPort", String.valueOf(zkPort));
+                zkProperties.put("serverCnxnFactory", "org.apache.zookeeper.server.NettyServerCnxnFactory");
+
+                QuorumPeerConfig config = new QuorumPeerConfig();
+                config.parseProperties(zkProperties);
+                FileTxnSnapLog txnLog = new FileTxnSnapLog(config.getDataLogDir(), config.getDataDir());
+
+                ZooKeeperServer zookeeper = new InstrumentedZooKeeperServer(
+                    null,
+                    txnLog,
+                    config.getTickTime(),
+                    config.getMinSessionTimeout(),
+                    config.getMaxSessionTimeout(),
+                    config.getClientPortListenBacklog(),
+                    null,
+                    config.getInitialConfig(),
+                    spec) {
+
+                    @Override
+                    protected void setupRequestProcessors() {
+                        processor = new InstrumentedRequestProcessor(this, spec);
+                        RequestProcessor syncProcessor = new SyncRequestProcessor(this, processor);
+                        ((SyncRequestProcessor) syncProcessor).start();
+                        firstProcessor = new PrepRequestProcessor(this, syncProcessor);
+                        ((PrepRequestProcessor) firstProcessor).start();
+                    }
+                };
+
+                cnxnFactory = ServerCnxnFactory.createFactory();
+                cnxnFactory.configure(
+                    config.getClientPortAddress(),
+                    config.getMaxClientCnxns(),
+                    config.getClientPortListenBacklog(),
+                    false);
+                cnxnFactory.startup(zookeeper);
+
+                zookeeperStartLatch.countDown();
+                zookeeperStopLatch.await();

Review Comment:
   Indeed, I didn't mean to suggest `Thread#stop()`. 
   
   Currently, my understanding is that `SandboxedZookeeper` is a thread that, when run, starts ZK (asynchronously), and then that same thread just waits for a signal before it shuts down ZK.
   
   What I'm proposing is that `SandboxedZookeeper` not be a `Thread`, and instead provide a `startZk()` (or just `start()`) method that does what is currently done in `Thread#run()` to bring up ZK, and also a `stop()` method that does what is currently done after the signal is received, i.e., instead of propagating a signal, we could just invoke `stop()`.
   
   I'm hoping that this way we can avoid having an extra thread that just waits to shut down ZK. Would this make sense? Or am I missing something?
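A minimal sketch of the start/stop shape proposed above, under a loud assumption: a plain `java.net.ServerSocket` bound to an ephemeral loopback port stands in for the ZooKeeper server, and `SandboxedServer`, `start()`, `stop()` and `isRunning()` are hypothetical names. In the real test, `start()` would presumably do what `run()` currently does up to `cnxnFactory.startup(zookeeper)`, and `stop()` would shut the connection factory down, so no thread has to block on `zookeeperStopLatch`.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;

// Hypothetical sketch: a ServerSocket stands in for the ZooKeeper connection factory.
final class SandboxedServer implements AutoCloseable {
    private ServerSocket socket;

    // Brings the server up on the calling thread; returns the bound port.
    synchronized int start() {
        if (socket != null) {
            throw new IllegalStateException("already started");
        }
        try {
            socket = new ServerSocket(0, 50, InetAddress.getLoopbackAddress());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return socket.getLocalPort();
    }

    // Shuts the server down directly instead of counting down a latch; safe to call twice.
    synchronized void stop() {
        if (socket == null) {
            return;
        }
        try {
            socket.close();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } finally {
            socket = null;
        }
    }

    synchronized boolean isRunning() {
        return socket != null && !socket.isClosed();
    }

    @Override
    public void close() {
        stop();
    }
}
```

Implementing `AutoCloseable` also lets a test use try-with-resources, so the server is stopped even when an assertion fails.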





[GitHub] [kafka] Hangleton commented on a diff in pull request #13558: KAFKA-14845: Fix broker registration with Zookeeper when the previous ephemeral znode was not properly recorded by the broker

Posted by "Hangleton (via GitHub)" <gi...@apache.org>.
Hangleton commented on code in PR #13558:
URL: https://github.com/apache/kafka/pull/13558#discussion_r1205669500


##########
checkstyle/suppressions.xml:
##########
@@ -41,6 +41,8 @@
     <suppress checks="NPathComplexity" files="(ClusterTestExtensions|KafkaApisBuilder).java"/>
     <suppress checks="MethodLength"
               files="(KafkaClusterTestKit).java"/>
+    <suppress checks="ImportControl"
+              files="(ZkBrokerRegistrationTest|DelegatingRequest|InstrumentedRequestProcessor|InstrumentedZooKeeperServer|ReceiptEvent|ZkBrokerRegistrationStubs).java"/>

Review Comment:
   Added the packages to import-control-core in [this commit](https://github.com/apache/kafka/pull/13558/commits/03d444b94de2494548b61cd0458c842c05872e3f). I had to keep a suppression rule for the package `org.apache.zookeeper` because it is not part of any import control, and Checkstyle would not accept it without a proper import control definition.





[GitHub] [kafka] soarez commented on a diff in pull request #13558: KAFKA-14845: Fix broker registration with Zookeeper when the previous ephemeral znode was not properly recorded by the broker

Posted by "soarez (via GitHub)" <gi...@apache.org>.
soarez commented on code in PR #13558:
URL: https://github.com/apache/kafka/pull/13558#discussion_r1198804390


##########
core/src/main/scala/kafka/zk/KafkaZkClient.scala:
##########
@@ -2113,7 +2113,12 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
   }
 
   private class CheckedEphemeral(path: String, data: Array[Byte]) extends Logging {
+    private var attempt = 0
+    private val maxAttempt = 5
+    private val backoffMs = 1000

Review Comment:
   I wonder if it would make sense to have the interval between retries be defined as: `(zkSessionTimeout + margin) / maxAttempts`? Where `margin` could be a small fixed delta, e.g. one second, and `zkSessionTimeout` is taken from the Kafka configuration for `"zookeeper.session.timeout.ms"`. WDYT?
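A minimal sketch of the interval formula from this comment, `(zkSessionTimeout + margin) / maxAttempts`. `RegistrationBackoff` and `backoff` are hypothetical names; the session timeout would be read from `"zookeeper.session.timeout.ms"`, and the five attempts and one-second margin mirror the values mentioned in the diff and the comment. With this formula, `maxAttempts` backoff intervals together cover roughly one session timeout plus the margin.

```java
import java.time.Duration;

// Hypothetical helper computing the retry interval (zkSessionTimeout + margin) / maxAttempts.
final class RegistrationBackoff {
    private RegistrationBackoff() {
    }

    static Duration backoff(Duration zkSessionTimeout, Duration margin, int maxAttempts) {
        if (maxAttempts <= 0) {
            throw new IllegalArgumentException("maxAttempts must be positive: " + maxAttempts);
        }
        // Duration.dividedBy truncates to whole nanoseconds.
        return zkSessionTimeout.plus(margin).dividedBy(maxAttempts);
    }
}
```

For example, an 18-second session timeout, a one-second margin and five attempts give a 3.8-second interval (19000 ms / 5).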





[GitHub] [kafka] Hangleton commented on pull request #13558: KAFKA-14845: Fix broker registration with Zookeeper when the previous ephemeral znode was not properly recorded by the broker

Posted by "Hangleton (via GitHub)" <gi...@apache.org>.
Hangleton commented on PR #13558:
URL: https://github.com/apache/kafka/pull/13558#issuecomment-1557081175

   Hi Igor, thanks for the follow-up. I just added three commits which:
   
   1) Replace `InstrumentedRequestProcessor` and `MutedServerCnxn` with spies stubbing only the necessary methods. 
   2) Remove `DelegatingRequest` as it is possible to re-use the existing `Request` class.
   3) Remove the unnecessary thread starting Zookeeper.
   
   Note that `InstrumentedZooKeeperServer` is a bit trickier to spy on because the states of the original instance and the stubbed one are not in sync, which makes delegating to the original instance's methods `expire` and `processConnectRequest` infeasible unless that state is preserved between the two instances. Because only the few required methods are overridden for this class, unlike what had been done for, e.g., `DelegatingRequest`, there is less boilerplate code in this particular case.
   
   




[GitHub] [kafka] Hangleton commented on a diff in pull request #13558: KAFKA-14845: Fix broker registration with Zookeeper when the previous ephemeral znode was not properly recorded by the broker

Posted by "Hangleton (via GitHub)" <gi...@apache.org>.
Hangleton commented on code in PR #13558:
URL: https://github.com/apache/kafka/pull/13558#discussion_r1197924320


##########
core/src/test/java/kafka/zk/ReceiptEvent.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.zk;
+
+import org.apache.zookeeper.server.MutedServerCxn;
+import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.ServerCnxn;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Event corresponding to the invocation of the Zookeeper request processor for a request.
+ */
+public class ReceiptEvent {
+    private final int opCode;
+    private final Lock lock = new ReentrantLock();
+    private final Condition received = lock.newCondition();
+    private final Condition processed = lock.newCondition();

Review Comment:
   Sure, I replaced the `Condition` with a `CompletableFuture` and removed the `State` enum altogether, as it was unnecessary (no client code observes the "received" event).
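   A minimal sketch of what a `CompletableFuture`-based event could look like, assuming only the "processed" signal is needed. `ReceiptEventSketch`, its methods, and the plain `int` request type (standing in for ZooKeeper's `Request.type`) are hypothetical and not the code in the PR.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical simplification of ReceiptEvent: one future replaces the
// lock, the two Conditions and the State enum.
class ReceiptEventSketch {
    private final int opCode;
    private final CompletableFuture<Void> processed = new CompletableFuture<>();

    ReceiptEventSketch(int opCode) {
        this.opCode = opCode;
    }

    boolean matches(int requestType) {
        return requestType == opCode;
    }

    // Called from the request-processing thread; completing twice is a no-op.
    void serverProcessed() {
        processed.complete(null);
    }

    // Called from the test thread; returns false on timeout or interruption.
    boolean awaitProcessed(long timeoutMs) {
        try {
            processed.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException e) {
            return false;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }
}
```

   Unlike `Condition#signalAll`, which only wakes threads already waiting, a completed future also satisfies a waiter that arrives after the signal, so no separate state field is needed to avoid a missed wake-up.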





[GitHub] [kafka] Hangleton commented on a diff in pull request #13558: KAFKA-14845: Fix broker registration with Zookeeper when the previous ephemeral znode was not properly recorded by the broker

Posted by "Hangleton (via GitHub)" <gi...@apache.org>.
Hangleton commented on code in PR #13558:
URL: https://github.com/apache/kafka/pull/13558#discussion_r1197760645


##########
core/src/test/java/kafka/zk/ZkBrokerRegistrationTest.java:
##########
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.zk;
+
+import io.netty.channel.group.ChannelGroup;
+import io.netty.util.concurrent.EventExecutor;
+import kafka.cluster.Broker;
+import kafka.server.KafkaConfig;
+import kafka.zookeeper.ZooKeeperClient;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.utils.Time;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.client.ZKClientConfig;
+import org.apache.zookeeper.server.PrepRequestProcessor;
+import org.apache.zookeeper.server.RequestProcessor;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.SyncRequestProcessor;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.net.ServerSocket;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+
+import static java.util.Arrays.asList;
+import static org.apache.kafka.common.security.auth.SecurityProtocol.PLAINTEXT;
+import static org.apache.zookeeper.client.ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET;
+
+/**
+ * This test simulates the loss of a session ID with Zookeeper (for instance due to network partition)
+ * while a broker is in the process of creating its ephemeral znode under /brokers/ids/. This can
+ * result in broker registration failing with due to a NODEEXISTS error which is not handled by the
+ * fix KAFKA-6584 because in this case, the ID of the session of the conflicting ephemeral znode is
+ * not known by the broker. See KAFKA-14845 for an example of timeline of events requires to reproduce
+ * the use case.
+ */
+public class ZkBrokerRegistrationTest {
+    private static final Logger log = LoggerFactory.getLogger(ZkBrokerRegistrationTest.class);
+
+    private KafkaConfig kafkaConfig;
+    private BrokerInfo brokerInfo;
+    private int zkPort;
+
+    private class SandboxedZookeeper extends Thread {
+        private final CountDownLatch zookeeperStopLatch;
+        private final CountDownLatch zookeeperStartLatch = new CountDownLatch(1);
+        private final ZkTestContext spec;
+        private InstrumentedRequestProcessor processor;
+
+        SandboxedZookeeper(CountDownLatch zookeeperStopLatch, ZkTestContext spec) {
+            this.zookeeperStopLatch = zookeeperStopLatch;
+            this.spec = spec;
+        }
+
+        public void run() {
+            ServerCnxnFactory cnxnFactory = null;
+
+            try {
+                Path dataDir = Files.createTempDirectory("zk");
+
+                Properties zkProperties = new Properties();
+                zkProperties.put("dataDir", dataDir.toFile().getPath());
+                zkProperties.put("clientPort", String.valueOf(zkPort));
+                zkProperties.put("serverCnxnFactory", "org.apache.zookeeper.server.NettyServerCnxnFactory");
+
+                QuorumPeerConfig config = new QuorumPeerConfig();
+                config.parseProperties(zkProperties);
+                FileTxnSnapLog txnLog = new FileTxnSnapLog(config.getDataLogDir(), config.getDataDir());
+
+                ZooKeeperServer zookeeper = new InstrumentedZooKeeperServer(
+                    null,
+                    txnLog,
+                    config.getTickTime(),
+                    config.getMinSessionTimeout(),
+                    config.getMaxSessionTimeout(),
+                    config.getClientPortListenBacklog(),
+                    null,
+                    config.getInitialConfig(),
+                    spec) {
+
+                    @Override
+                    protected void setupRequestProcessors() {
+                        processor = new InstrumentedRequestProcessor(this, spec);
+                        RequestProcessor syncProcessor = new SyncRequestProcessor(this, processor);
+                        ((SyncRequestProcessor) syncProcessor).start();
+                        firstProcessor = new PrepRequestProcessor(this, syncProcessor);
+                        ((PrepRequestProcessor) firstProcessor).start();
+                    }
+                };
+
+                cnxnFactory = ServerCnxnFactory.createFactory();
+                cnxnFactory.configure(
+                    config.getClientPortAddress(),
+                    config.getMaxClientCnxns(),
+                    config.getClientPortListenBacklog(),
+                    false);
+                cnxnFactory.startup(zookeeper);
+
+                zookeeperStartLatch.countDown();
+                zookeeperStopLatch.await();

Review Comment:
   Hmm, at least in theory, the use of `Thread#stop()` is [discouraged](https://docs.oracle.com/en/java/javase/17/docs/api/java.base/java/lang/doc-files/threadPrimitiveDeprecation.html). Its Javadoc indicates:
   
   > *Deprecated: This method is inherently unsafe. 
   > [...]
   > Many uses of stop should be replaced by code that simply modifies some variable to indicate that the target thread should stop running. The target thread should check this variable regularly, and return from its run method in an orderly fashion if the variable indicates that it is to stop running. If the target thread waits for long periods (on a condition variable, for example), the interrupt method should be used to interrupt the wait.*
   
   I replaced the count-down latch with a flag-and-interrupt approach and a closeable pattern borrowed from e.g. the Kafka client. 
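   The flag-and-interrupt shutdown described above can be sketched as follows. `StoppableWorker` is hypothetical: it stands in for the sandboxed ZooKeeper thread, with a sleep loop in place of the running server.

```java
import java.util.concurrent.TimeUnit;

// Hypothetical worker stopped via a volatile flag plus interrupt,
// exposed through AutoCloseable instead of relying on Thread#stop().
class StoppableWorker implements AutoCloseable {
    private volatile boolean running = true;
    private final Thread thread;

    StoppableWorker() {
        thread = new Thread(this::run, "stoppable-worker");
        thread.start();
    }

    private void run() {
        // Stand-in for "start the server, then wait until asked to stop".
        while (running) {
            try {
                Thread.sleep(10_000);
            } catch (InterruptedException e) {
                // An interrupt only wakes the thread; the flag decides whether it exits.
            }
        }
        // Orderly clean-up (e.g. shutting the server down) would go here.
    }

    boolean isAlive() {
        return thread.isAlive();
    }

    @Override
    public void close() {
        running = false;    // ask the thread to stop
        thread.interrupt(); // wake it up if it is blocked
        try {
            thread.join(TimeUnit.SECONDS.toMillis(5));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

   Because it implements `AutoCloseable`, a test can hold the worker in a try-with-resources block so that it is always stopped, even when an assertion fails.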





[GitHub] [kafka] Hangleton commented on pull request #13558: KAFKA-14845: Fix broker registration with Zookeeper when the previous ephemeral znode was not properly recorded by the broker

Posted by "Hangleton (via GitHub)" <gi...@apache.org>.
Hangleton commented on PR #13558:
URL: https://github.com/apache/kafka/pull/13558#issuecomment-1553450467

   (note: I will update the PR tomorrow to use Mockito stubs instead of subclasses).




[GitHub] [kafka] Hangleton commented on a diff in pull request #13558: KAFKA-14845: Fix broker registration with Zookeeper when the previous ephemeral znode was not properly recorded by the broker

Posted by "Hangleton (via GitHub)" <gi...@apache.org>.
Hangleton commented on code in PR #13558:
URL: https://github.com/apache/kafka/pull/13558#discussion_r1205669500


##########
checkstyle/suppressions.xml:
##########
@@ -41,6 +41,8 @@
     <suppress checks="NPathComplexity" files="(ClusterTestExtensions|KafkaApisBuilder).java"/>
     <suppress checks="MethodLength"
               files="(KafkaClusterTestKit).java"/>
+    <suppress checks="ImportControl"
+              files="(ZkBrokerRegistrationTest|DelegatingRequest|InstrumentedRequestProcessor|InstrumentedZooKeeperServer|ReceiptEvent|ZkBrokerRegistrationStubs).java"/>

Review Comment:
   Added the packages to import-control-core in [this commit](https://github.com/apache/kafka/pull/13558/commits/03d444b94de2494548b61cd0458c842c05872e3f). I had to keep a suppression rule for the package `org.apache.zookeeper` because it is not covered by any import control, and Checkstyle would reject it otherwise.





[GitHub] [kafka] Hangleton commented on a diff in pull request #13558: KAFKA-14845: Fix broker registration with Zookeeper when the previous ephemeral znode was not properly recorded by the broker

Posted by "Hangleton (via GitHub)" <gi...@apache.org>.
Hangleton commented on code in PR #13558:
URL: https://github.com/apache/kafka/pull/13558#discussion_r1197924320


##########
core/src/test/java/kafka/zk/ReceiptEvent.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.zk;
+
+import org.apache.zookeeper.server.MutedServerCxn;
+import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.ServerCnxn;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Event corresponding to the invocation of the Zookeeper request processor for a request.
+ */
+public class ReceiptEvent {
+    private final int opCode;
+    private final Lock lock = new ReentrantLock();
+    private final Condition received = lock.newCondition();
+    private final Condition processed = lock.newCondition();

Review Comment:
   Sure, I replaced the `Condition` with a `CompletableFuture` and removed the `State` enum altogether, as it was unnecessary (no client code observes the "received" type of event).





[GitHub] [kafka] Hangleton commented on a diff in pull request #13558: KAFKA-14845: Fix broker registration with Zookeeper when the previous ephemeral znode was not properly recorded by the broker

Posted by "Hangleton (via GitHub)" <gi...@apache.org>.
Hangleton commented on code in PR #13558:
URL: https://github.com/apache/kafka/pull/13558#discussion_r1198005710


##########
core/src/main/scala/kafka/zk/KafkaZkClient.scala:
##########
@@ -2113,7 +2113,12 @@ class KafkaZkClient private[zk] (zooKeeperClient: ZooKeeperClient, isSecure: Boo
   }
 
   private class CheckedEphemeral(path: String, data: Array[Byte]) extends Logging {
+    private var attempt = 0
+    private val maxAttempt = 5
+    private val backoffMs = 1000

Review Comment:
   This is indeed an important element to consider when configuring the retry strategy. We probably don't want to retry past the session timeout, because that would expose us to the possibility that the session used during the first call is no longer valid.
   
   The session timeout can also be used to calibrate the expected response times, since the Zk client in Kafka does not use request timeouts. Another timeout to consider is the read timeout, which is the maximum time allowed without activity on a connection before that connection is closed. The Zookeeper protocol includes heartbeats (pings), which an otherwise idle client sends roughly every half read timeout. The read timeout is set to two thirds of the negotiated session timeout, while the connect timeout is the session timeout divided by the number of hosts registered in the Zookeeper connection string.
   
   I haven't looked at Apache Curator to see what approach is used for retries there, but that could be an interesting entry point I suppose.
   
   So the retry strategy here requires a bit more thought.
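   For reference, a small Java sketch of the timeout arithmetic discussed above. The formulas mirror what the ZooKeeper Java client (`ClientCnxn`) appears to do: read timeout = 2/3 of the negotiated session timeout, connect timeout = session timeout / number of hosts, and a ping once the connection has been idle for about half the read timeout. The class and the `retriesFitInSession` check are hypothetical; they only illustrate keeping a fixed-backoff schedule, such as the `maxAttempt = 5` / `backoffMs = 1000` values in the diff, within the session timeout.

```java
// Hypothetical helper illustrating ZooKeeper client timeout arithmetic.
final class ZkTimeouts {
    private ZkTimeouts() {
    }

    // readTimeout = sessionTimeout * 2 / 3
    static int readTimeoutMs(int sessionTimeoutMs) {
        return sessionTimeoutMs * 2 / 3;
    }

    // connectTimeout = sessionTimeout / number of hosts in the connection string
    static int connectTimeoutMs(int sessionTimeoutMs, int hostCount) {
        if (hostCount <= 0) {
            throw new IllegalArgumentException("hostCount must be positive");
        }
        return sessionTimeoutMs / hostCount;
    }

    // An idle client pings after roughly half the read timeout.
    static int approxPingIntervalMs(int sessionTimeoutMs) {
        return readTimeoutMs(sessionTimeoutMs) / 2;
    }

    // Total sleep of a fixed-backoff schedule (maxAttempts - 1 pauses), ignoring
    // request latency, compared against the session timeout.
    static boolean retriesFitInSession(int sessionTimeoutMs, int maxAttempts, int backoffMs) {
        long totalBackoffMs = (long) (maxAttempts - 1) * backoffMs;
        return totalBackoffMs < sessionTimeoutMs;
    }
}
```

   With an 18000 ms session timeout and three hosts, this gives a 12000 ms read timeout, pings roughly every 6000 ms of idleness, and a 6000 ms connect timeout; five attempts spaced 1000 ms apart spend about 4000 ms sleeping, well within the session.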





[GitHub] [kafka] Hangleton commented on a diff in pull request #13558: KAFKA-14845: Fix broker registration with Zookeeper when the previous ephemeral znode was not properly recorded by the broker

Posted by "Hangleton (via GitHub)" <gi...@apache.org>.
Hangleton commented on code in PR #13558:
URL: https://github.com/apache/kafka/pull/13558#discussion_r1197911485


##########
core/src/test/java/kafka/zk/ReceiptEvent.java:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.zk;
+
+import org.apache.zookeeper.server.MutedServerCxn;
+import org.apache.zookeeper.server.Request;
+import org.apache.zookeeper.server.ServerCnxn;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Event corresponding to the invocation of the Zookeeper request processor for a request.
+ */
+public class ReceiptEvent {
+    private final int opCode;
+    private final Lock lock = new ReentrantLock();
+    private final Condition received = lock.newCondition();
+    private final Condition processed = lock.newCondition();
+    private State state = State.notReceived;
+    private boolean sendResponse;
+
+    public ReceiptEvent(int opCode, boolean sendResponse) {
+        this.opCode = opCode;
+        this.sendResponse = sendResponse;
+    }
+
+    public boolean matches(Request request) {
+        return request.type == opCode;
+    }
+
+    public void serverReceived() {
+        lock.lock();
+        try {
+            state = Collections.max(Arrays.asList(state, State.received));
+            received.signalAll();

Review Comment:
   Yes, that is right - removed it.





[GitHub] [kafka] Hangleton commented on a diff in pull request #13558: KAFKA-14845: Fix broker registration with Zookeeper when the previous ephemeral znode was not properly recorded by the broker

Posted by "Hangleton (via GitHub)" <gi...@apache.org>.
Hangleton commented on code in PR #13558:
URL: https://github.com/apache/kafka/pull/13558#discussion_r1197761299


##########
core/src/test/java/kafka/zk/ZkBrokerRegistrationTest.java:
##########
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.zk;
+
+import io.netty.channel.group.ChannelGroup;
+import io.netty.util.concurrent.EventExecutor;
+import kafka.cluster.Broker;
+import kafka.server.KafkaConfig;
+import kafka.zookeeper.ZooKeeperClient;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.utils.Time;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.client.ZKClientConfig;
+import org.apache.zookeeper.server.PrepRequestProcessor;
+import org.apache.zookeeper.server.RequestProcessor;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.SyncRequestProcessor;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.net.ServerSocket;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+
+import static java.util.Arrays.asList;
+import static org.apache.kafka.common.security.auth.SecurityProtocol.PLAINTEXT;
+import static org.apache.zookeeper.client.ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET;
+
+/**
+ * This test simulates the loss of a session ID with Zookeeper (for instance due to network partition)
+ * while a broker is in the process of creating its ephemeral znode under /brokers/ids/. This can
+ * result in broker registration failing with due to a NODEEXISTS error which is not handled by the
+ * fix KAFKA-6584 because in this case, the ID of the session of the conflicting ephemeral znode is
+ * not known by the broker. See KAFKA-14845 for an example of timeline of events requires to reproduce
+ * the use case.
+ */
+public class ZkBrokerRegistrationTest {
+    private static final Logger log = LoggerFactory.getLogger(ZkBrokerRegistrationTest.class);
+
+    private KafkaConfig kafkaConfig;
+    private BrokerInfo brokerInfo;
+    private int zkPort;
+
+    private class SandboxedZookeeper extends Thread {
+        private final CountDownLatch zookeeperStopLatch;
+        private final CountDownLatch zookeeperStartLatch = new CountDownLatch(1);
+        private final ZkTestContext spec;
+        private InstrumentedRequestProcessor processor;
+
+        SandboxedZookeeper(CountDownLatch zookeeperStopLatch, ZkTestContext spec) {
+            this.zookeeperStopLatch = zookeeperStopLatch;
+            this.spec = spec;
+        }
+
+        public void run() {
+            ServerCnxnFactory cnxnFactory = null;
+
+            try {
+                Path dataDir = Files.createTempDirectory("zk");
+
+                Properties zkProperties = new Properties();
+                zkProperties.put("dataDir", dataDir.toFile().getPath());
+                zkProperties.put("clientPort", String.valueOf(zkPort));
+                zkProperties.put("serverCnxnFactory", "org.apache.zookeeper.server.NettyServerCnxnFactory");
+
+                QuorumPeerConfig config = new QuorumPeerConfig();
+                config.parseProperties(zkProperties);
+                FileTxnSnapLog txnLog = new FileTxnSnapLog(config.getDataLogDir(), config.getDataDir());
+
+                ZooKeeperServer zookeeper = new InstrumentedZooKeeperServer(
+                    null,
+                    txnLog,
+                    config.getTickTime(),
+                    config.getMinSessionTimeout(),
+                    config.getMaxSessionTimeout(),
+                    config.getClientPortListenBacklog(),
+                    null,
+                    config.getInitialConfig(),
+                    spec) {
+
+                    @Override
+                    protected void setupRequestProcessors() {
+                        processor = new InstrumentedRequestProcessor(this, spec);
+                        RequestProcessor syncProcessor = new SyncRequestProcessor(this, processor);
+                        ((SyncRequestProcessor) syncProcessor).start();
+                        firstProcessor = new PrepRequestProcessor(this, syncProcessor);
+                        ((PrepRequestProcessor) firstProcessor).start();
+                    }

Review Comment:
   Sure, it should be overridden in `InstrumentedZooKeeperServer` indeed. Thanks for pointing it out.





[GitHub] [kafka] Hangleton commented on a diff in pull request #13558: KAFKA-14845: Fix broker registration with Zookeeper when the previous ephemeral znode was not properly recorded by the broker

Posted by "Hangleton (via GitHub)" <gi...@apache.org>.
Hangleton commented on code in PR #13558:
URL: https://github.com/apache/kafka/pull/13558#discussion_r1197761016


##########
core/src/test/java/kafka/zk/ZkBrokerRegistrationTest.java:
##########
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.zk;
+
+import io.netty.channel.group.ChannelGroup;
+import io.netty.util.concurrent.EventExecutor;
+import kafka.cluster.Broker;
+import kafka.server.KafkaConfig;
+import kafka.zookeeper.ZooKeeperClient;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.utils.Time;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.client.ZKClientConfig;
+import org.apache.zookeeper.server.PrepRequestProcessor;
+import org.apache.zookeeper.server.RequestProcessor;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.SyncRequestProcessor;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.net.ServerSocket;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+
+import static java.util.Arrays.asList;
+import static org.apache.kafka.common.security.auth.SecurityProtocol.PLAINTEXT;
+import static org.apache.zookeeper.client.ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET;
+
+/**
+ * This test simulates the loss of a session ID with Zookeeper (for instance due to network partition)
+ * while a broker is in the process of creating its ephemeral znode under /brokers/ids/. This can
+ * result in broker registration failing with due to a NODEEXISTS error which is not handled by the
+ * fix KAFKA-6584 because in this case, the ID of the session of the conflicting ephemeral znode is
+ * not known by the broker. See KAFKA-14845 for an example of timeline of events requires to reproduce
+ * the use case.
+ */
+public class ZkBrokerRegistrationTest {
+    private static final Logger log = LoggerFactory.getLogger(ZkBrokerRegistrationTest.class);
+
+    private KafkaConfig kafkaConfig;
+    private BrokerInfo brokerInfo;
+    private int zkPort;
+
+    private class SandboxedZookeeper extends Thread {
+        private final CountDownLatch zookeeperStopLatch;
+        private final CountDownLatch zookeeperStartLatch = new CountDownLatch(1);
+        private final ZkTestContext spec;
+        private InstrumentedRequestProcessor processor;

Review Comment:
   Yes indeed, thanks for pointing it out.





[GitHub] [kafka] Hangleton commented on a diff in pull request #13558: KAFKA-14845: Fix broker registration with Zookeeper when the previous ephemeral znode was not properly recorded by the broker

Posted by "Hangleton (via GitHub)" <gi...@apache.org>.
Hangleton commented on code in PR #13558:
URL: https://github.com/apache/kafka/pull/13558#discussion_r1197756841


##########
core/src/test/java/kafka/zk/ZkBrokerRegistrationTest.java:
##########
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.zk;
+
+import io.netty.channel.group.ChannelGroup;
+import io.netty.util.concurrent.EventExecutor;
+import kafka.cluster.Broker;
+import kafka.server.KafkaConfig;
+import kafka.zookeeper.ZooKeeperClient;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.utils.Time;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.client.ZKClientConfig;
+import org.apache.zookeeper.server.PrepRequestProcessor;
+import org.apache.zookeeper.server.RequestProcessor;
+import org.apache.zookeeper.server.ServerCnxnFactory;
+import org.apache.zookeeper.server.SyncRequestProcessor;
+import org.apache.zookeeper.server.ZooKeeperServer;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.net.ServerSocket;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.CountDownLatch;
+
+import static java.util.Arrays.asList;
+import static org.apache.kafka.common.security.auth.SecurityProtocol.PLAINTEXT;
+import static org.apache.zookeeper.client.ZKClientConfig.ZOOKEEPER_CLIENT_CNXN_SOCKET;
+
+/**
+ * This test simulates the loss of a session ID with Zookeeper (for instance due to network partition)
+ * while a broker is in the process of creating its ephemeral znode under /brokers/ids/. This can
+ * result in broker registration failing with due to a NODEEXISTS error which is not handled by the
+ * fix KAFKA-6584 because in this case, the ID of the session of the conflicting ephemeral znode is
+ * not known by the broker. See KAFKA-14845 for an example of timeline of events requires to reproduce
+ * the use case.
+ */
+public class ZkBrokerRegistrationTest {

Review Comment:
   Sure, makes sense.


