You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2024/03/21 13:34:01 UTC
(camel-spring-boot) 03/04: Remove outdated test. We test in core camel
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-spring-boot.git
commit 1ccdc4cd889d77abba227729fe7945d5a6889282
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Thu Mar 21 14:04:31 2024 +0100
Remove outdated test. We test in core camel
---
.../integration/KafkaConsumerHealthCheckIT.java | 209 ---------------------
1 file changed, 209 deletions(-)
diff --git a/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerHealthCheckIT.java b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerHealthCheckIT.java
deleted file mode 100644
index 5db919b98a3..00000000000
--- a/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerHealthCheckIT.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.kafka.integration;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.StreamSupport;
-import org.apache.camel.CamelContext;
-import org.apache.camel.EndpointInject;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.kafka.KafkaConstants;
-import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderDeserializer;
-import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.health.HealthCheck;
-import org.apache.camel.health.HealthCheckHelper;
-import org.apache.camel.health.HealthCheckRegistry;
-import org.apache.camel.impl.health.DefaultHealthCheckRegistry;
-import org.apache.camel.spring.boot.CamelAutoConfiguration;
-import org.apache.camel.spring.boot.CamelContextConfiguration;
-import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.header.internals.RecordHeader;
-import org.awaitility.Awaitility;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.MethodOrderer;
-import org.junit.jupiter.api.Order;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.TestMethodOrder;
-import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.test.annotation.DirtiesContext;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
-@DirtiesContext
-@CamelSpringBootTest
-@SpringBootTest(classes = { CamelAutoConfiguration.class, BaseEmbeddedKafkaTestSupport.DefaulKafkaComponent.class,
- KafkaConsumerHealthCheckIT.class, KafkaConsumerHealthCheckIT.TestConfiguration.class, })
-@DisabledIfSystemProperty(named = "ci.env.name", matches = "github.com", disabledReason = "Disabled on GH Action due to Docker limit")
-public class KafkaConsumerHealthCheckIT extends BaseEmbeddedKafkaTestSupport {
- public static final String TOPIC = "test-health";
-
- private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerHealthCheckIT.class);
-
- private final String from = "kafka:" + TOPIC
- + "?groupId=group1&autoOffsetReset=earliest&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer&"
- + "valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
- + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.integration.MockConsumerInterceptor";
-
- @EndpointInject("mock:result")
- private MockEndpoint to;
-
- private org.apache.kafka.clients.producer.KafkaProducer<String, String> producer;
-
- @BeforeEach
- public void before() {
- Properties props = getDefaultProperties();
- producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
- MockConsumerInterceptor.recordsCaptured.clear();
- }
-
- @AfterEach
- public void after() {
- if (producer != null) {
- producer.close();
- }
- // clean all test topics
- kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC)).all();
- to.reset();
- }
-
- @Bean("healthRegistry")
- CamelContextConfiguration contextConfiguration() {
- return new CamelContextConfiguration() {
- @Override
- public void beforeApplicationStart(CamelContext context) {
- // install health check manually (yes a bit cumbersome)
- HealthCheckRegistry registry = new DefaultHealthCheckRegistry();
- registry.setCamelContext(context);
- Object hc = registry.resolveById("context");
- registry.register(hc);
- hc = registry.resolveById("routes");
- registry.register(hc);
- hc = registry.resolveById("consumers");
- registry.register(hc);
- context.getCamelContextExtension().addContextPlugin(HealthCheckRegistry.class, registry);
- }
-
- @Override
- public void afterApplicationStart(CamelContext camelContext) {
- // do nothing here
- }
- };
- }
-
- @Order(1)
- @Test
- public void kafkaConsumerHealthCheck() throws InterruptedException {
- // health-check liveness should be UP
- Collection<HealthCheck.Result> res = HealthCheckHelper.invokeLiveness(context);
- boolean up = res.stream().allMatch(r -> r.getState().equals(HealthCheck.State.UP));
- Assertions.assertTrue(up, "liveness check");
-
- // health-check readiness should be ready
- Awaitility.await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> {
- Collection<HealthCheck.Result> res2 = HealthCheckHelper.invokeReadiness(context);
- boolean up2 = res2.stream().allMatch(r -> r.getState().equals(HealthCheck.State.UP));
- Assertions.assertTrue(up2, "readiness check");
- });
-
- String propagatedHeaderKey = "PropagatedCustomHeader";
- byte[] propagatedHeaderValue = "propagated header value".getBytes();
- String skippedHeaderKey = "CamelSkippedHeader";
- to.expectedMessageCount(5);
- to.expectedBodiesReceivedInAnyOrder("message-0", "message-1", "message-2", "message-3", "message-4");
- // The LAST_RECORD_BEFORE_COMMIT header should not be configured on any
- // exchange because autoCommitEnable=true
- to.expectedHeaderValuesReceivedInAnyOrder(KafkaConstants.LAST_RECORD_BEFORE_COMMIT, null, null, null, null,
- null);
- to.expectedHeaderReceived(propagatedHeaderKey, propagatedHeaderValue);
-
- for (int k = 0; k < 5; k++) {
- String msg = "message-" + k;
- ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC, "1", msg);
- data.headers().add(new RecordHeader("CamelSkippedHeader", "skipped header value".getBytes()));
- data.headers().add(new RecordHeader(propagatedHeaderKey, propagatedHeaderValue));
- producer.send(data);
- }
-
- to.assertIsSatisfied(3000);
-
- assertEquals(5, StreamSupport
- .stream(MockConsumerInterceptor.recordsCaptured.get(0).records(TOPIC).spliterator(), false).count());
-
- Map<String, Object> headers = to.getExchanges().get(0).getIn().getHeaders();
- assertFalse(headers.containsKey(skippedHeaderKey), "Should not receive skipped header");
- assertTrue(headers.containsKey(propagatedHeaderKey), "Should receive propagated header");
-
- // and shutdown kafka which will make readiness report as DOWN
- service.shutdown();
-
- // health-check liveness should be UP
- res = HealthCheckHelper.invokeLiveness(context);
- up = res.stream().allMatch(r -> r.getState().equals(HealthCheck.State.UP));
- Assertions.assertTrue(up, "liveness check");
- // but health-check readiness should NOT be ready
- Awaitility.await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> {
- Collection<HealthCheck.Result> res2 = HealthCheckHelper.invoke(context);
- Optional<HealthCheck.Result> down = res2.stream().filter(r -> r.getState().equals(HealthCheck.State.DOWN))
- .findFirst();
- Assertions.assertTrue(down.isPresent());
- String msg = down.get().getMessage().get();
- Assertions.assertTrue(msg.contains("KafkaConsumer is not ready"));
- Map<String, Object> map = down.get().getDetails();
- Assertions.assertEquals(TOPIC, map.get("topic"));
- Assertions.assertEquals("test-health-it", map.get("route.id"));
- });
- }
-
- @Configuration
- public class TestConfiguration {
- @Bean
- public RouteBuilder routeBuilder() {
- return new RouteBuilder() {
- @Override
- public void configure() {
- from(from).process(
- exchange -> LOG.trace("Captured on the processor: {}", exchange.getMessage().getBody()))
- .routeId("test-health-it").to(to);
- }
- };
- }
-
- @Bean("myHeaderDeserializer")
- public MyKafkaHeaderDeserializer createMyKafkaHeaderDeserializer() {
- return new MyKafkaHeaderDeserializer();
- }
- }
-
- private static class MyKafkaHeaderDeserializer extends DefaultKafkaHeaderDeserializer {
- }
-}