Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/03/03 03:57:37 UTC

[GitHub] [flink] leonardBang commented on a change in pull request #18965: [FLINK-26387][connector/kafka] Use individual multi-broker cluster in broker failure tests

leonardBang commented on a change in pull request #18965:
URL: https://github.com/apache/flink/pull/18965#discussion_r818288213



##########
File path: flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
##########
@@ -1463,23 +1463,30 @@ public void cancel() {
     public void runBrokerFailureTest() throws Exception {
         final String topic = "brokerFailureTestTopic";
 
+        // Start a temporary multi-broker cluster.
+        // This test case relies on stopping a broker and switching partition leader to another
+        // during the test, so single-broker cluster (kafkaServer) could not fulfill the
+        // requirement.
+        KafkaTestEnvironment multiBrokerCluster = constructKafkaTestEnvironment();

Review comment:
       ```suggestion
           final KafkaTestEnvironment kafkaServerCluster = constructKafkaTestEnvironment();
   ```

##########
File path: flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
##########
@@ -2610,7 +2620,9 @@ public T map(T value) throws Exception {
 
                 if (failer && numElementsTotal >= failCount) {
                     // shut down a Kafka broker
-                    kafkaServer.stopBroker(shutdownBrokerId);
+                    kafkaServerToKill.stopBroker(shutdownBrokerId);
+                    hasBeenCheckpointedBeforeFailure = hasBeenCheckpointed;
+                    killedLeaderBefore = true;

Review comment:
       IIUC, this was a bug introduced in your last PR, and this change fixes it, right?
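
For readers without the full file, here is a minimal sketch of the one-shot failure pattern the hunk appears to restore: stop the broker once, then record the flags so later elements do not trigger the stop again. `OneShotBrokerStopper` and its `Runnable` are hypothetical stand-ins, not the Flink `BrokerKillingMapper` or the `KafkaTestEnvironment` API, and the `!killedLeaderBefore` guard is an assumption about the surrounding code that the hunk does not show.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the mapper in the hunk above; not the Flink class. */
class OneShotBrokerStopper {
    // Shared across instances, mirroring the static volatile flags in the diff.
    static volatile boolean killedLeaderBefore = false;
    static volatile boolean hasBeenCheckpointedBeforeFailure = false;

    // Assumption: wraps whatever actually stops the broker in a real test.
    private final Runnable stopBroker;
    private final int failCount;
    private int numElementsTotal = 0;
    private boolean hasBeenCheckpointed = false;

    OneShotBrokerStopper(Runnable stopBroker, int failCount) {
        this.stopBroker = stopBroker;
        this.failCount = failCount;
    }

    /** Called when a checkpoint completes, so the failure can record whether one was seen. */
    void notifyCheckpointComplete() {
        hasBeenCheckpointed = true;
    }

    /** Passes the value through; stops the broker exactly once after failCount elements. */
    int map(int value) {
        numElementsTotal++;
        if (!killedLeaderBefore && numElementsTotal >= failCount) {
            stopBroker.run();
            // Without these two assignments the stop would repeat on every later element.
            hasBeenCheckpointedBeforeFailure = hasBeenCheckpointed;
            killedLeaderBefore = true;
        }
        return value;
    }

    public static void main(String[] args) {
        AtomicInteger stops = new AtomicInteger();
        OneShotBrokerStopper mapper = new OneShotBrokerStopper(stops::incrementAndGet, 3);
        for (int i = 0; i < 10; i++) {
            mapper.map(i);
        }
        System.out.println("broker stopped " + stops.get() + " time(s)");
    }
}
```

Because the flags are static, a test that reuses this class should reset `killedLeaderBefore` before each run, as the test in the PR does before calling `tryExecute`.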

##########
File path: flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
##########
@@ -1463,23 +1463,30 @@ public void cancel() {
     public void runBrokerFailureTest() throws Exception {
         final String topic = "brokerFailureTestTopic";
 
+        // Start a temporary multi-broker cluster.
+        // This test case relies on stopping a broker and switching partition leader to another
+        // during the test, so single-broker cluster (kafkaServer) could not fulfill the
+        // requirement.
+        KafkaTestEnvironment multiBrokerCluster = constructKafkaTestEnvironment();

Review comment:
       minor: we can move this piece of code to line `1478` for readability

##########
File path: flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
##########
@@ -496,17 +494,4 @@ private void unpause(int brokerId) throws Exception {
         pausedBroker.remove(brokerId);
         LOG.info("Broker {} is resumed", brokerId);
     }
-
-    private KafkaConsumer<Void, Void> createTempConsumer() {
-        Properties consumerProps = new Properties();
-        consumerProps.putAll(getStandardProperties());
-        consumerProps.setProperty(
-                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
-                VoidDeserializer.class.getCanonicalName());
-        consumerProps.setProperty(
-                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
-                VoidDeserializer.class.getCanonicalName());
-        consumerProps.setProperty(ConsumerConfig.ALLOW_AUTO_CREATE_TOPICS_CONFIG, "false");
-        return new KafkaConsumer<>(consumerProps);
-    }

Review comment:
       Please check the IDE warnings before opening a PR so that we can avoid this kind of minor issue.

##########
File path: flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
##########
@@ -2584,14 +2591,17 @@ private boolean validateSequence(
         public static volatile boolean killedLeaderBefore;
         public static volatile boolean hasBeenCheckpointedBeforeFailure;
 
+        private static KafkaTestEnvironment kafkaServerToKill;

Review comment:
       I'd prefer the name `kafkaServerToShutdown`, because we call the `shutdown` command rather than the `kill` command

##########
File path: flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java
##########
@@ -1495,21 +1502,21 @@ public void runBrokerFailureTest() throws Exception {
         env.setRestartStrategy(RestartStrategies.noRestart());
 
         Properties props = new Properties();
-        props.putAll(standardProps);
-        props.putAll(secureProps);
+        props.putAll(multiBrokerCluster.getStandardProperties());
+        props.putAll(multiBrokerCluster.getSecureProperties());
 
         getStream(env, topic, schema, props)
                 .map(new PartitionValidatingMapper(parallelism, 1))
-                .map(new BrokerKillingMapper<Integer>(leaderId, failAfterElements))
+                .map(new BrokerKillingMapper<>(multiBrokerCluster, leaderId, failAfterElements))
                 .addSink(new ValidatingExactlyOnceSink(totalElements))
                 .setParallelism(1);
 
         try {
             BrokerKillingMapper.killedLeaderBefore = false;
             tryExecute(env, "Broker failure once test");
         } finally {
-            // start a new broker:
-            kafkaServer.restartBroker(leaderId);
+            // Tear down the temporary cluster anyway

Review comment:
       ```suggestion
               // Shutdown the kafkaServer cluster directly
   ```
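
The `try`/`finally` in this hunk can be illustrated with a minimal sketch of the general pattern: a temporary cluster is shut down whether or not the test body throws. `TemporaryCluster`, `TestBody`, and `runWithCluster` are hypothetical names for illustration only, not part of `KafkaTestEnvironment` or any Flink or Kafka API.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch; none of these types exist in Flink or Kafka. */
class TemporaryClusterDemo {
    /** Minimal stand-in for a test cluster that must be shut down after use. */
    interface TemporaryCluster {
        void shutdown();
    }

    /** A test body that may throw. */
    interface TestBody {
        void run(TemporaryCluster cluster) throws Exception;
    }

    /** Runs the body and shuts the cluster down whether or not the body throws. */
    static void runWithCluster(TemporaryCluster cluster, TestBody body) throws Exception {
        try {
            body.run(cluster);
        } finally {
            // Reached on both success and failure, so the cluster never leaks.
            cluster.shutdown();
        }
    }

    public static void main(String[] args) {
        List<String> events = new ArrayList<>();
        TemporaryCluster cluster = () -> events.add("shutdown");
        try {
            runWithCluster(cluster, c -> {
                events.add("test");
                throw new IllegalStateException("simulated test failure");
            });
        } catch (Exception e) {
            events.add("caught");
        }
        System.out.println(events);
    }
}
```

Shutting the whole temporary cluster down, rather than restarting the stopped broker, leaves the shared single-broker cluster untouched for the other tests.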



