You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2024/03/21 13:34:01 UTC

(camel-spring-boot) 03/04: Remove outdated test. We test in core camel

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-spring-boot.git

commit 1ccdc4cd889d77abba227729fe7945d5a6889282
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Thu Mar 21 14:04:31 2024 +0100

    Remove outdated test. We test in core camel
---
 .../integration/KafkaConsumerHealthCheckIT.java    | 209 ---------------------
 1 file changed, 209 deletions(-)

diff --git a/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerHealthCheckIT.java b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerHealthCheckIT.java
deleted file mode 100644
index 5db919b98a3..00000000000
--- a/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerHealthCheckIT.java
+++ /dev/null
@@ -1,209 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.kafka.integration;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-import java.util.Optional;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.StreamSupport;
-import org.apache.camel.CamelContext;
-import org.apache.camel.EndpointInject;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.kafka.KafkaConstants;
-import org.apache.camel.component.kafka.serde.DefaultKafkaHeaderDeserializer;
-import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.health.HealthCheck;
-import org.apache.camel.health.HealthCheckHelper;
-import org.apache.camel.health.HealthCheckRegistry;
-import org.apache.camel.impl.health.DefaultHealthCheckRegistry;
-import org.apache.camel.spring.boot.CamelAutoConfiguration;
-import org.apache.camel.spring.boot.CamelContextConfiguration;
-import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.common.header.internals.RecordHeader;
-import org.awaitility.Awaitility;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.MethodOrderer;
-import org.junit.jupiter.api.Order;
-import org.junit.jupiter.api.Test;
-import org.junit.jupiter.api.TestMethodOrder;
-import org.junit.jupiter.api.condition.DisabledIfSystemProperty;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.test.annotation.DirtiesContext;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
-@DirtiesContext
-@CamelSpringBootTest
-@SpringBootTest(classes = { CamelAutoConfiguration.class, BaseEmbeddedKafkaTestSupport.DefaulKafkaComponent.class,
-        KafkaConsumerHealthCheckIT.class, KafkaConsumerHealthCheckIT.TestConfiguration.class, })
-@DisabledIfSystemProperty(named = "ci.env.name", matches = "github.com", disabledReason = "Disabled on GH Action due to Docker limit")
-public class KafkaConsumerHealthCheckIT extends BaseEmbeddedKafkaTestSupport {
-    public static final String TOPIC = "test-health";
-
-    private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerHealthCheckIT.class);
-
-    private final String from = "kafka:" + TOPIC
-            + "?groupId=group1&autoOffsetReset=earliest&keyDeserializer=org.apache.kafka.common.serialization.StringDeserializer&"
-            + "valueDeserializer=org.apache.kafka.common.serialization.StringDeserializer"
-            + "&autoCommitIntervalMs=1000&sessionTimeoutMs=30000&autoCommitEnable=true&interceptorClasses=org.apache.camel.component.kafka.integration.MockConsumerInterceptor";
-
-    @EndpointInject("mock:result")
-    private MockEndpoint to;
-
-    private org.apache.kafka.clients.producer.KafkaProducer<String, String> producer;
-
-    @BeforeEach
-    public void before() {
-        Properties props = getDefaultProperties();
-        producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
-        MockConsumerInterceptor.recordsCaptured.clear();
-    }
-
-    @AfterEach
-    public void after() {
-        if (producer != null) {
-            producer.close();
-        }
-        // clean all test topics
-        kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC)).all();
-        to.reset();
-    }
-
-    @Bean("healthRegistry")
-    CamelContextConfiguration contextConfiguration() {
-        return new CamelContextConfiguration() {
-            @Override
-            public void beforeApplicationStart(CamelContext context) {
-                // install health check manually (yes a bit cumbersome)
-                HealthCheckRegistry registry = new DefaultHealthCheckRegistry();
-                registry.setCamelContext(context);
-                Object hc = registry.resolveById("context");
-                registry.register(hc);
-                hc = registry.resolveById("routes");
-                registry.register(hc);
-                hc = registry.resolveById("consumers");
-                registry.register(hc);
-                context.getCamelContextExtension().addContextPlugin(HealthCheckRegistry.class, registry);
-            }
-
-            @Override
-            public void afterApplicationStart(CamelContext camelContext) {
-                // do nothing here
-            }
-        };
-    }
-
-    @Order(1)
-    @Test
-    public void kafkaConsumerHealthCheck() throws InterruptedException {
-        // health-check liveness should be UP
-        Collection<HealthCheck.Result> res = HealthCheckHelper.invokeLiveness(context);
-        boolean up = res.stream().allMatch(r -> r.getState().equals(HealthCheck.State.UP));
-        Assertions.assertTrue(up, "liveness check");
-
-        // health-check readiness should be ready
-        Awaitility.await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> {
-            Collection<HealthCheck.Result> res2 = HealthCheckHelper.invokeReadiness(context);
-            boolean up2 = res2.stream().allMatch(r -> r.getState().equals(HealthCheck.State.UP));
-            Assertions.assertTrue(up2, "readiness check");
-        });
-
-        String propagatedHeaderKey = "PropagatedCustomHeader";
-        byte[] propagatedHeaderValue = "propagated header value".getBytes();
-        String skippedHeaderKey = "CamelSkippedHeader";
-        to.expectedMessageCount(5);
-        to.expectedBodiesReceivedInAnyOrder("message-0", "message-1", "message-2", "message-3", "message-4");
-        // The LAST_RECORD_BEFORE_COMMIT header should not be configured on any
-        // exchange because autoCommitEnable=true
-        to.expectedHeaderValuesReceivedInAnyOrder(KafkaConstants.LAST_RECORD_BEFORE_COMMIT, null, null, null, null,
-                null);
-        to.expectedHeaderReceived(propagatedHeaderKey, propagatedHeaderValue);
-
-        for (int k = 0; k < 5; k++) {
-            String msg = "message-" + k;
-            ProducerRecord<String, String> data = new ProducerRecord<>(TOPIC, "1", msg);
-            data.headers().add(new RecordHeader("CamelSkippedHeader", "skipped header value".getBytes()));
-            data.headers().add(new RecordHeader(propagatedHeaderKey, propagatedHeaderValue));
-            producer.send(data);
-        }
-
-        to.assertIsSatisfied(3000);
-
-        assertEquals(5, StreamSupport
-                .stream(MockConsumerInterceptor.recordsCaptured.get(0).records(TOPIC).spliterator(), false).count());
-
-        Map<String, Object> headers = to.getExchanges().get(0).getIn().getHeaders();
-        assertFalse(headers.containsKey(skippedHeaderKey), "Should not receive skipped header");
-        assertTrue(headers.containsKey(propagatedHeaderKey), "Should receive propagated header");
-
-        // and shutdown kafka which will make readiness report as DOWN
-        service.shutdown();
-
-        // health-check liveness should be UP
-        res = HealthCheckHelper.invokeLiveness(context);
-        up = res.stream().allMatch(r -> r.getState().equals(HealthCheck.State.UP));
-        Assertions.assertTrue(up, "liveness check");
-        // but health-check readiness should NOT be ready
-        Awaitility.await().atMost(20, TimeUnit.SECONDS).untilAsserted(() -> {
-            Collection<HealthCheck.Result> res2 = HealthCheckHelper.invoke(context);
-            Optional<HealthCheck.Result> down = res2.stream().filter(r -> r.getState().equals(HealthCheck.State.DOWN))
-                    .findFirst();
-            Assertions.assertTrue(down.isPresent());
-            String msg = down.get().getMessage().get();
-            Assertions.assertTrue(msg.contains("KafkaConsumer is not ready"));
-            Map<String, Object> map = down.get().getDetails();
-            Assertions.assertEquals(TOPIC, map.get("topic"));
-            Assertions.assertEquals("test-health-it", map.get("route.id"));
-        });
-    }
-
-    @Configuration
-    public class TestConfiguration {
-        @Bean
-        public RouteBuilder routeBuilder() {
-            return new RouteBuilder() {
-                @Override
-                public void configure() {
-                    from(from).process(
-                            exchange -> LOG.trace("Captured on the processor: {}", exchange.getMessage().getBody()))
-                            .routeId("test-health-it").to(to);
-                }
-            };
-        }
-
-        @Bean("myHeaderDeserializer")
-        public MyKafkaHeaderDeserializer createMyKafkaHeaderDeserializer() {
-            return new MyKafkaHeaderDeserializer();
-        }
-    }
-
-    private static class MyKafkaHeaderDeserializer extends DefaultKafkaHeaderDeserializer {
-    }
-}