You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2023/07/31 10:46:45 UTC
[camel] branch main updated: camel-aws2-kinesis - Consumer already have health-check OOTB. Make KinesisConnection generic and use by both consumer and producer. Polish. (#10919)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 872617423c8 camel-aws2-kinesis - Consumer already have health-check OOTB. Make KinesisConnection generic and use by both consumer and producer. Polish. (#10919)
872617423c8 is described below
commit 872617423c853dd7ab0d2f853e33b4c9abbadceb
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Mon Jul 31 12:46:38 2023 +0200
camel-aws2-kinesis - Consumer already have health-check OOTB. Make KinesisConnection generic and use by both consumer and producer. Polish. (#10919)
---
.../component/aws2/kinesis/Kinesis2Component.java | 18 +++++++
.../component/aws2/kinesis/Kinesis2Consumer.java | 38 +++++++------
.../component/aws2/kinesis/Kinesis2Endpoint.java | 19 ++-----
.../component/aws2/kinesis/Kinesis2Producer.java | 20 ++++++-
.../kinesis/{consumer => }/KinesisConnection.java | 42 +++++++--------
.../aws2/kinesis/consumer/KinesisHealthCheck.java | 63 ----------------------
.../Kinesis2ConsumerHealthCheckProfileCredsIT.java | 2 +-
.../KinesisConsumerClosedShardWithFailTest.java | 25 ++-------
.../KinesisConsumerClosedShardWithSilentTest.java | 24 ++-------
.../kinesis/integration/KinesisConsumerIT.java | 2 -
.../integration/KinesisConsumerResumeIT.java | 2 -
.../kinesis/integration/KinesisProducerIT.java | 3 --
12 files changed, 97 insertions(+), 161 deletions(-)
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Component.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Component.java
index bfe2c1ab8ab..abd7a245c4c 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Component.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Component.java
@@ -23,6 +23,7 @@ import org.apache.camel.Endpoint;
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.annotations.Component;
import org.apache.camel.support.HealthCheckComponent;
+import org.apache.camel.util.IOHelper;
@Component("aws2-kinesis")
public class Kinesis2Component extends HealthCheckComponent {
@@ -30,6 +31,8 @@ public class Kinesis2Component extends HealthCheckComponent {
@Metadata
private Kinesis2Configuration configuration = new Kinesis2Configuration();
+ private KinesisConnection connection = new KinesisConnection();
+
public Kinesis2Component() {
this(null);
}
@@ -65,4 +68,19 @@ public class Kinesis2Component extends HealthCheckComponent {
public void setConfiguration(Kinesis2Configuration configuration) {
this.configuration = configuration;
}
+
+ @Override
+ protected void doStart() throws Exception {
+ // use pre-configured client (if any)
+ connection.setKinesisClient(configuration.getAmazonKinesisClient());
+ }
+
+ @Override
+ protected void doStop() throws Exception {
+ IOHelper.close(connection);
+ }
+
+ public KinesisConnection getConnection() {
+ return connection;
+ }
}
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
index 70481d598dc..f70551aa53f 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
@@ -25,7 +25,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
-import org.apache.camel.component.aws2.kinesis.consumer.KinesisConnection;
import org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeAdapter;
import org.apache.camel.resume.ResumeAware;
import org.apache.camel.resume.ResumeStrategy;
@@ -47,6 +46,8 @@ import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements ResumeAware<ResumeStrategy> {
private static final Logger LOG = LoggerFactory.getLogger(Kinesis2Consumer.class);
+
+ private KinesisConnection connection;
private boolean isShardClosed;
private ResumeStrategy resumeStrategy;
@@ -55,21 +56,27 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
super(endpoint, processor);
}
+ public KinesisConnection getConnection() {
+ return connection;
+ }
+
+ public void setConnection(KinesisConnection connection) {
+ this.connection = connection;
+ }
+
@Override
protected int poll() throws Exception {
-
var processedExchangeCount = new AtomicInteger(0);
- var kinesisConnection = KinesisConnection.getInstance();
if (!getEndpoint().getConfiguration().getShardId().isEmpty()) {
var request = DescribeStreamRequest
.builder()
.streamName(getEndpoint().getConfiguration().getStreamName())
.build();
- DescribeStreamResponse response = null;
+ DescribeStreamResponse response;
if (getEndpoint().getConfiguration().isAsyncClient()) {
try {
- response = kinesisConnection
+ response = connection
.getAsyncClient(getEndpoint())
.describeStream(request)
.get();
@@ -77,7 +84,7 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
throw new RuntimeException(e);
}
} else {
- response = kinesisConnection
+ response = connection
.getClient(getEndpoint())
.describeStream(request);
}
@@ -94,13 +101,13 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
.findFirst()
.orElseThrow(() -> new IllegalStateException("The shard can't be found"));
- fetchAndPrepareRecordsForCamel(shard, kinesisConnection, processedExchangeCount);
+ fetchAndPrepareRecordsForCamel(shard, connection, processedExchangeCount);
} else {
- getShardList(kinesisConnection)
+ getShardList(connection)
.parallelStream()
.forEach(shard -> {
- fetchAndPrepareRecordsForCamel(shard, kinesisConnection, processedExchangeCount);
+ fetchAndPrepareRecordsForCamel(shard, connection, processedExchangeCount);
});
}
@@ -111,7 +118,7 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
final Shard shard,
final KinesisConnection kinesisConnection,
AtomicInteger processedExchangeCount) {
- String shardIterator = null;
+ String shardIterator;
try {
shardIterator = getShardIterator(shard, kinesisConnection);
} catch (ExecutionException | InterruptedException e) {
@@ -131,7 +138,7 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
.getMaxResultsPerRequest())
.build();
- GetRecordsResponse result = null;
+ GetRecordsResponse result;
if (getEndpoint().getConfiguration().isAsyncClient()) {
try {
result = kinesisConnection
@@ -218,7 +225,7 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
resume(request);
- GetShardIteratorResponse result = null;
+ GetShardIteratorResponse result;
if (getEndpoint().getConfiguration().isAsyncClient()) {
try {
result = kinesisConnection
@@ -295,6 +302,8 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
protected void doStart() throws Exception {
super.doStart();
+ ObjectHelper.notNull(connection, "connection", this);
+
if (resumeStrategy != null) {
resumeStrategy.loadCache();
}
@@ -305,13 +314,12 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
}
private List<Shard> getShardList(final KinesisConnection kinesisConnection) {
-
var request = ListShardsRequest
.builder()
.streamName(getEndpoint().getConfiguration().getStreamName())
.build();
- List<Shard> shardList = null;
+ List<Shard> shardList;
if (getEndpoint().getConfiguration().isAsyncClient()) {
try {
shardList = kinesisConnection
@@ -330,6 +338,6 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
}
return shardList;
-
}
+
}
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java
index b97222fadd7..6ab8d5f7cc4 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java
@@ -17,14 +17,11 @@
package org.apache.camel.component.aws2.kinesis;
import java.util.Objects;
-import java.util.concurrent.TimeUnit;
import org.apache.camel.Category;
import org.apache.camel.Consumer;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
-import org.apache.camel.component.aws2.kinesis.consumer.KinesisConnection;
-import org.apache.camel.component.aws2.kinesis.consumer.KinesisHealthCheck;
import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriParam;
import org.apache.camel.support.ScheduledPollEndpoint;
@@ -58,7 +55,7 @@ public class Kinesis2Endpoint extends ScheduledPollEndpoint {
protected void doStart() throws Exception {
super.doStart();
- var kinesisConnection = KinesisConnection.getInstance();
+ var kinesisConnection = getComponent().getConnection();
if (!configuration.isCborEnabled()) {
System.setProperty(CBOR_ENABLED.property(), "false");
@@ -96,26 +93,20 @@ public class Kinesis2Endpoint extends ScheduledPollEndpoint {
@Override
public Producer createProducer() throws Exception {
- return new Kinesis2Producer(this);
+ Kinesis2Producer producer = new Kinesis2Producer(this);
+ producer.setConnection(getComponent().getConnection());
+ return producer;
}
@Override
public Consumer createConsumer(Processor processor) throws Exception {
final Kinesis2Consumer consumer = new Kinesis2Consumer(this, processor);
+ consumer.setConnection(getComponent().getConnection());
consumer.setSchedulerProperties(getSchedulerProperties());
- startHealthChecks();
configureConsumer(consumer);
return consumer;
}
- private void startHealthChecks() {
- var timeoutCheckerExecutorService = getCamelContext().getExecutorServiceManager().newSingleThreadScheduledExecutor(this,
- CONNECTION_CHECKER_EXECUTOR_NAME);
- timeoutCheckerExecutorService.scheduleAtFixedRate(new KinesisHealthCheck(this),
- 0, 5 * 1000,
- TimeUnit.MILLISECONDS);
- }
-
@Override
public Kinesis2Component getComponent() {
return (Kinesis2Component) super.getComponent();
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java
index bbe4dcd9ee8..1763dd3fe83 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Producer.java
@@ -21,16 +21,27 @@ import java.nio.ByteBuffer;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
import org.apache.camel.support.DefaultProducer;
+import org.apache.camel.util.ObjectHelper;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.kinesis.model.PutRecordRequest;
import software.amazon.awssdk.services.kinesis.model.PutRecordResponse;
public class Kinesis2Producer extends DefaultProducer {
+ private KinesisConnection connection;
+
public Kinesis2Producer(Kinesis2Endpoint endpoint) {
super(endpoint);
}
+ public KinesisConnection getConnection() {
+ return connection;
+ }
+
+ public void setConnection(KinesisConnection connection) {
+ this.connection = connection;
+ }
+
@Override
public Kinesis2Endpoint getEndpoint() {
return (Kinesis2Endpoint) super.getEndpoint();
@@ -39,7 +50,7 @@ public class Kinesis2Producer extends DefaultProducer {
@Override
public void process(Exchange exchange) throws Exception {
PutRecordRequest request = createRequest(exchange);
- PutRecordResponse putRecordResult = getEndpoint().getClient().putRecord(request);
+ PutRecordResponse putRecordResult = connection.getClient(getEndpoint()).putRecord(request);
Message message = getMessageForResponse(exchange);
message.setHeader(Kinesis2Constants.SEQUENCE_NUMBER, putRecordResult.sequenceNumber());
message.setHeader(Kinesis2Constants.SHARD_ID, putRecordResult.shardId());
@@ -63,4 +74,11 @@ public class Kinesis2Producer extends DefaultProducer {
public static Message getMessageForResponse(final Exchange exchange) {
return exchange.getMessage();
}
+
+ @Override
+ protected void doStart() throws Exception {
+ super.doStart();
+
+ ObjectHelper.notNull(connection, "connection", this);
+ }
}
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisConnection.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/KinesisConnection.java
similarity index 70%
rename from components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisConnection.java
rename to components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/KinesisConnection.java
index 4ac3c3046b5..c5c23f1270f 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisConnection.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/KinesisConnection.java
@@ -14,35 +14,25 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.camel.component.aws2.kinesis.consumer;
+package org.apache.camel.component.aws2.kinesis;
+import java.io.Closeable;
+import java.io.IOException;
import java.util.Objects;
-import org.apache.camel.component.aws2.kinesis.Kinesis2Endpoint;
import org.apache.camel.component.aws2.kinesis.client.KinesisClientFactory;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.awssdk.services.kinesis.KinesisClient;
-public class KinesisConnection {
- private static volatile KinesisConnection instance;
- private KinesisClient kinesisClient = null;
- private KinesisAsyncClient kinesisAsyncClient = null;
-
- private KinesisConnection() {
- }
+/**
+ * Holds connections to AWS from {@link KinesisClient} and {@link KinesisAsyncClient}.
+ */
+public class KinesisConnection implements Closeable {
- public static synchronized KinesisConnection getInstance() {
- if (instance == null) {
- synchronized (KinesisConnection.class) {
- if (instance == null) {
- instance = new KinesisConnection();
- }
- }
- }
- return instance;
- }
+ private KinesisClient kinesisClient;
+ private KinesisAsyncClient kinesisAsyncClient;
- public KinesisClient getClient(final Kinesis2Endpoint endpoint) {
+ public synchronized KinesisClient getClient(final Kinesis2Endpoint endpoint) {
if (Objects.isNull(kinesisClient)) {
kinesisClient = endpoint.getConfiguration().getAmazonKinesisClient() != null
? endpoint.getConfiguration().getAmazonKinesisClient()
@@ -51,7 +41,7 @@ public class KinesisConnection {
return kinesisClient;
}
- public KinesisAsyncClient getAsyncClient(final Kinesis2Endpoint endpoint) {
+ public synchronized KinesisAsyncClient getAsyncClient(final Kinesis2Endpoint endpoint) {
if (Objects.isNull(kinesisAsyncClient)) {
kinesisAsyncClient = KinesisClientFactory
.getKinesisAsyncClient(endpoint.getConfiguration())
@@ -67,4 +57,14 @@ public class KinesisConnection {
public void setKinesisAsyncClient(final KinesisAsyncClient kinesisAsyncClient) {
this.kinesisAsyncClient = kinesisAsyncClient;
}
+
+ @Override
+ public void close() throws IOException {
+ if (kinesisClient != null) {
+ kinesisClient.close();
+ }
+ if (kinesisAsyncClient != null) {
+ kinesisAsyncClient.close();
+ }
+ }
}
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisHealthCheck.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisHealthCheck.java
deleted file mode 100644
index 5bf59880dc2..00000000000
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisHealthCheck.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.aws2.kinesis.consumer;
-
-import java.util.Objects;
-import java.util.concurrent.ExecutionException;
-
-import org.apache.camel.component.aws2.kinesis.Kinesis2Endpoint;
-import software.amazon.awssdk.services.kinesis.model.ListStreamsRequest;
-
-public class KinesisHealthCheck implements Runnable {
- private Kinesis2Endpoint endpoint;
-
- public KinesisHealthCheck(Kinesis2Endpoint endpoint) {
- this.endpoint = endpoint;
- }
-
- @Override
- public void run() {
- var kinesisConnection = KinesisConnection.getInstance();
- if (this.endpoint.getConfiguration().isAsyncClient()) {
- try {
- if (Objects.isNull(kinesisConnection.getAsyncClient(this.endpoint)) ||
- kinesisConnection.getAsyncClient(this.endpoint)
- .listStreams(ListStreamsRequest
- .builder()
- .build())
- .get()
- .streamNames()
- .isEmpty()) {
- kinesisConnection.setKinesisAsyncClient(endpoint.getAsyncClient());
- }
- } catch (ExecutionException | InterruptedException e) {
- throw new RuntimeException(e);
- }
- } else {
- if (Objects.isNull(kinesisConnection.getClient(this.endpoint)) ||
- kinesisConnection.getClient(this.endpoint)
- .listStreams(ListStreamsRequest
- .builder()
- .build())
- .streamNames()
- .isEmpty()) {
- kinesisConnection.setKinesisClient(endpoint.getClient());
- }
- }
- }
-
-}
diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/Kinesis2ConsumerHealthCheckProfileCredsIT.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/Kinesis2ConsumerHealthCheckProfileCredsIT.java
index 32795c37d8b..eed9ce5d389 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/Kinesis2ConsumerHealthCheckProfileCredsIT.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/Kinesis2ConsumerHealthCheckProfileCredsIT.java
@@ -47,9 +47,9 @@ public class Kinesis2ConsumerHealthCheckProfileCredsIT extends CamelTestSupport
protected CamelContext createCamelContext() throws Exception {
context = super.createCamelContext();
context.getPropertiesComponent().setLocation("ref:prop");
+
Kinesis2Component component = new Kinesis2Component(context);
component.getConfiguration().setAmazonKinesisClient(AWSSDKClientUtils.newKinesisClient());
- component.init();
context.addComponent("aws2-kinesis", component);
HealthCheckRegistry registry = new DefaultHealthCheckRegistry();
diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithFailTest.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithFailTest.java
index 2d7690a78fd..495942d298e 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithFailTest.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithFailTest.java
@@ -16,12 +16,10 @@
*/
package org.apache.camel.component.aws2.kinesis;
-import java.lang.reflect.Field;
import java.util.ArrayList;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelContext;
-import org.apache.camel.component.aws2.kinesis.consumer.KinesisConnection;
import org.apache.camel.impl.DefaultCamelContext;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -54,8 +52,6 @@ public class KinesisConsumerClosedShardWithFailTest {
private KinesisClient kinesisClient;
@Mock
private AsyncProcessor processor;
- @Mock
- private KinesisConnection kinesisConnection;
private final CamelContext context = new DefaultCamelContext();
private final Kinesis2Component component = new Kinesis2Component(context);
@@ -64,26 +60,25 @@ public class KinesisConsumerClosedShardWithFailTest {
@BeforeEach
public void setup() {
+ component.start();
+
Kinesis2Configuration configuration = new Kinesis2Configuration();
configuration.setAmazonKinesisClient(kinesisClient);
configuration.setIteratorType(ShardIteratorType.LATEST);
configuration.setShardClosed(Kinesis2ShardClosedStrategyEnum.fail);
configuration.setStreamName("streamName");
- setMock(kinesisConnection);
-
- Kinesis2Endpoint endpoint = new Kinesis2Endpoint(null, configuration, component);
+ Kinesis2Endpoint endpoint = new Kinesis2Endpoint("aws2-kinesis:foo", configuration, component);
endpoint.start();
underTest = new Kinesis2Consumer(endpoint, processor);
+ underTest.setConnection(component.getConnection());
+ underTest.start();
SequenceNumberRange range = SequenceNumberRange.builder().endingSequenceNumber("20").build();
Shard shard = Shard.builder().shardId("shardId").sequenceNumberRange(range).build();
ArrayList<Shard> shardList = new ArrayList<>();
shardList.add(shard);
- when(kinesisConnection
- .getClient(any(Kinesis2Endpoint.class))).thenReturn(kinesisClient);
-
when(kinesisClient
.getRecords(any(GetRecordsRequest.class)))
.thenReturn(GetRecordsResponse.builder().nextShardIterator("nextShardIterator").build());
@@ -95,16 +90,6 @@ public class KinesisConsumerClosedShardWithFailTest {
.thenReturn(ListShardsResponse.builder().shards(shardList).build());
}
- private void setMock(KinesisConnection mock) {
- try {
- Field instance = KinesisConnection.class.getDeclaredField("instance");
- instance.setAccessible(true);
- instance.set(instance, mock);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
@Test
public void itObtainsAShardIteratorOnFirstPoll() {
assertThrows(IllegalStateException.class, () -> {
diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java
index 4605cd5b60f..d14e5ebb332 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java
@@ -16,13 +16,11 @@
*/
package org.apache.camel.component.aws2.kinesis;
-import java.lang.reflect.Field;
import java.nio.charset.Charset;
import java.util.ArrayList;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelContext;
-import org.apache.camel.component.aws2.kinesis.consumer.KinesisConnection;
import org.apache.camel.impl.DefaultCamelContext;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -62,8 +60,7 @@ public class KinesisConsumerClosedShardWithSilentTest {
private KinesisClient kinesisClient;
@Mock
private AsyncProcessor processor;
- @Mock
- private KinesisConnection kinesisConnection;
+
private final CamelContext context = new DefaultCamelContext();
private final Kinesis2Component component = new Kinesis2Component(context);
@@ -71,26 +68,25 @@ public class KinesisConsumerClosedShardWithSilentTest {
@BeforeEach
public void setup() {
+ component.start();
+
Kinesis2Configuration configuration = new Kinesis2Configuration();
configuration.setAmazonKinesisClient(kinesisClient);
configuration.setIteratorType(ShardIteratorType.LATEST);
configuration.setShardClosed(Kinesis2ShardClosedStrategyEnum.silent);
configuration.setStreamName("streamName");
- setMock(kinesisConnection);
-
Kinesis2Endpoint endpoint = new Kinesis2Endpoint("aws2-kinesis:foo", configuration, component);
endpoint.start();
underTest = new Kinesis2Consumer(endpoint, processor);
+ underTest.setConnection(component.getConnection());
+ underTest.start();
SequenceNumberRange range = SequenceNumberRange.builder().endingSequenceNumber("20").build();
Shard shard = Shard.builder().shardId("shardId").sequenceNumberRange(range).build();
ArrayList<Shard> shardList = new ArrayList<>();
shardList.add(shard);
- when(kinesisConnection
- .getClient(any(Kinesis2Endpoint.class))).thenReturn(kinesisClient);
-
when(kinesisClient
.getRecords(any(GetRecordsRequest.class))).thenReturn(GetRecordsResponse.builder()
.nextShardIterator("shardIterator")
@@ -115,16 +111,6 @@ public class KinesisConsumerClosedShardWithSilentTest {
}
- private void setMock(KinesisConnection mock) {
- try {
- Field instance = KinesisConnection.class.getDeclaredField("instance");
- instance.setAccessible(true);
- instance.set(instance, mock);
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
-
@Test
public void itObtainsAShardIteratorOnFirstPoll() throws Exception {
underTest.poll();
diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerIT.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerIT.java
index 7cc59431eb8..754862b44c1 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerIT.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerIT.java
@@ -25,7 +25,6 @@ import org.apache.camel.EndpointInject;
import org.apache.camel.Message;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.aws2.kinesis.Kinesis2Constants;
-import org.apache.camel.component.aws2.kinesis.consumer.KinesisConnection;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.infra.aws.common.AWSCommon;
import org.apache.camel.test.infra.aws.common.services.AWSService;
@@ -80,7 +79,6 @@ public class KinesisConsumerIT extends CamelTestSupport {
@Override
protected RouteBuilder createRouteBuilder() {
client = AWSSDKClientUtils.newKinesisClient();
- KinesisConnection.getInstance().setKinesisClient(client);
context.getRegistry().bind("amazonKinesisClient", client);
diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerResumeIT.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerResumeIT.java
index 20f508e13bd..26686b3cb33 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerResumeIT.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerResumeIT.java
@@ -25,7 +25,6 @@ import org.apache.camel.EndpointInject;
import org.apache.camel.Message;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.aws2.kinesis.Kinesis2Constants;
-import org.apache.camel.component.aws2.kinesis.consumer.KinesisConnection;
import org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeAdapter;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.processor.resume.TransientResumeStrategy;
@@ -133,7 +132,6 @@ public class KinesisConsumerResumeIT extends CamelTestSupport {
@Override
protected RouteBuilder createRouteBuilder() {
client = AWSSDKClientUtils.newKinesisClient();
- KinesisConnection.getInstance().setKinesisClient(client);
context.getRegistry().bind("amazonKinesisClient", client);
diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisProducerIT.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisProducerIT.java
index 7c5207ffd70..036f54f90e9 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisProducerIT.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisProducerIT.java
@@ -25,7 +25,6 @@ import org.apache.camel.ExchangePattern;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.aws2.kinesis.Kinesis2Constants;
-import org.apache.camel.component.aws2.kinesis.consumer.KinesisConnection;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.infra.aws.common.AWSCommon;
import org.apache.camel.test.infra.aws.common.services.AWSService;
@@ -120,8 +119,6 @@ public class KinesisProducerIT extends CamelTestSupport {
protected RouteBuilder createRouteBuilder() {
client = AWSSDKClientUtils.newKinesisClient();
- KinesisConnection.getInstance().setKinesisClient(client);
-
context.getRegistry().bind("amazonKinesisClient", client);
return new RouteBuilder() {