You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ac...@apache.org on 2023/07/28 13:55:06 UTC
[camel] branch main updated: kinesis connection retry mechanism added into the current consumer (#10870)
This is an automated email from the ASF dual-hosted git repository.
acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new c08104ee967 kinesis connection retry mechanism added into the current consumer (#10870)
c08104ee967 is described below
commit c08104ee967dbe112af83b7813084a61d0bbd89f
Author: Hamed Hatami <ja...@gmail.com>
AuthorDate: Fri Jul 28 15:55:00 2023 +0200
kinesis connection retry mechanism added into the current consumer (#10870)
* kinesis connection retry mechanism added into the current consumer
* kinesis connection retry mechanism added into the current consumer
* kinesis connection retry mechanism added into the current consumer
* kinesis connection retry mechanism added into the current consumer
* kinesis connection retry mechanism added into the current consumer
* kinesis connection retry mechanism added into the current consumer
* kinesis connection retry mechanism added into the current consumer
---------
Co-authored-by: Hamed Hatami <ha...@postnord.com>
---
.../component/aws2/kinesis/Kinesis2Consumer.java | 60 +++++++++++--------
.../component/aws2/kinesis/Kinesis2Endpoint.java | 25 +++++---
.../aws2/kinesis/consumer/KinesisConnection.java | 70 ++++++++++++++++++++++
.../aws2/kinesis/consumer/KinesisHealthCheck.java | 63 +++++++++++++++++++
.../KinesisConsumerClosedShardWithFailTest.java | 29 ++++++++-
.../KinesisConsumerClosedShardWithSilentTest.java | 48 +++++++++++----
.../kinesis/integration/KinesisConsumerIT.java | 23 ++++---
.../integration/KinesisConsumerResumeIT.java | 7 ++-
.../kinesis/integration/KinesisProducerIT.java | 3 +
9 files changed, 266 insertions(+), 62 deletions(-)
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
index 6aef226e5ed..70481d598dc 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
@@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
+import org.apache.camel.component.aws2.kinesis.consumer.KinesisConnection;
import org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeAdapter;
import org.apache.camel.resume.ResumeAware;
import org.apache.camel.resume.ResumeStrategy;
@@ -33,8 +34,6 @@ import org.apache.camel.util.CastUtils;
import org.apache.camel.util.ObjectHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
-import software.amazon.awssdk.services.kinesis.KinesisClient;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest;
import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse;
import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
@@ -51,7 +50,8 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
private boolean isShardClosed;
private ResumeStrategy resumeStrategy;
- public Kinesis2Consumer(Kinesis2Endpoint endpoint, Processor processor) {
+ public Kinesis2Consumer(Kinesis2Endpoint endpoint,
+ Processor processor) {
super(endpoint, processor);
}
@@ -59,6 +59,7 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
protected int poll() throws Exception {
var processedExchangeCount = new AtomicInteger(0);
+ var kinesisConnection = KinesisConnection.getInstance();
if (!getEndpoint().getConfiguration().getShardId().isEmpty()) {
var request = DescribeStreamRequest
@@ -68,14 +69,17 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
DescribeStreamResponse response = null;
if (getEndpoint().getConfiguration().isAsyncClient()) {
try {
- response = getAsyncClient()
+ response = kinesisConnection
+ .getAsyncClient(getEndpoint())
.describeStream(request)
.get();
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
} else {
- response = getClient().describeStream(request);
+ response = kinesisConnection
+ .getClient(getEndpoint())
+ .describeStream(request);
}
var shard = response
@@ -90,13 +94,13 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
.findFirst()
.orElseThrow(() -> new IllegalStateException("The shard can't be found"));
- fetchAndPrepareRecordsForCamel(shard, processedExchangeCount);
+ fetchAndPrepareRecordsForCamel(shard, kinesisConnection, processedExchangeCount);
} else {
- getShardList()
+ getShardList(kinesisConnection)
.parallelStream()
.forEach(shard -> {
- fetchAndPrepareRecordsForCamel(shard, processedExchangeCount);
+ fetchAndPrepareRecordsForCamel(shard, kinesisConnection, processedExchangeCount);
});
}
@@ -105,10 +109,11 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
private void fetchAndPrepareRecordsForCamel(
final Shard shard,
+ final KinesisConnection kinesisConnection,
AtomicInteger processedExchangeCount) {
String shardIterator = null;
try {
- shardIterator = getShardIterator(shard);
+ shardIterator = getShardIterator(shard, kinesisConnection);
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
@@ -129,14 +134,17 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
GetRecordsResponse result = null;
if (getEndpoint().getConfiguration().isAsyncClient()) {
try {
- result = getAsyncClient()
+ result = kinesisConnection
+ .getAsyncClient(getEndpoint())
.getRecords(req)
.get();
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
} else {
- result = getClient().getRecords(req);
+ result = kinesisConnection
+ .getClient(getEndpoint())
+ .getRecords(req);
}
try {
@@ -186,20 +194,15 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
return processedExchanges;
}
- private KinesisClient getClient() {
- return getEndpoint().getClient();
- }
-
- private KinesisAsyncClient getAsyncClient() {
- return getEndpoint().getAsyncClient();
- }
-
@Override
public Kinesis2Endpoint getEndpoint() {
return (Kinesis2Endpoint) super.getEndpoint();
}
- private String getShardIterator(final Shard shard) throws ExecutionException, InterruptedException {
+ private String getShardIterator(
+ final Shard shard,
+ final KinesisConnection kinesisConnection)
+ throws ExecutionException, InterruptedException {
var shardId = shard.shardId();
isShardClosed = shard.sequenceNumberRange().endingSequenceNumber() != null;
@@ -218,14 +221,17 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
GetShardIteratorResponse result = null;
if (getEndpoint().getConfiguration().isAsyncClient()) {
try {
- result = getAsyncClient()
+ result = kinesisConnection
+ .getAsyncClient(getEndpoint())
.getShardIterator(request.build())
.get();
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException(e);
}
} else {
- result = getClient().getShardIterator(request.build());
+ result = kinesisConnection
+ .getClient(getEndpoint())
+ .getShardIterator(request.build());
}
return result.shardIterator();
@@ -298,7 +304,7 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
return getEndpoint().getConfiguration();
}
- private List<Shard> getShardList() {
+ private List<Shard> getShardList(final KinesisConnection kinesisConnection) {
var request = ListShardsRequest
.builder()
@@ -308,7 +314,8 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
List<Shard> shardList = null;
if (getEndpoint().getConfiguration().isAsyncClient()) {
try {
- shardList = getAsyncClient()
+ shardList = kinesisConnection
+ .getAsyncClient(getEndpoint())
.listShards(request)
.get()
.shards();
@@ -316,7 +323,10 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
throw new RuntimeException(e);
}
} else {
- shardList = getClient().listShards(request).shards();
+ shardList = kinesisConnection
+ .getClient(getEndpoint())
+ .listShards(request)
+ .shards();
}
return shardList;
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java
index e4d4d122c96..b97222fadd7 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Endpoint.java
@@ -17,12 +17,14 @@
package org.apache.camel.component.aws2.kinesis;
import java.util.Objects;
+import java.util.concurrent.TimeUnit;
import org.apache.camel.Category;
import org.apache.camel.Consumer;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
-import org.apache.camel.component.aws2.kinesis.client.KinesisClientFactory;
+import org.apache.camel.component.aws2.kinesis.consumer.KinesisConnection;
+import org.apache.camel.component.aws2.kinesis.consumer.KinesisHealthCheck;
import org.apache.camel.spi.UriEndpoint;
import org.apache.camel.spi.UriParam;
import org.apache.camel.support.ScheduledPollEndpoint;
@@ -45,6 +47,7 @@ public class Kinesis2Endpoint extends ScheduledPollEndpoint {
private KinesisClient kinesisClient;
private KinesisAsyncClient kinesisAsyncClient;
+ private static final String CONNECTION_CHECKER_EXECUTOR_NAME = "Kinesis_Streaming_Connection_Checker";
public Kinesis2Endpoint(String uri, Kinesis2Configuration configuration, Kinesis2Component component) {
super(uri, component);
@@ -54,19 +57,18 @@ public class Kinesis2Endpoint extends ScheduledPollEndpoint {
@Override
protected void doStart() throws Exception {
super.doStart();
+
+ var kinesisConnection = KinesisConnection.getInstance();
+
if (!configuration.isCborEnabled()) {
System.setProperty(CBOR_ENABLED.property(), "false");
}
if (configuration.isAsyncClient() &&
Objects.isNull(configuration.getAmazonKinesisClient())) {
- kinesisAsyncClient = KinesisClientFactory
- .getKinesisAsyncClient(configuration)
- .getKinesisAsyncClient();
+ kinesisAsyncClient = kinesisConnection.getAsyncClient(this);
} else {
- kinesisClient = configuration.getAmazonKinesisClient() != null
- ? configuration.getAmazonKinesisClient()
- : KinesisClientFactory.getKinesisClient(configuration).getKinesisClient();
+ kinesisClient = kinesisConnection.getClient(this);
}
if ((configuration.getIteratorType().equals(ShardIteratorType.AFTER_SEQUENCE_NUMBER)
@@ -101,10 +103,19 @@ public class Kinesis2Endpoint extends ScheduledPollEndpoint {
public Consumer createConsumer(Processor processor) throws Exception {
final Kinesis2Consumer consumer = new Kinesis2Consumer(this, processor);
consumer.setSchedulerProperties(getSchedulerProperties());
+ startHealthChecks();
configureConsumer(consumer);
return consumer;
}
+    /**
+     * Schedules the recurring {@link KinesisHealthCheck} for this endpoint.
+     * <p>
+     * The check is submitted to a single-thread scheduled executor obtained from the Camel context's executor service
+     * manager; it runs once immediately and then at a fixed rate of five seconds.
+     */
+    private void startHealthChecks() {
+        // NOTE(review): a new executor is requested on every call and no reference to it is kept here, so this
+        // endpoint cannot shut it down itself - confirm that the executor service manager owns its lifecycle.
+        var connectionChecker = getCamelContext()
+                .getExecutorServiceManager()
+                .newSingleThreadScheduledExecutor(this, CONNECTION_CHECKER_EXECUTOR_NAME);
+        connectionChecker.scheduleAtFixedRate(new KinesisHealthCheck(this), 0, 5, TimeUnit.SECONDS);
+    }
+
@Override
public Kinesis2Component getComponent() {
return (Kinesis2Component) super.getComponent();
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisConnection.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisConnection.java
new file mode 100644
index 00000000000..4ac3c3046b5
--- /dev/null
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisConnection.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.kinesis.consumer;
+
+import java.util.Objects;
+
+import org.apache.camel.component.aws2.kinesis.Kinesis2Endpoint;
+import org.apache.camel.component.aws2.kinesis.client.KinesisClientFactory;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.services.kinesis.KinesisClient;
+
+/**
+ * Singleton holder for the Kinesis clients shared by the consumer and its health check.
+ * <p>
+ * Clients are created lazily on first request and then cached; once a client is cached the endpoint argument of
+ * {@link #getClient(Kinesis2Endpoint)} / {@link #getAsyncClient(Kinesis2Endpoint)} is no longer consulted. The
+ * setters allow the cached client to be replaced (used by {@link KinesisHealthCheck} and by tests).
+ * <p>
+ * All access to the cached clients is synchronized on this instance because they are read by polling threads while
+ * the health check replaces them from a scheduler thread.
+ */
+public class KinesisConnection {
+    // Deliberately not final: unit tests replace this field with a mock through reflection.
+    private static volatile KinesisConnection instance;
+    private KinesisClient kinesisClient;
+    private KinesisAsyncClient kinesisAsyncClient;
+
+    private KinesisConnection() {
+    }
+
+    /**
+     * Returns the shared instance, creating it on first use.
+     *
+     * @return the singleton {@code KinesisConnection}
+     */
+    public static KinesisConnection getInstance() {
+        // Read the volatile field once; the lock is only taken while the instance has not been published yet.
+        KinesisConnection current = instance;
+        if (current == null) {
+            synchronized (KinesisConnection.class) {
+                current = instance;
+                if (current == null) {
+                    current = new KinesisConnection();
+                    instance = current;
+                }
+            }
+        }
+        return current;
+    }
+
+    /**
+     * Returns the cached synchronous client, creating it from the endpoint configuration if none is cached.
+     * A client configured on the endpoint takes precedence over one built by {@link KinesisClientFactory}.
+     *
+     * @param  endpoint the endpoint whose configuration is used when a client has to be created
+     * @return          the cached (or newly created) synchronous client
+     */
+    public synchronized KinesisClient getClient(final Kinesis2Endpoint endpoint) {
+        if (Objects.isNull(kinesisClient)) {
+            var configuredClient = endpoint.getConfiguration().getAmazonKinesisClient();
+            kinesisClient = configuredClient != null
+                    ? configuredClient
+                    : KinesisClientFactory.getKinesisClient(endpoint.getConfiguration()).getKinesisClient();
+        }
+        return kinesisClient;
+    }
+
+    /**
+     * Returns the cached asynchronous client, building it through {@link KinesisClientFactory} if none is cached.
+     *
+     * @param  endpoint the endpoint whose configuration is used when a client has to be created
+     * @return          the cached (or newly created) asynchronous client
+     */
+    public synchronized KinesisAsyncClient getAsyncClient(final Kinesis2Endpoint endpoint) {
+        if (Objects.isNull(kinesisAsyncClient)) {
+            kinesisAsyncClient = KinesisClientFactory
+                    .getKinesisAsyncClient(endpoint.getConfiguration())
+                    .getKinesisAsyncClient();
+        }
+        return kinesisAsyncClient;
+    }
+
+    /**
+     * Replaces the cached synchronous client.
+     *
+     * @param kinesisClient the client to cache; {@code null} makes the next {@link #getClient(Kinesis2Endpoint)}
+     *                      call create a client again
+     */
+    public synchronized void setKinesisClient(final KinesisClient kinesisClient) {
+        this.kinesisClient = kinesisClient;
+    }
+
+    /**
+     * Replaces the cached asynchronous client.
+     *
+     * @param kinesisAsyncClient the client to cache; {@code null} makes the next
+     *                           {@link #getAsyncClient(Kinesis2Endpoint)} call create a client again
+     */
+    public synchronized void setKinesisAsyncClient(final KinesisAsyncClient kinesisAsyncClient) {
+        this.kinesisAsyncClient = kinesisAsyncClient;
+    }
+}
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisHealthCheck.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisHealthCheck.java
new file mode 100644
index 00000000000..5bf59880dc2
--- /dev/null
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/consumer/KinesisHealthCheck.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws2.kinesis.consumer;
+
+import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+
+import org.apache.camel.component.aws2.kinesis.Kinesis2Endpoint;
+import software.amazon.awssdk.services.kinesis.model.ListStreamsRequest;
+
+/**
+ * Periodic task that probes the shared Kinesis client with a {@code ListStreams} call and, when the probe does not
+ * succeed, points {@link KinesisConnection} back at the client held by the endpoint.
+ * <p>
+ * The task never lets an exception escape from {@link #run()}: a task scheduled with {@code scheduleAtFixedRate}
+ * is cancelled for good as soon as one execution throws, which would stop the checks exactly when the connection
+ * is failing.
+ */
+public class KinesisHealthCheck implements Runnable {
+    private static final org.slf4j.Logger LOG = org.slf4j.LoggerFactory.getLogger(KinesisHealthCheck.class);
+
+    private final Kinesis2Endpoint endpoint;
+
+    /**
+     * @param endpoint the endpoint whose configuration selects the sync/async client and whose client is used as the
+     *                 replacement when the probe fails
+     */
+    public KinesisHealthCheck(Kinesis2Endpoint endpoint) {
+        this.endpoint = endpoint;
+    }
+
+    /**
+     * Probes the client matching the endpoint configuration and resets it on {@link KinesisConnection} when the
+     * client is missing, the probe returns no stream names, or the probe fails with an exception.
+     */
+    @Override
+    public void run() {
+        var kinesisConnection = KinesisConnection.getInstance();
+        final boolean asyncClient = this.endpoint.getConfiguration().isAsyncClient();
+
+        boolean healthy;
+        try {
+            healthy = asyncClient
+                    ? isAsyncClientHealthy(kinesisConnection)
+                    : isClientHealthy(kinesisConnection);
+        } catch (InterruptedException e) {
+            // Restore the flag so the executor can observe the interruption, and skip this round.
+            Thread.currentThread().interrupt();
+            return;
+        } catch (ExecutionException | RuntimeException e) {
+            // Must not propagate: the scheduler would suppress every subsequent execution.
+            LOG.warn("Kinesis health check failed for endpoint {}", this.endpoint, e);
+            healthy = false;
+        }
+
+        if (healthy) {
+            return;
+        }
+        if (asyncClient) {
+            kinesisConnection.setKinesisAsyncClient(this.endpoint.getAsyncClient());
+        } else {
+            kinesisConnection.setKinesisClient(this.endpoint.getClient());
+        }
+    }
+
+    /**
+     * @return {@code true} when an async client is available and {@code ListStreams} returns at least one stream
+     */
+    private boolean isAsyncClientHealthy(KinesisConnection kinesisConnection)
+            throws ExecutionException, InterruptedException {
+        var client = kinesisConnection.getAsyncClient(this.endpoint);
+        return Objects.nonNull(client)
+                && !client.listStreams(ListStreamsRequest.builder().build())
+                        .get()
+                        .streamNames()
+                        .isEmpty();
+    }
+
+    /**
+     * @return {@code true} when a sync client is available and {@code ListStreams} returns at least one stream
+     */
+    private boolean isClientHealthy(KinesisConnection kinesisConnection) {
+        var client = kinesisConnection.getClient(this.endpoint);
+        return Objects.nonNull(client)
+                && !client.listStreams(ListStreamsRequest.builder().build())
+                        .streamNames()
+                        .isEmpty();
+    }
+
+}
diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithFailTest.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithFailTest.java
index 48e739e423f..2d7690a78fd 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithFailTest.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithFailTest.java
@@ -16,10 +16,12 @@
*/
package org.apache.camel.component.aws2.kinesis;
+import java.lang.reflect.Field;
import java.util.ArrayList;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelContext;
+import org.apache.camel.component.aws2.kinesis.consumer.KinesisConnection;
import org.apache.camel.impl.DefaultCamelContext;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -52,6 +54,8 @@ public class KinesisConsumerClosedShardWithFailTest {
private KinesisClient kinesisClient;
@Mock
private AsyncProcessor processor;
+ @Mock
+ private KinesisConnection kinesisConnection;
private final CamelContext context = new DefaultCamelContext();
private final Kinesis2Component component = new Kinesis2Component(context);
@@ -65,6 +69,9 @@ public class KinesisConsumerClosedShardWithFailTest {
configuration.setIteratorType(ShardIteratorType.LATEST);
configuration.setShardClosed(Kinesis2ShardClosedStrategyEnum.fail);
configuration.setStreamName("streamName");
+
+ setMock(kinesisConnection);
+
Kinesis2Endpoint endpoint = new Kinesis2Endpoint(null, configuration, component);
endpoint.start();
underTest = new Kinesis2Consumer(endpoint, processor);
@@ -74,14 +81,30 @@ public class KinesisConsumerClosedShardWithFailTest {
ArrayList<Shard> shardList = new ArrayList<>();
shardList.add(shard);
- when(kinesisClient.getRecords(any(GetRecordsRequest.class)))
+ when(kinesisConnection
+ .getClient(any(Kinesis2Endpoint.class))).thenReturn(kinesisClient);
+
+ when(kinesisClient
+ .getRecords(any(GetRecordsRequest.class)))
.thenReturn(GetRecordsResponse.builder().nextShardIterator("nextShardIterator").build());
- when(kinesisClient.getShardIterator(any(GetShardIteratorRequest.class)))
+ when(kinesisClient
+ .getShardIterator(any(GetShardIteratorRequest.class)))
.thenReturn(GetShardIteratorResponse.builder().shardIterator("shardIterator").build());
- when(kinesisClient.listShards(any(ListShardsRequest.class)))
+ when(kinesisClient
+ .listShards(any(ListShardsRequest.class)))
.thenReturn(ListShardsResponse.builder().shards(shardList).build());
}
+    /**
+     * Replaces the {@link KinesisConnection} singleton with the given mock by writing its private static
+     * {@code instance} field through reflection.
+     *
+     * @param  mock                  the mock that {@code KinesisConnection.getInstance()} should return
+     * @throws IllegalStateException if the field cannot be found or written
+     */
+    private void setMock(KinesisConnection mock) {
+        try {
+            Field instance = KinesisConnection.class.getDeclaredField("instance");
+            instance.setAccessible(true);
+            // The field is static, so there is no receiver object; null is the documented argument here.
+            instance.set(null, mock);
+        } catch (ReflectiveOperationException e) {
+            throw new IllegalStateException("Unable to install the KinesisConnection mock", e);
+        }
+    }
+
@Test
public void itObtainsAShardIteratorOnFirstPoll() {
assertThrows(IllegalStateException.class, () -> {
diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java
index 2b34ca315ed..4605cd5b60f 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithSilentTest.java
@@ -16,11 +16,13 @@
*/
package org.apache.camel.component.aws2.kinesis;
+import java.lang.reflect.Field;
import java.nio.charset.Charset;
import java.util.ArrayList;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.CamelContext;
+import org.apache.camel.component.aws2.kinesis.consumer.KinesisConnection;
import org.apache.camel.impl.DefaultCamelContext;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -60,7 +62,8 @@ public class KinesisConsumerClosedShardWithSilentTest {
private KinesisClient kinesisClient;
@Mock
private AsyncProcessor processor;
-
+ @Mock
+ private KinesisConnection kinesisConnection;
private final CamelContext context = new DefaultCamelContext();
private final Kinesis2Component component = new Kinesis2Component(context);
@@ -73,6 +76,9 @@ public class KinesisConsumerClosedShardWithSilentTest {
configuration.setIteratorType(ShardIteratorType.LATEST);
configuration.setShardClosed(Kinesis2ShardClosedStrategyEnum.silent);
configuration.setStreamName("streamName");
+
+ setMock(kinesisConnection);
+
Kinesis2Endpoint endpoint = new Kinesis2Endpoint("aws2-kinesis:foo", configuration, component);
endpoint.start();
underTest = new Kinesis2Consumer(endpoint, processor);
@@ -82,21 +88,41 @@ public class KinesisConsumerClosedShardWithSilentTest {
ArrayList<Shard> shardList = new ArrayList<>();
shardList.add(shard);
- when(kinesisClient.getRecords(any(GetRecordsRequest.class))).thenReturn(GetRecordsResponse.builder()
- .nextShardIterator("shardIterator")
- .records(
- Record.builder().sequenceNumber("1").data(SdkBytes.fromString("Hello", Charset.defaultCharset()))
- .build(),
- Record.builder().sequenceNumber("2").data(SdkBytes.fromString("Hello", Charset.defaultCharset()))
- .build())
- .build());
- when(kinesisClient.getShardIterator(any(GetShardIteratorRequest.class)))
+ when(kinesisConnection
+ .getClient(any(Kinesis2Endpoint.class))).thenReturn(kinesisClient);
+
+ when(kinesisClient
+ .getRecords(any(GetRecordsRequest.class))).thenReturn(GetRecordsResponse.builder()
+ .nextShardIterator("shardIterator")
+ .records(
+ Record.builder().sequenceNumber("1")
+ .data(SdkBytes.fromString("Hello", Charset.defaultCharset()))
+ .build(),
+ Record.builder().sequenceNumber("2")
+ .data(SdkBytes.fromString("Hello", Charset.defaultCharset()))
+ .build())
+ .build());
+
+ when(kinesisClient
+ .getShardIterator(any(GetShardIteratorRequest.class)))
.thenReturn(GetShardIteratorResponse.builder().shardIterator("shardIterator").build());
- when(kinesisClient.listShards(any(ListShardsRequest.class)))
+ when(kinesisClient
+ .listShards(any(ListShardsRequest.class)))
.thenReturn(ListShardsResponse.builder().shards(shardList).build());
context.start();
underTest.start();
+
+ }
+
+    /**
+     * Replaces the {@link KinesisConnection} singleton with the given mock by writing its private static
+     * {@code instance} field through reflection.
+     *
+     * @param  mock                  the mock that {@code KinesisConnection.getInstance()} should return
+     * @throws IllegalStateException if the field cannot be found or written
+     */
+    private void setMock(KinesisConnection mock) {
+        try {
+            Field instance = KinesisConnection.class.getDeclaredField("instance");
+            instance.setAccessible(true);
+            // The field is static, so there is no receiver object; null is the documented argument here.
+            instance.set(null, mock);
+        } catch (ReflectiveOperationException e) {
+            throw new IllegalStateException("Unable to install the KinesisConnection mock", e);
+        }
     }
@Test
diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerIT.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerIT.java
index beebc63905e..7cc59431eb8 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerIT.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerIT.java
@@ -22,11 +22,10 @@ import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.camel.EndpointInject;
-import org.apache.camel.Exchange;
import org.apache.camel.Message;
-import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.aws2.kinesis.Kinesis2Constants;
+import org.apache.camel.component.aws2.kinesis.consumer.KinesisConnection;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.infra.aws.common.AWSCommon;
import org.apache.camel.test.infra.aws.common.services.AWSService;
@@ -81,6 +80,7 @@ public class KinesisConsumerIT extends CamelTestSupport {
@Override
protected RouteBuilder createRouteBuilder() {
client = AWSSDKClientUtils.newKinesisClient();
+ KinesisConnection.getInstance().setKinesisClient(client);
context.getRegistry().bind("amazonKinesisClient", client);
@@ -90,20 +90,17 @@ public class KinesisConsumerIT extends CamelTestSupport {
String kinesisEndpointUri = "aws2-kinesis://%s?amazonKinesisClient=#amazonKinesisClient";
fromF(kinesisEndpointUri, streamName)
- .process(new Processor() {
- @Override
- public void process(Exchange exchange) {
- KinesisData data = new KinesisData();
+ .process(exchange -> {
+ KinesisData data = new KinesisData();
- final Message message = exchange.getMessage();
+ final Message message = exchange.getMessage();
- if (message != null) {
- data.body = message.getBody(String.class);
- data.partition = message.getHeader(Kinesis2Constants.PARTITION_KEY, String.class);
- }
-
- receivedMessages.add(data);
+ if (message != null) {
+ data.body = message.getBody(String.class);
+ data.partition = message.getHeader(Kinesis2Constants.PARTITION_KEY, String.class);
}
+
+ receivedMessages.add(data);
})
.to("mock:result");
}
diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerResumeIT.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerResumeIT.java
index 2e6a9a8fe4a..20f508e13bd 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerResumeIT.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerResumeIT.java
@@ -25,6 +25,7 @@ import org.apache.camel.EndpointInject;
import org.apache.camel.Message;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.aws2.kinesis.Kinesis2Constants;
+import org.apache.camel.component.aws2.kinesis.consumer.KinesisConnection;
import org.apache.camel.component.aws2.kinesis.consumer.KinesisResumeAdapter;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.processor.resume.TransientResumeStrategy;
@@ -132,6 +133,7 @@ public class KinesisConsumerResumeIT extends CamelTestSupport {
@Override
protected RouteBuilder createRouteBuilder() {
client = AWSSDKClientUtils.newKinesisClient();
+ KinesisConnection.getInstance().setKinesisClient(client);
context.getRegistry().bind("amazonKinesisClient", client);
@@ -145,7 +147,6 @@ public class KinesisConsumerResumeIT extends CamelTestSupport {
fromF(kinesisEndpointUri, streamName)
.process(exchange -> {
KinesisData data = new KinesisData();
-
final Message message = exchange.getMessage();
if (message != null) {
@@ -183,12 +184,12 @@ public class KinesisConsumerResumeIT extends CamelTestSupport {
}
@DisplayName("Tests that the component can resume messages from AWS Kinesis")
- @Timeout(value = 2, unit = TimeUnit.MINUTES)
+ @Timeout(value = 3, unit = TimeUnit.MINUTES)
@Test
void testProduceMessages() {
result.expectedMessageCount(expectedCount);
- await().atMost(1, TimeUnit.MINUTES)
+ await().atMost(2, TimeUnit.MINUTES)
.untilAsserted(() -> result.assertIsSatisfied());
assertEquals(expectedCount, receivedMessages.size());
diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisProducerIT.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisProducerIT.java
index 036f54f90e9..7c5207ffd70 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisProducerIT.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisProducerIT.java
@@ -25,6 +25,7 @@ import org.apache.camel.ExchangePattern;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.aws2.kinesis.Kinesis2Constants;
+import org.apache.camel.component.aws2.kinesis.consumer.KinesisConnection;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.test.infra.aws.common.AWSCommon;
import org.apache.camel.test.infra.aws.common.services.AWSService;
@@ -119,6 +120,8 @@ public class KinesisProducerIT extends CamelTestSupport {
protected RouteBuilder createRouteBuilder() {
client = AWSSDKClientUtils.newKinesisClient();
+ KinesisConnection.getInstance().setKinesisClient(client);
+
context.getRegistry().bind("amazonKinesisClient", client);
return new RouteBuilder() {