You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2021/10/25 07:53:42 UTC

[GitHub] [flink] alpreu commented on a change in pull request #17539: [FLINK-24612][connectors/kafka] Configure Kafka test container log le…

alpreu commented on a change in pull request #17539:
URL: https://github.com/apache/flink/pull/17539#discussion_r735335020



##########
File path: flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaTransactionLogITCase.java
##########
@@ -60,15 +60,10 @@
 
     @ClassRule
     public static final KafkaContainer KAFKA_CONTAINER =
-            new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.0"))
-                    .withEmbeddedZookeeper()
-                    .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
-                    .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
-                    .withEnv("KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE", "false")
-                    .withEnv(
-                            "KAFKA_TRANSACTION_MAX_TIMEOUT_MS",
-                            String.valueOf(Duration.ofHours(2).toMillis()))
-                    .withLogConsumer(LOG_CONSUMER);
+            configureKafkaContainer(
+                    LOG,
+                    new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:6.2.0"))

Review comment:
       Use String constant from org.apache.flink.util.DockerImageVersions here? 
   Why does this test use a different version of Kafka?

##########
File path: flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/FlinkKafkaInternalProducerITCase.java
##########
@@ -55,12 +55,10 @@
 
     @Container
     private static final KafkaContainer KAFKA_CONTAINER =
-            new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.5.2"))
-                    .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
-                    .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
-                    .withEnv("KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE", "false")
-                    .withLogConsumer(new Slf4jLogConsumer(LOG))
-                    .withEmbeddedZookeeper();
+            configureKafkaContainer(
+                    LOG,
+                    new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.5.2"))

Review comment:
       Use String constant from org.apache.flink.util.DockerImageVersions here?

##########
File path: flink-connectors/flink-connector-kafka/src/test/resources/log4j2-test.properties
##########
@@ -18,7 +18,7 @@
 
 # Set root logger level to OFF to not flood build logs
 # set manually to INFO for debugging purposes
-rootLogger.level = OFF
+rootLogger.level = INFO

Review comment:
       Is this intentional?

##########
File path: flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
##########
@@ -91,17 +91,12 @@
     private TriggerTimeService timeService;
 
     private static final KafkaContainer KAFKA_CONTAINER =
-            new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.5.2"))
-                    .withEmbeddedZookeeper()
-                    .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
-                    .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
-                    .withEnv("KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE", "false")
-                    .withEnv(
-                            "KAFKA_TRANSACTION_MAX_TIMEOUT_MS",
-                            String.valueOf(Duration.ofHours(2).toMillis()))
-                    .withNetwork(NETWORK)
-                    .withLogConsumer(LOG_CONSUMER)
-                    .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS);
+            configureKafkaContainer(
+                    LOG,
+                    new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.5.2"))

Review comment:
       Use String constant from org.apache.flink.util.DockerImageVersions here?

##########
File path: flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
##########
@@ -124,17 +124,12 @@
 
     @ClassRule
     public static final KafkaContainer KAFKA_CONTAINER =
-            new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.5.2"))
-                    .withEmbeddedZookeeper()
-                    .withEnv("KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR", "1")
-                    .withEnv("KAFKA_TRANSACTION_STATE_LOG_MIN_ISR", "1")
-                    .withEnv("KAFKA_CONFLUENT_SUPPORT_METRICS_ENABLE", "false")
-                    .withEnv(
-                            "KAFKA_TRANSACTION_MAX_TIMEOUT_MS",
-                            String.valueOf(Duration.ofHours(2).toMillis()))
-                    .withNetwork(NETWORK)
-                    .withLogConsumer(LOG_CONSUMER)
-                    .withNetworkAliases(INTER_CONTAINER_KAFKA_ALIAS);
+            configureKafkaContainer(
+                    LOG,
+                    new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:5.5.2"))

Review comment:
       Use String constant from org.apache.flink.util.DockerImageVersions here?

##########
File path: flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaUtil.java
##########
@@ -43,6 +45,46 @@
 
     private KafkaUtil() {}
 
+    /**
+     * This method helps to set commonly used Kafka configurations and aligns the internal Kafka log
+     * levels with the ones used by the capturing logger.
+     *
+     * @param logger to derive the log level from
+     * @param container running Kafka
+     * @return configured Kafka container
+     */
+    public static KafkaContainer configureKafkaContainer(Logger logger, KafkaContainer container) {
+        String logLevel;
+        if (logger.isErrorEnabled()) {
+            logLevel = "ERROR";
+        } else if (logger.isTraceEnabled()) {
+            logLevel = "TRACE";
+        } else if (logger.isDebugEnabled()) {
+            logLevel = "DEBUG";
+        } else if (logger.isWarnEnabled()) {
+            logLevel = "WARN";
+        } else if (logger.isInfoEnabled()) {
+            logLevel = "INFO";
+        } else {
+            throw new IllegalStateException("Unsupported log level configured.");

Review comment:
       Doesn't this fail if logging is set to OFF?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org