You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2024/03/18 09:57:16 UTC

(camel) branch camel-4.4.x updated: CAMEL-20563: shutdown existing consumer instance to release resources… (#13510)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-4.4.x
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/camel-4.4.x by this push:
     new b8b69492f23 CAMEL-20563: shutdown existing consumer instance to release resources… (#13510)
b8b69492f23 is described below

commit b8b69492f23e2dc54e9c97d70b9a360ec6b153f0
Author: Sami Peltola <sa...@gmail.com>
AuthorDate: Mon Mar 18 11:29:53 2024 +0200

    CAMEL-20563: shutdown existing consumer instance to release resources… (#13510)
    
    * CAMEL-20563: shutdown existing consumer instance to release resources (heartbeat) before creating new ones
    
    * CAMEL-20563: Reformatted code
---
 .../camel/component/kafka/KafkaFetchRecords.java   |   5 +
 .../KafkaBreakOnFirstErrorReleaseResourcesIT.java  | 205 +++++++++++++++++++++
 2 files changed, 210 insertions(+)

diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index 8de8517ea1e..125a9c4c184 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -115,6 +115,11 @@ public class KafkaFetchRecords implements Runnable {
 
             if (!isConnected()) {
 
+                // shutdown existing consumer instance to release resources (heartbeat)
+                if (this.consumer != null) {
+                    safeConsumerClose();
+                }
+
                 // task that deals with creating kafka consumer
                 currentBackoffInterval = kafkaConsumer.getEndpoint().getComponent().getCreateConsumerBackoffInterval();
                 ForegroundTask task = Tasks.foregroundTask()
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorReleaseResourcesIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorReleaseResourcesIT.java
new file mode 100644
index 00000000000..81d8f603044
--- /dev/null
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorReleaseResourcesIT.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.component.kafka.MockConsumerInterceptor;
+import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
+import org.apache.camel.component.kafka.integration.common.KafkaAdminUtil;
+import org.apache.camel.component.kafka.testutil.CamelKafkaUtil;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.*;
+import org.junit.jupiter.api.condition.EnabledOnOs;
+import org.junit.jupiter.api.condition.OS;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests the breakOnFirstError functionality and the issue reported in CAMEL-20563 regarding leaked resources
+ * (mainly heartbeat threads) while reconnecting.
+ */
+@Tags({ @Tag("breakOnFirstError") })
+@EnabledOnOs(value = { OS.LINUX, OS.MAC, OS.FREEBSD, OS.OPENBSD, OS.WINDOWS },
+             architectures = { "amd64", "aarch64", "s390x" },
+             disabledReason = "This test does not run reliably on ppc64le")
+class KafkaBreakOnFirstErrorReleaseResourcesIT extends BaseKafkaTestSupport {
+
+    public static final String ROUTE_ID = "breakOnFirstError-20563";
+    public static final String TOPIC = "breakOnFirstError-20563";
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaBreakOnFirstErrorReleaseResourcesIT.class);
+    private static final int CONSUMER_COUNT = 3;
+
+    @EndpointInject("mock:result")
+    private MockEndpoint to;
+
+    private org.apache.kafka.clients.producer.KafkaProducer<String, String> producer;
+
+    @BeforeAll
+    public static void setupTopic() {
+        if (kafkaAdminClient == null) {
+            kafkaAdminClient = KafkaAdminUtil.createAdminClient(service);
+        }
+
+        // create the topic with 3 partitions
+        final NewTopic mytopic = new NewTopic(TOPIC, 3, (short) 1);
+        kafkaAdminClient.createTopics(Collections.singleton(mytopic));
+    }
+
+    @BeforeEach
+    public void init() {
+
+        // setup the producer
+        Properties props = getDefaultProperties();
+        producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
+        MockConsumerInterceptor.recordsCaptured.clear();
+    }
+
+    @AfterEach
+    public void after() {
+        if (producer != null) {
+            producer.close();
+        }
+        // clean all test topics
+        kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC)).all();
+    }
+
+    @Test
+    void testCamel20563TestFix() throws Exception {
+        to.reset();
+        to.expectedMessageCount(13);
+        to.expectedBodiesReceivedInAnyOrder("1", "2", "3", "4", "5", "ERROR",
+                "6", "7", "ERROR", "8", "9", "10", "11");
+
+        contextExtension.getContext().getRouteController().stopRoute(ROUTE_ID);
+
+        this.publishMessagesToKafka();
+
+        contextExtension.getContext().getRouteController().startRoute(ROUTE_ID);
+
+        // let the test run for a while
+        Awaitility.await()
+                .timeout(10, TimeUnit.SECONDS)
+                .pollDelay(8, TimeUnit.SECONDS)
+                .untilAsserted(() -> assertTrue(true));
+
+        to.assertIsSatisfied();
+
+        int heartbeatThreadCount = countHeartbeatThreads();
+        assertEquals(CONSUMER_COUNT, heartbeatThreadCount, "Heartbeat-thread count should match consumer count");
+        LOG.info("Number of heartbeat-threads is: {}", heartbeatThreadCount);
+
+    }
+
+    protected int countHeartbeatThreads() throws ClassNotFoundException {
+        Set<Thread> threads = Thread.getAllStackTraces().keySet();
+        int count = 0;
+
+        for (Thread t : threads) {
+            if (t.getName().contains("heartbeat")) {
+                count++;
+            }
+        }
+        return count;
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+
+            @Override
+            public void configure() {
+                onException(RuntimeException.class)
+                        .handled(false)
+                        .process(exchange -> {
+                            doCommitOffset(exchange);
+                        })
+                        .end();
+
+                from("kafka:" + TOPIC
+                     + "?groupId=" + ROUTE_ID
+                     + "&autoOffsetReset=earliest"
+                     + "&autoCommitEnable=false"
+                     + "&allowManualCommit=true"
+                     + "&breakOnFirstError=true"
+                     + "&maxPollRecords=1"
+                     + "&consumersCount=" + CONSUMER_COUNT
+                     + "&pollTimeoutMs=1000"
+                     + "&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+                     + "&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+                     + "&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor")
+                        .routeId(ROUTE_ID)
+                        .autoStartup(false)
+                        .process(exchange -> {
+                            LOG.debug(CamelKafkaUtil.buildKafkaLogMessage("Consuming", exchange, true));
+                        })
+                        // capturing all of the payloads
+                        .to(to)
+                        .process(exchange -> {
+                            ifIsPayloadWithErrorThrowException(exchange);
+                        })
+                        .process(exchange -> {
+                            doCommitOffset(exchange);
+                        })
+                        .end();
+            }
+        };
+    }
+
+    private void ifIsPayloadWithErrorThrowException(Exchange exchange) {
+        String payload = exchange.getMessage().getBody(String.class);
+        if (payload.equals("ERROR")) {
+            throw new RuntimeException("NON RETRY ERROR TRIGGERED BY TEST");
+        }
+    }
+
+    private void publishMessagesToKafka() {
+        final List<String> producedRecords = List.of("1", "2", "3", "4", "5", "ERROR",
+                "6", "7", "ERROR", "8", "9", "10", "11");
+
+        producedRecords.forEach(v -> {
+            ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC, null, v);
+            producer.send(data);
+        });
+
+    }
+
+    private void doCommitOffset(Exchange exchange) {
+        LOG.debug(CamelKafkaUtil.buildKafkaLogMessage("Committing", exchange, true));
+        KafkaManualCommit manual = exchange.getMessage()
+                .getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
+        if (Objects.nonNull(manual)) {
+            manual.commit();
+        } else {
+            LOG.error("KafkaManualCommit is MISSING");
+        }
+    }
+
+}