You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/12/24 09:46:07 UTC
[4/6] camel git commit: Added tests for the walking of shards.
Added tests for the walking of shards.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b2430c0a
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b2430c0a
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b2430c0a
Branch: refs/heads/master
Commit: b2430c0ae4dbbf597288e0a901248b6c9995ac1f
Parents: 37040b1
Author: Candle <ca...@candle.me.uk>
Authored: Mon Dec 21 16:46:05 2015 +0000
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Dec 24 09:45:32 2015 +0100
----------------------------------------------------------------------
.../aws/ddbstream/DdbStreamConsumerTest.java | 242 +++++++++++++++++++
1 file changed, 242 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/b2430c0a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumerTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumerTest.java
new file mode 100644
index 0000000..3ef6c70
--- /dev/null
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumerTest.java
@@ -0,0 +1,242 @@
+package org.apache.camel.component.aws.ddbstream;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams;
+import com.amazonaws.services.dynamodbv2.model.DescribeStreamRequest;
+import com.amazonaws.services.dynamodbv2.model.DescribeStreamResult;
+import com.amazonaws.services.dynamodbv2.model.GetRecordsRequest;
+import com.amazonaws.services.dynamodbv2.model.GetRecordsResult;
+import com.amazonaws.services.dynamodbv2.model.GetShardIteratorRequest;
+import com.amazonaws.services.dynamodbv2.model.GetShardIteratorResult;
+import com.amazonaws.services.dynamodbv2.model.ListStreamsRequest;
+import com.amazonaws.services.dynamodbv2.model.ListStreamsResult;
+import com.amazonaws.services.dynamodbv2.model.Record;
+import com.amazonaws.services.dynamodbv2.model.ShardIteratorType;
+import com.amazonaws.services.dynamodbv2.model.Stream;
+import com.amazonaws.services.dynamodbv2.model.StreamDescription;
+import com.amazonaws.services.dynamodbv2.model.StreamRecord;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DdbStreamConsumerTest {
+
+ private DdbStreamConsumer undertest;
+
+ @Mock private AmazonDynamoDBStreams amazonDynamoDBStreams;
+ @Mock private AsyncProcessor processor;
+ private final CamelContext context = new DefaultCamelContext();
+ private final DdbStreamComponent component = new DdbStreamComponent(context);
+ private DdbStreamEndpoint endpoint = new DdbStreamEndpoint(null, "table_name", component);
+
+ private final String[] seqNums = new String[]{"2", "9", "11", "13", "14", "21", "25", "30", "35", "40"};
+
+ @Before
+ public void setup() throws Exception {
+ endpoint.setAmazonDynamoDbStreamsClient(amazonDynamoDBStreams);
+
+ when(amazonDynamoDBStreams.listStreams(any(ListStreamsRequest.class))).thenReturn(
+ new ListStreamsResult()
+ .withStreams(new Stream()
+ .withStreamArn("arn:aws:dynamodb:region:12345:table/table_name/stream/timestamp")
+ )
+ );
+
+ when(amazonDynamoDBStreams.describeStream(any(DescribeStreamRequest.class))).thenReturn(
+ new DescribeStreamResult()
+ .withStreamDescription(
+ new StreamDescription()
+ .withTableName("table_name")
+ .withShards(
+ ShardListTest.createShardsWithSequenceNumbers(null,
+ "a", "1", "5",
+ "b", "8", "15",
+ "c", "16", "16",
+ "d", "20", null
+ )
+ )
+ )
+ );
+
+ when(amazonDynamoDBStreams.getShardIterator(any(GetShardIteratorRequest.class))).thenAnswer(new Answer<GetShardIteratorResult>() {
+ @Override
+ public GetShardIteratorResult answer(InvocationOnMock invocation) throws Throwable {
+ return new GetShardIteratorResult()
+ .withShardIterator("shard_iterator_"
+ + ((GetShardIteratorRequest) invocation.getArguments()[0]).getShardId()
+ + "_000");
+ }
+ });
+
+ final Map<String, String> shardIterators = new HashMap<>();
+ shardIterators.put("shard_iterator_a_000", "shard_iterator_a_001");
+ shardIterators.put("shard_iterator_b_000", "shard_iterator_b_001");
+ shardIterators.put("shard_iterator_b_001", "shard_iterator_b_002");
+ shardIterators.put("shard_iterator_c_000", "shard_iterator_c_001");
+ shardIterators.put("shard_iterator_d_000", "shard_iterator_d_001");
+ final Map<String, Collection<Record>> answers = new HashMap<>();
+ answers.put("shard_iterator_a_001", createRecords("2"));
+ answers.put("shard_iterator_b_000", createRecords("9"));
+ answers.put("shard_iterator_b_001", createRecords("11", "13"));
+ answers.put("shard_iterator_b_002", createRecords("14"));
+ answers.put("shard_iterator_d_000", createRecords("21", "25"));
+ answers.put("shard_iterator_d_001", createRecords("30", "35", "40"));
+ when(amazonDynamoDBStreams.getRecords(any(GetRecordsRequest.class))).thenAnswer(new Answer<GetRecordsResult>() {
+ @Override
+ public GetRecordsResult answer(InvocationOnMock invocation) throws Throwable {
+ final String shardIterator = ((GetRecordsRequest) invocation.getArguments()[0]).getShardIterator();
+ String nextShardIterator = shardIterators.get(shardIterator); // note that HashMap returns null when there is no entry in the map. A null 'nextShardIterator' indicates that the shard has finished and we should move onto the next shard.
+ Matcher m = Pattern.compile("shard_iterator_d_0*(\\d+)").matcher(shardIterator);
+ Collection<Record> ans = answers.get(shardIterator);
+ if (nextShardIterator == null && m.matches()) { // last shard iterates forever.
+ Integer num = Integer.parseInt(m.group(1));
+ nextShardIterator = "shard_iterator_d_" + pad(Integer.toString(num+1), 3);
+ }
+ if (null == ans) { // default to an empty list of records.
+ ans = createRecords();
+ }
+ return new GetRecordsResult()
+ .withRecords(ans)
+ .withNextShardIterator(nextShardIterator);
+ }
+ });
+ }
+
+ String pad(String num, int to) {
+ // lazy padding
+ switch (num.length()) {
+ case 1:
+ return "00" + num;
+ case 2:
+ return "0" + num;
+ default:
+ return num;
+ }
+ }
+
+ @Test
+ public void latestOnlyUsesTheLastShard() throws Exception {
+ endpoint.setIteratorType(ShardIteratorType.LATEST);
+ undertest = new DdbStreamConsumer(endpoint, processor);
+
+ undertest.poll();
+
+ ArgumentCaptor<GetShardIteratorRequest> getIteratorCaptor = ArgumentCaptor.forClass(GetShardIteratorRequest.class);
+ verify(amazonDynamoDBStreams).getShardIterator(getIteratorCaptor.capture());
+ assertThat(getIteratorCaptor.getValue().getShardId(), is("d"));
+ }
+
+ @Test
+ public void latestWithTwoPolls() throws Exception {
+ endpoint.setIteratorType(ShardIteratorType.LATEST);
+ undertest = new DdbStreamConsumer(endpoint, processor);
+
+ undertest.poll();
+ undertest.poll();
+
+ ArgumentCaptor<GetRecordsRequest> getRecordsCaptor = ArgumentCaptor.forClass(GetRecordsRequest.class);
+ verify(amazonDynamoDBStreams, times(2)).getRecords(getRecordsCaptor.capture());
+ assertThat(getRecordsCaptor.getAllValues().get(0).getShardIterator(), is("shard_iterator_d_000"));
+ assertThat(getRecordsCaptor.getAllValues().get(1).getShardIterator(), is("shard_iterator_d_001"));
+ }
+
+ @Test
+ public void trimHorizonStartsWithTheFirstShard() throws Exception {
+ endpoint.setIteratorType(ShardIteratorType.TRIM_HORIZON);
+ undertest = new DdbStreamConsumer(endpoint, processor);
+
+ undertest.poll();
+
+ ArgumentCaptor<GetShardIteratorRequest> getIteratorCaptor = ArgumentCaptor.forClass(GetShardIteratorRequest.class);
+ verify(amazonDynamoDBStreams).getShardIterator(getIteratorCaptor.capture());
+ assertThat(getIteratorCaptor.getValue().getShardId(), is("a"));
+ }
+
+ @Test
+ public void trimHorizonWalksAllShards() throws Exception {
+ endpoint.setIteratorType(ShardIteratorType.TRIM_HORIZON);
+ undertest = new DdbStreamConsumer(endpoint, processor);
+
+ for (int i = 0; i < 9; ++i) {
+ undertest.poll();
+ }
+
+ ArgumentCaptor<GetShardIteratorRequest> getIteratorCaptor = ArgumentCaptor.forClass(GetShardIteratorRequest.class);
+ verify(amazonDynamoDBStreams, times(4)).getShardIterator(getIteratorCaptor.capture());
+ assertThat(getIteratorCaptor.getAllValues().get(0).getShardId(), is("a"));
+ assertThat(getIteratorCaptor.getAllValues().get(1).getShardId(), is("b"));
+ assertThat(getIteratorCaptor.getAllValues().get(2).getShardId(), is("c"));
+ assertThat(getIteratorCaptor.getAllValues().get(3).getShardId(), is("d"));
+
+ ArgumentCaptor<Exchange> exchangeCaptor = ArgumentCaptor.forClass(Exchange.class);
+ verify(processor, times(seqNums.length)).process(exchangeCaptor.capture(), any(AsyncCallback.class));
+
+ for (int i = 0; i < seqNums.length; ++i) {
+ assertThat(exchangeCaptor.getAllValues().get(i).getIn().getBody(Record.class).getDynamodb().getSequenceNumber(), is(seqNums[i]));
+ }
+ }
+
+ @Test
+ public void atSeqNumber12StartsWithShardB() throws Exception {
+ endpoint.setIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER);
+ endpoint.setSequenceNumberProvider(new StaticSequenceNumberProvider("12"));
+ undertest = new DdbStreamConsumer(endpoint, processor);
+
+ undertest.poll();
+
+ ArgumentCaptor<GetShardIteratorRequest> getIteratorCaptor = ArgumentCaptor.forClass(GetShardIteratorRequest.class);
+ verify(amazonDynamoDBStreams).getShardIterator(getIteratorCaptor.capture());
+ assertThat(getIteratorCaptor.getValue().getShardId(), is("b"));
+ }
+
+ @Test
+ public void afterSeqNumber16StartsWithShardC() throws Exception {
+ endpoint.setIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER);
+ endpoint.setSequenceNumberProvider(new StaticSequenceNumberProvider("16"));
+ undertest = new DdbStreamConsumer(endpoint, processor);
+
+ undertest.poll();
+
+ ArgumentCaptor<GetShardIteratorRequest> getIteratorCaptor = ArgumentCaptor.forClass(GetShardIteratorRequest.class);
+ verify(amazonDynamoDBStreams).getShardIterator(getIteratorCaptor.capture());
+ assertThat(getIteratorCaptor.getValue().getShardId(), is("c"));
+ }
+
+ private static Collection<Record> createRecords(String... sequenceNumbers) {
+ List<Record> results = new ArrayList<>();
+
+ for (String seqNum : sequenceNumbers) {
+ results.add(new Record()
+ .withDynamodb(new StreamRecord().withSequenceNumber(seqNum))
+ );
+ }
+
+ return results;
+ }
+
+}
\ No newline at end of file