You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2024/03/18 09:57:16 UTC
(camel) branch camel-4.4.x updated: CAMEL-20563: shutdown existing consumer instance to release resources… (#13510)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-4.4.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-4.4.x by this push:
new b8b69492f23 CAMEL-20563: shutdown existing consumer instance to release resources… (#13510)
b8b69492f23 is described below
commit b8b69492f23e2dc54e9c97d70b9a360ec6b153f0
Author: Sami Peltola <sa...@gmail.com>
AuthorDate: Mon Mar 18 11:29:53 2024 +0200
CAMEL-20563: shutdown existing consumer instance to release resources… (#13510)
* CAMEL-20563: shutdown existing consumer instance to release resources (heartbeat) before creating new ones
* CAMEL-20563: Reformatted code
---
.../camel/component/kafka/KafkaFetchRecords.java | 5 +
.../KafkaBreakOnFirstErrorReleaseResourcesIT.java | 205 +++++++++++++++++++++
2 files changed, 210 insertions(+)
diff --git a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
index 8de8517ea1e..125a9c4c184 100644
--- a/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
+++ b/components/camel-kafka/src/main/java/org/apache/camel/component/kafka/KafkaFetchRecords.java
@@ -115,6 +115,11 @@ public class KafkaFetchRecords implements Runnable {
if (!isConnected()) {
+ // shutdown existing consumer instance to release resources (heartbeat)
+ if (this.consumer != null) {
+ safeConsumerClose();
+ }
+
// task that deals with creating kafka consumer
currentBackoffInterval = kafkaConsumer.getEndpoint().getComponent().getCreateConsumerBackoffInterval();
ForegroundTask task = Tasks.foregroundTask()
diff --git a/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorReleaseResourcesIT.java b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorReleaseResourcesIT.java
new file mode 100644
index 00000000000..81d8f603044
--- /dev/null
+++ b/components/camel-kafka/src/test/java/org/apache/camel/component/kafka/integration/KafkaBreakOnFirstErrorReleaseResourcesIT.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.kafka.integration;
+
+import java.util.*;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.EndpointInject;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.kafka.KafkaConstants;
+import org.apache.camel.component.kafka.MockConsumerInterceptor;
+import org.apache.camel.component.kafka.consumer.KafkaManualCommit;
+import org.apache.camel.component.kafka.integration.common.KafkaAdminUtil;
+import org.apache.camel.component.kafka.testutil.CamelKafkaUtil;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.awaitility.Awaitility;
+import org.junit.jupiter.api.*;
+import org.junit.jupiter.api.condition.EnabledOnOs;
+import org.junit.jupiter.api.condition.OS;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Test breakOnFirstError functionality and the issue reported in CAMEL-20563 regarding leaking resources, mainly
+ * heartbeat-threads, while reconnecting.
+ *
+ */
+@Tags({ @Tag("breakOnFirstError") })
+@EnabledOnOs(value = { OS.LINUX, OS.MAC, OS.FREEBSD, OS.OPENBSD, OS.WINDOWS },
+ architectures = { "amd64", "aarch64", "s390x" },
+ disabledReason = "This test does not run reliably on ppc64le")
+class KafkaBreakOnFirstErrorReleaseResourcesIT extends BaseKafkaTestSupport {
+
+ public static final String ROUTE_ID = "breakOnFirstError-20563";
+ public static final String TOPIC = "breakOnFirstError-20563";
+
+ private static final Logger LOG = LoggerFactory.getLogger(KafkaBreakOnFirstErrorReleaseResourcesIT.class);
+ private static final int CONSUMER_COUNT = 3;
+
+ @EndpointInject("mock:result")
+ private MockEndpoint to;
+
+ private org.apache.kafka.clients.producer.KafkaProducer<String, String> producer;
+
+ @BeforeAll
+ public static void setupTopic() {
+ if (kafkaAdminClient == null) {
+ kafkaAdminClient = KafkaAdminUtil.createAdminClient(service);
+ }
+
+ // create the topic w/ 3 partitions
+ final NewTopic mytopic = new NewTopic(TOPIC, 3, (short) 1);
+ kafkaAdminClient.createTopics(Collections.singleton(mytopic));
+ }
+
+ @BeforeEach
+ public void init() {
+
+ // setup the producer
+ Properties props = getDefaultProperties();
+ producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
+ MockConsumerInterceptor.recordsCaptured.clear();
+ }
+
+ @AfterEach
+ public void after() {
+ if (producer != null) {
+ producer.close();
+ }
+ // clean all test topics
+ kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC)).all();
+ }
+
+ @Test
+ void testCamel20563TestFix() throws Exception {
+ to.reset();
+ to.expectedMessageCount(13);
+ to.expectedBodiesReceivedInAnyOrder("1", "2", "3", "4", "5", "ERROR",
+ "6", "7", "ERROR", "8", "9", "10", "11");
+
+ contextExtension.getContext().getRouteController().stopRoute(ROUTE_ID);
+
+ this.publishMessagesToKafka();
+
+ contextExtension.getContext().getRouteController().startRoute(ROUTE_ID);
+
+ // let test run for awhile
+ Awaitility.await()
+ .timeout(10, TimeUnit.SECONDS)
+ .pollDelay(8, TimeUnit.SECONDS)
+ .untilAsserted(() -> assertTrue(true));
+
+ to.assertIsSatisfied();
+
+ int heartbeatThreadCount = countHeartbeatThreads();
+ assertEquals(CONSUMER_COUNT, heartbeatThreadCount, "Heartbeat-thread count should match consumer count");
+ LOG.info("Number of heartbeat-threads is: {}", heartbeatThreadCount);
+
+ }
+
+ protected int countHeartbeatThreads() throws ClassNotFoundException {
+ Set<Thread> threads = Thread.getAllStackTraces().keySet();
+ int count = 0;
+
+ for (Thread t : threads) {
+ if (t.getName().contains("heartbeat")) {
+ count++;
+ }
+ }
+ return count;
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+
+ @Override
+ public void configure() {
+ onException(RuntimeException.class)
+ .handled(false)
+ .process(exchange -> {
+ doCommitOffset(exchange);
+ })
+ .end();
+
+ from("kafka:" + TOPIC
+ + "?groupId=" + ROUTE_ID
+ + "&autoOffsetReset=earliest"
+ + "&autoCommitEnable=false"
+ + "&allowManualCommit=true"
+ + "&breakOnFirstError=true"
+ + "&maxPollRecords=1"
+ + "&consumersCount=" + CONSUMER_COUNT
+ + "&pollTimeoutMs=1000"
+ + "&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+ + "&valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
+ + "&interceptorClasses=org.apache.camel.component.kafka.MockConsumerInterceptor")
+ .routeId(ROUTE_ID)
+ .autoStartup(false)
+ .process(exchange -> {
+ LOG.debug(CamelKafkaUtil.buildKafkaLogMessage("Consuming", exchange, true));
+ })
+ // capturing all of the payloads
+ .to(to)
+ .process(exchange -> {
+ ifIsPayloadWithErrorThrowException(exchange);
+ })
+ .process(exchange -> {
+ doCommitOffset(exchange);
+ })
+ .end();
+ }
+ };
+ }
+
+ private void ifIsPayloadWithErrorThrowException(Exchange exchange) {
+ String payload = exchange.getMessage().getBody(String.class);
+ if (payload.equals("ERROR")) {
+ throw new RuntimeException("NON RETRY ERROR TRIGGERED BY TEST");
+ }
+ }
+
+ private void publishMessagesToKafka() {
+ final List<String> producedRecords = List.of("1", "2", "3", "4", "5", "ERROR",
+ "6", "7", "ERROR", "8", "9", "10", "11");
+
+ producedRecords.forEach(v -> {
+ ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC, null, v);
+ producer.send(data);
+ });
+
+ }
+
+ private void doCommitOffset(Exchange exchange) {
+ LOG.debug(CamelKafkaUtil.buildKafkaLogMessage("Committing", exchange, true));
+ KafkaManualCommit manual = exchange.getMessage()
+ .getHeader(KafkaConstants.MANUAL_COMMIT, KafkaManualCommit.class);
+ if (Objects.nonNull(manual)) {
+ manual.commit();
+ } else {
+ LOG.error("KafkaManualCommit is MISSING");
+ }
+ }
+
+}