You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2023/08/30 17:20:29 UTC
[camel] branch camel-4.0.x updated: CAMEL-19811: restore multi-shard handling. (#11243)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch camel-4.0.x
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/camel-4.0.x by this push:
new aa3dd7090b9 CAMEL-19811: restore multi-shard handling. (#11243)
aa3dd7090b9 is described below
commit aa3dd7090b95c0b289a4698cce229d620b520fba
Author: klease <38...@users.noreply.github.com>
AuthorDate: Wed Aug 30 19:19:41 2023 +0200
CAMEL-19811: restore multi-shard handling. (#11243)
Adapt the handling of closed shards.
Modify KinesisUtils to be able to create a stream with multiple shards.
Modify KinesisConsumerIT to test a stream with 2 shards.
---
.../component/aws2/kinesis/Kinesis2Consumer.java | 62 +++++++++++++---------
.../KinesisConsumerClosedShardWithFailTest.java | 9 +++-
.../kinesis/integration/KinesisConsumerIT.java | 8 +--
.../test/infra/aws2/clients/KinesisUtils.java | 10 ++--
4 files changed, 56 insertions(+), 33 deletions(-)
diff --git a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
index 8e7a8692a35..87178332e51 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/main/java/org/apache/camel/component/aws2/kinesis/Kinesis2Consumer.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.aws2.kinesis;
import java.util.ArrayDeque;
import java.util.List;
+import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
@@ -50,7 +51,7 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
private KinesisConnection connection;
private ResumeStrategy resumeStrategy;
- private String currentShardIterator;
+ private Map<String, String> currentShardIterators = new java.util.HashMap<>();
public Kinesis2Consumer(Kinesis2Endpoint endpoint,
Processor processor) {
@@ -167,27 +168,11 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
// getRecords request. That way, on the next poll, we start from where
// we left off, however, I don't know what happens to subsequent
// exchanges when an earlier exchange fails.
+ updateShardIterator(shard, result.nextShardIterator());
+ }
- currentShardIterator = result.nextShardIterator();
- if (currentShardIterator == null) {
- // This indicates that the shard is closed and no more data is available
- switch (getEndpoint().getConfiguration().getShardClosed()) {
- case ignore:
- LOG.warn("The shard with id={} on stream {} reached CLOSE status",
- shard.shardId(), getEndpoint().getConfiguration().getStreamName());
- break;
- case silent:
- break;
- case fail:
- LOG.info("The shard with id={} on stream {} reached CLOSE status",
- shard.shardId(), getEndpoint().getConfiguration().getStreamName());
- throw new IllegalStateException(
- new ReachedClosedStatusException(
- getEndpoint().getConfiguration().getStreamName(), shard.shardId()));
- default:
- throw new IllegalArgumentException("Unsupported shard closed strategy");
- }
- }
+ private void updateShardIterator(Shard shard, String nextShardIterator) {
+ currentShardIterators.put(shard.shardId(), nextShardIterator);
}
@Override
@@ -215,8 +200,14 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
throws ExecutionException, InterruptedException {
// either return a cached one or get a new one via a GetShardIterator
// request.
- if (currentShardIterator == null) {
- var shardId = shard.shardId();
+
+ var shardId = shard.shardId();
+
+ if (currentShardIterators.get(shardId) == null) {
+ if (currentShardIterators.containsKey(shardId)) {
+ // There was previously a shardIterator but shard is now closed
+ handleClosedShard(shardId);
+ }
GetShardIteratorRequest.Builder request = GetShardIteratorRequest.builder()
.streamName(getEndpoint().getConfiguration().getStreamName()).shardId(shardId)
@@ -244,12 +235,31 @@ public class Kinesis2Consumer extends ScheduledBatchPollingConsumer implements R
.getShardIterator(request.build());
}
- currentShardIterator = result.shardIterator();
- LOG.debug("Obtained new ShardIterator {} for shard {} on stream {}", currentShardIterator, shardId,
+ currentShardIterators.put(shardId, result.shardIterator());
+ LOG.debug("Obtained new ShardIterator {} for shard {} on stream {}", result.shardIterator(), shardId,
getEndpoint().getConfiguration().getStreamName());
}
- return currentShardIterator;
+ return currentShardIterators.get(shardId);
+ }
+
+ private void handleClosedShard(String shardId) {
+ switch (getEndpoint().getConfiguration().getShardClosed()) {
+ case ignore:
+ LOG.warn("The shard with id={} on stream {} reached CLOSE status",
+ shardId, getEndpoint().getConfiguration().getStreamName());
+ break;
+ case silent:
+ break;
+ case fail:
+ LOG.info("The shard with id={} on stream {} reached CLOSE status",
+ shardId, getEndpoint().getConfiguration().getStreamName());
+ throw new IllegalStateException(
+ new ReachedClosedStatusException(
+ getEndpoint().getConfiguration().getStreamName(), shardId));
+ default:
+ throw new IllegalArgumentException("Unsupported shard closed strategy");
+ }
}
private void resume(GetShardIteratorRequest.Builder req) {
diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithFailTest.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithFailTest.java
index ea9b3882511..9f5156ca5a7 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithFailTest.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/KinesisConsumerClosedShardWithFailTest.java
@@ -41,7 +41,9 @@ import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -92,6 +94,11 @@ public class KinesisConsumerClosedShardWithFailTest {
@Test
public void itObtainsAShardIteratorOnFirstPoll() {
+ try {
+ underTest.poll();
+ } catch (Exception e) {
+ fail("The first call should not throw an exception");
+ }
assertThrows(IllegalStateException.class, () -> {
underTest.poll();
});
@@ -106,7 +113,7 @@ public class KinesisConsumerClosedShardWithFailTest {
assertThat(getShardIteratorReqCap.getValue().shardId(), is("shardId"));
assertThat(getShardIteratorReqCap.getValue().shardIteratorType(), is(ShardIteratorType.LATEST));
- verify(kinesisClient).listShards(getListShardsCap.capture());
+ verify(kinesisClient, times(2)).listShards(getListShardsCap.capture());
assertThat(getListShardsCap.getValue().streamName(), is("streamName"));
}
}
diff --git a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerIT.java b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerIT.java
index b9614b1129a..46a62640dc6 100644
--- a/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerIT.java
+++ b/components/camel-aws/camel-aws2-kinesis/src/test/java/org/apache/camel/component/aws2/kinesis/integration/KinesisConsumerIT.java
@@ -45,6 +45,7 @@ import software.amazon.awssdk.services.kinesis.KinesisClient;
import static org.apache.camel.test.infra.aws2.clients.KinesisUtils.createStream;
import static org.apache.camel.test.infra.aws2.clients.KinesisUtils.putRecords;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.testcontainers.shaded.org.awaitility.Awaitility.await;
@@ -107,7 +108,7 @@ public class KinesisConsumerIT extends CamelTestSupport {
@BeforeEach
public void prepareEnvironment() {
- createStream(client, streamName);
+ createStream(client, streamName, 2);
putRecords(client, streamName, messageCount);
}
@@ -122,7 +123,7 @@ public class KinesisConsumerIT extends CamelTestSupport {
.untilAsserted(() -> result.assertIsSatisfied());
assertEquals(messageCount, receivedMessages.size());
- int messageCount = 0;
+ String partitionKey = null;
for (KinesisData data : receivedMessages) {
ObjectHelper.notNull(data, "data");
assertNotNull(data.body, "The body should not be null");
@@ -132,7 +133,8 @@ public class KinesisConsumerIT extends CamelTestSupport {
and so on. This is just testing that the code is not mixing things up.
*/
assertTrue(data.partition.endsWith(data.body), "The data/partition mismatch for record: " + data);
- assertEquals(messageCount++, Integer.valueOf(data.partition.substring(data.partition.lastIndexOf('-') + 1)));
+ assertNotEquals(partitionKey, data.partition);
+ partitionKey = data.partition;
}
}
}
diff --git a/test-infra/camel-test-infra-aws-v2/src/test/java/org/apache/camel/test/infra/aws2/clients/KinesisUtils.java b/test-infra/camel-test-infra-aws-v2/src/test/java/org/apache/camel/test/infra/aws2/clients/KinesisUtils.java
index bec6c6022fa..32f02076245 100644
--- a/test-infra/camel-test-infra-aws-v2/src/test/java/org/apache/camel/test/infra/aws2/clients/KinesisUtils.java
+++ b/test-infra/camel-test-infra-aws-v2/src/test/java/org/apache/camel/test/infra/aws2/clients/KinesisUtils.java
@@ -57,10 +57,10 @@ public final class KinesisUtils {
}
- private static void doCreateStream(KinesisClient kinesisClient, String streamName) {
+ private static void doCreateStream(KinesisClient kinesisClient, String streamName, int shardCount) {
CreateStreamRequest request = CreateStreamRequest.builder()
.streamName(streamName)
- .shardCount(1)
+ .shardCount(shardCount)
.build();
try {
@@ -78,6 +78,10 @@ public final class KinesisUtils {
}
public static void createStream(KinesisClient kinesisClient, String streamName) {
+ createStream(kinesisClient, streamName, 1);
+ }
+
+ public static void createStream(KinesisClient kinesisClient, String streamName, int shardCount) {
try {
LOG.info("Checking whether the stream exists already");
int status = getStreamStatus(kinesisClient, streamName);
@@ -89,7 +93,7 @@ public final class KinesisUtils {
LOG.info("The stream does not exist, auto creating it: {}", e.getMessage());
}
- doCreateStream(kinesisClient, streamName);
+ doCreateStream(kinesisClient, streamName, shardCount);
TestUtils.waitFor(() -> {
try {
GetRecordsRequest getRecordsRequest = KinesisUtils.getGetRecordsRequest(kinesisClient, streamName);