You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2022/05/10 09:22:13 UTC
[camel-spring-boot] 01/02: Remove tests in camel-kafka that was out of sync
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-spring-boot.git
commit 6b315db8bbd33182ea84e87271982c4e517277ad
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Tue May 10 11:21:59 2022 +0200
Remove tests in camel-kafka that was out of sync
---
.../KafkaConsumerWithResumeRouteStrategyIT.java | 250 ---------------------
.../integration/ResumeStrategyConfiguration.java | 35 ---
2 files changed, 285 deletions(-)
diff --git a/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java
deleted file mode 100644
index 05f9d7d72b7..00000000000
--- a/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/KafkaConsumerWithResumeRouteStrategyIT.java
+++ /dev/null
@@ -1,250 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.kafka.integration;
-
-import java.util.Collections;
-import java.util.Properties;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.ThreadLocalRandom;
-import java.util.concurrent.TimeUnit;
-import org.apache.camel.EndpointInject;
-import org.apache.camel.Exchange;
-import org.apache.camel.Offset;
-import org.apache.camel.Resumable;
-import org.apache.camel.Service;
-import org.apache.camel.UpdatableConsumerResumeStrategy;
-import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.kafka.consumer.support.KafkaConsumerResumeStrategy;
-import org.apache.camel.component.kafka.consumer.support.KafkaResumable;
-import org.apache.camel.component.mock.MockEndpoint;
-import org.apache.camel.resume.Resumables;
-import org.apache.camel.spring.boot.CamelAutoConfiguration;
-import org.apache.camel.test.spring.junit5.CamelSpringBootTest;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.springframework.beans.factory.annotation.Autowired;
-import org.springframework.beans.factory.annotation.Qualifier;
-import org.springframework.boot.test.context.SpringBootTest;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.test.annotation.DirtiesContext;
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-
-@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
-@CamelSpringBootTest
-@SpringBootTest(
- classes = {
- CamelAutoConfiguration.class,
- BaseEmbeddedKafkaTestSupport.DefaulKafkaComponent.class,
- KafkaConsumerWithResumeRouteStrategyIT.class,
- KafkaConsumerWithResumeRouteStrategyIT.TestConfiguration.class,
- ResumeStrategyConfiguration.class,
- }
-)
-public class KafkaConsumerWithResumeRouteStrategyIT extends BaseEmbeddedKafkaTestSupport {
- private static final Logger LOG = LoggerFactory.getLogger(KafkaConsumerWithResumeRouteStrategyIT.class);
- private static final String TOPIC = "resumable-route-tp";
- private static final int RANDOM_VALUE = ThreadLocalRandom.current().nextInt(1, 1000);
-
- @EndpointInject("mock:result")
- private MockEndpoint result;
-
- @Autowired
- @Qualifier("resumeStrategy")
- private TestKafkaConsumerResumeStrategy resumeStrategy;
-
- @Autowired
- private CountDownLatch messagesLatch;
- private KafkaProducer<Object, Object> producer;
-
- static class TestKafkaConsumerResumeStrategy
- implements KafkaConsumerResumeStrategy,
- UpdatableConsumerResumeStrategy<String, Integer, Resumable<String, Integer>>, Service {
- private final CountDownLatch messagesLatch;
- private boolean resumeCalled;
- private boolean consumerIsNull = true;
- private boolean startCalled;
- private boolean offsetNull = true;
- private boolean offsetAddressableNull = true;
- private boolean offsetAddressableEmpty = true;
- private boolean offsetValueNull = true;
- private boolean offsetValueEmpty = true;
- private int lastOffset;
-
- public TestKafkaConsumerResumeStrategy(CountDownLatch messagesLatch) {
- this.messagesLatch = messagesLatch;
- }
-
- @Override
- public void setConsumer(Consumer<?, ?> consumer) {
- if (consumer != null) {
- consumerIsNull = false;
- }
- }
-
- @Override
- public void resume(KafkaResumable resumable) {
- resumeCalled = true;
-
- }
-
- @Override
- public void resume() {
- resumeCalled = true;
- }
-
- public boolean isResumeCalled() {
- return resumeCalled;
- }
-
- public boolean isConsumerIsNull() {
- return consumerIsNull;
- }
-
- @Override
- public void start() {
- LOG.warn("Start was called");
- startCalled = true;
- }
-
- @Override
- public void init() {
- LOG.warn("Init was called");
- }
-
- public boolean isStartCalled() {
- return startCalled;
- }
-
- @Override
- public void updateLastOffset(Resumable<String, Integer> offset) {
- try {
- if (offset != null) {
- offsetNull = false;
-
- String addressable = offset.getAddressable();
- if (addressable != null) {
- offsetAddressableNull = false;
- offsetAddressableEmpty = addressable.isEmpty() || addressable.isBlank();
-
- }
-
- Offset<Integer> offsetValue = offset.getLastOffset();
- if (offsetValue != null) {
- offsetValueNull = false;
-
- if (offsetValue.offset() != null) {
- offsetValueEmpty = false;
- lastOffset = offsetValue.offset();
- }
- }
- }
- } finally {
- messagesLatch.countDown();
- }
- }
-
- public boolean isOffsetNull() {
- return offsetNull;
- }
-
- public boolean isOffsetAddressableNull() {
- return offsetAddressableNull;
- }
-
- public boolean isOffsetValueNull() {
- return offsetValueNull;
- }
-
- public boolean isOffsetAddressableEmpty() {
- return offsetAddressableEmpty;
- }
-
- public boolean isOffsetValueEmpty() {
- return offsetValueEmpty;
- }
- }
-
- @BeforeEach
- public void before() {
- Properties props = getDefaultProperties();
-
- for (int i = 0; i < 10; i++) {
- producer = new org.apache.kafka.clients.producer.KafkaProducer<>(props);
- producer.send(new ProducerRecord<>(TOPIC, String.valueOf(i)));
- }
- }
-
- @Test
- // @Timeout(value = 30)
- public void testOffsetIsBeingChecked() throws InterruptedException {
- assertTrue(messagesLatch.await(100, TimeUnit.SECONDS), "The resume was not called");
-
- assertTrue(resumeStrategy.isResumeCalled(),
- "The resume strategy should have been called when the partition was assigned");
- assertFalse(resumeStrategy.isConsumerIsNull(),
- "The consumer passed to the strategy should not be null");
- assertTrue(resumeStrategy.isStartCalled(),
- "The resume strategy should have been started");
- assertFalse(resumeStrategy.isOffsetNull(),
- "The offset should not be null");
- assertFalse(resumeStrategy.isOffsetAddressableNull(),
- "The offset addressable should not be null");
- assertFalse(resumeStrategy.isOffsetAddressableEmpty(),
- "The offset addressable should not be empty");
- assertFalse(resumeStrategy.isOffsetValueNull(),
- "The offset value should not be null");
- assertFalse(resumeStrategy.isOffsetValueEmpty(),
- "The offset value should not be empty");
- assertEquals(RANDOM_VALUE, resumeStrategy.lastOffset, "the offsets don't match");
- }
-
- @AfterEach
- public void after() {
- kafkaAdminClient.deleteTopics(Collections.singletonList(TOPIC));
- }
-
- @Configuration
- public class TestConfiguration {
- @Bean
- public RouteBuilder routeBuilder() {
- return new RouteBuilder() {
- @Override
- public void configure() {
- from("kafka:" + TOPIC + "?groupId=" + TOPIC + "_GROUP&autoCommitIntervalMs=1000"
- + "&autoOffsetReset=earliest&consumersCount=1")
- .routeId("resume-strategy-route")
- .setHeader(Exchange.OFFSET,
- constant(Resumables.of("key", RANDOM_VALUE)))
- .resumable().resumeStrategy("resumeStrategy")
- .to("mock:result");
- }
- };
- }
-
-
- }
-}
diff --git a/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/ResumeStrategyConfiguration.java b/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/ResumeStrategyConfiguration.java
deleted file mode 100644
index e9ea18c776e..00000000000
--- a/components-starter/camel-kafka-starter/src/test/java/org/apache/camel/component/kafka/integration/ResumeStrategyConfiguration.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.kafka.integration;
-
-import java.util.concurrent.CountDownLatch;
-import org.springframework.context.annotation.Bean;
-import org.springframework.context.annotation.Configuration;
-
-@Configuration
-public class ResumeStrategyConfiguration {
-
- @Bean("resumeStrategy")
- public KafkaConsumerWithResumeRouteStrategyIT.TestKafkaConsumerResumeStrategy createTestKafkaConsumerResumeStrategy(CountDownLatch messagesLatch){
- return new KafkaConsumerWithResumeRouteStrategyIT.TestKafkaConsumerResumeStrategy(messagesLatch);
- }
-
- @Bean
- public CountDownLatch createCountDownLatch(){
- return new CountDownLatch(1);
- }
-}