You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/12/24 09:46:04 UTC
[1/6] camel git commit: Fix CS.
Repository: camel
Updated Branches:
refs/heads/master 0350423d9 -> 0570a6ca3
Fix CS.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/37040b10
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/37040b10
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/37040b10
Branch: refs/heads/master
Commit: 37040b10adb4821fe656f506495b20e5b79cb3f7
Parents: 0a25e51
Author: Candle <ca...@candle.me.uk>
Authored: Mon Dec 21 14:05:21 2015 +0000
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Dec 24 09:45:31 2015 +0100
----------------------------------------------------------------------
.../component/aws/ddbstream/ShardList.java | 22 ++++++-----
.../StringSequenceNumberConverter.java | 2 +-
.../aws/ddbstream/DdbStreamEndpointTest.java | 14 +++----
.../ShardListAfterSequenceParametrised.java | 40 ++++++++++++++------
.../ShardListAtSequenceParametrised.java | 40 ++++++++++++++------
.../component/aws/ddbstream/ShardListTest.java | 10 ++---
6 files changed, 82 insertions(+), 46 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/37040b10/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
index 3ae1c4e..0a6e332 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
@@ -16,20 +16,21 @@
*/
package org.apache.camel.component.aws.ddbstream;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
-import com.amazonaws.services.dynamodbv2.model.Shard;
import java.math.BigInteger;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+
+import com.amazonaws.services.dynamodbv2.model.Shard;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class ShardList {
+
private final Logger log = LoggerFactory.getLogger(ShardList.class);
private final Map<String, Shard> shards = new HashMap<>();
@@ -54,6 +55,8 @@ class ShardList {
}
Shard first() {
+ // Potential optimisation: if the two provided sequence numbers are the
+ // same then we can skip the shard entirely. Need to confirm this with AWS.
for (Shard shard : shards.values()) {
if (!shards.containsKey(shard.getParentShardId())) {
return shard;
@@ -99,14 +102,15 @@ class ShardList {
}
}
if (shards.size() > 0) {
- return sorted.get(sorted.size()-1);
+ return sorted.get(sorted.size() - 1);
}
throw new IllegalStateException("Unable to find a shard with appropriate sequence numbers for " + sequenceNumber + " in " + shards);
}
/**
- * Removes shards that are older than the provided shard.
- * Does not remove the provided shard.
+ * Removes shards that are older than the provided shard. Does not remove
+ * the provided shard.
+ *
* @param removeBefore
*/
void removeOlderThan(Shard removeBefore) {
@@ -162,4 +166,4 @@ class ShardList {
}
}
}
-}
\ No newline at end of file
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/37040b10/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/StringSequenceNumberConverter.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/StringSequenceNumberConverter.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/StringSequenceNumberConverter.java
index 92bae2b..0048ce0 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/StringSequenceNumberConverter.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/StringSequenceNumberConverter.java
@@ -19,7 +19,7 @@ package org.apache.camel.component.aws.ddbstream;
import org.apache.camel.Converter;
@Converter
-public class StringSequenceNumberConverter {
+public final class StringSequenceNumberConverter {
private StringSequenceNumberConverter() {
}
http://git-wip-us.apache.org/repos/asf/camel/blob/37040b10/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpointTest.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpointTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpointTest.java
index 9e688be..1d3629e 100644
--- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpointTest.java
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpointTest.java
@@ -20,30 +20,30 @@ import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams;
import org.apache.camel.CamelContext;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.impl.SimpleRegistry;
-import static org.hamcrest.CoreMatchers.containsString;
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
-import org.junit.Test;
import org.junit.Before;
import org.junit.Rule;
+import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import org.mockito.runners.MockitoJUnitRunner;
@RunWith(MockitoJUnitRunner.class)
public class DdbStreamEndpointTest {
+ @Rule public ExpectedException expectedException = ExpectedException.none();
private CamelContext context;
@Mock private SequenceNumberProvider sequenceNumberProvider;
@Mock private AmazonDynamoDBStreams amazonDynamoDBStreams;
- @Rule public ExpectedException expectedException = ExpectedException.none();
-
@Before
public void setup() throws Exception {
SimpleRegistry registry = new SimpleRegistry();
http://git-wip-us.apache.org/repos/asf/camel/blob/37040b10/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListAfterSequenceParametrised.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListAfterSequenceParametrised.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListAfterSequenceParametrised.java
index c33ebe2..f7e6397 100644
--- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListAfterSequenceParametrised.java
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListAfterSequenceParametrised.java
@@ -1,18 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.camel.component.aws.ddbstream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
+
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
@RunWith(Parameterized.class)
public class ShardListAfterSequenceParametrised {
+ private ShardList undertest;
+
+ private final String inputSequenceNumber;
+ private final String expectedShardId;
+ public ShardListAfterSequenceParametrised(String inputSequenceNumber, String expectedShardId) {
+ this.inputSequenceNumber = inputSequenceNumber;
+ this.expectedShardId = expectedShardId;
+ }
@Parameterized.Parameters
public static Collection<Object[]> paramaters() {
@@ -29,16 +55,6 @@ public class ShardListAfterSequenceParametrised {
return results;
}
- private ShardList undertest;
-
- private final String inputSequenceNumber;
- private final String expectedShardId;
-
- public ShardListAfterSequenceParametrised(String inputSequenceNumber, String expectedShardId) {
- this.inputSequenceNumber = inputSequenceNumber;
- this.expectedShardId = expectedShardId;
- }
-
@Before
public void setup() throws Exception {
undertest = new ShardList();
http://git-wip-us.apache.org/repos/asf/camel/blob/37040b10/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListAtSequenceParametrised.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListAtSequenceParametrised.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListAtSequenceParametrised.java
index cf15021..9be5636 100644
--- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListAtSequenceParametrised.java
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListAtSequenceParametrised.java
@@ -1,18 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.camel.component.aws.ddbstream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
+
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
@RunWith(Parameterized.class)
public class ShardListAtSequenceParametrised {
+ private ShardList undertest;
+
+ private final String inputSequenceNumber;
+ private final String expectedShardId;
+ public ShardListAtSequenceParametrised(String inputSequenceNumber, String expectedShardId) {
+ this.inputSequenceNumber = inputSequenceNumber;
+ this.expectedShardId = expectedShardId;
+ }
@Parameterized.Parameters
public static Collection<Object[]> paramaters() {
@@ -29,16 +55,6 @@ public class ShardListAtSequenceParametrised {
return results;
}
- private ShardList undertest;
-
- private final String inputSequenceNumber;
- private final String expectedShardId;
-
- public ShardListAtSequenceParametrised(String inputSequenceNumber, String expectedShardId) {
- this.inputSequenceNumber = inputSequenceNumber;
- this.expectedShardId = expectedShardId;
- }
-
@Before
public void setup() throws Exception {
undertest = new ShardList();
http://git-wip-us.apache.org/repos/asf/camel/blob/37040b10/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java
index e181d7e..68dd53d 100644
--- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java
@@ -17,15 +17,15 @@
package org.apache.camel.component.aws.ddbstream;
-import com.amazonaws.services.dynamodbv2.model.SequenceNumberRange;
import java.util.ArrayList;
import java.util.List;
-import com.amazonaws.services.dynamodbv2.model.Shard;
+import com.amazonaws.services.dynamodbv2.model.SequenceNumberRange;
+import com.amazonaws.services.dynamodbv2.model.Shard;
import org.junit.Test;
+
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
-import org.junit.Ignore;
public class ShardListTest {
@@ -125,8 +125,8 @@ public class ShardListTest {
List<Shard> result = new ArrayList<>();
for (int i = 0; i < shardIdsAndSeqNos.length; i += 3) {
String id = shardIdsAndSeqNos[i];
- String seqStart = shardIdsAndSeqNos[i+1];
- String seqEnd = shardIdsAndSeqNos[i+2];
+ String seqStart = shardIdsAndSeqNos[i + 1];
+ String seqEnd = shardIdsAndSeqNos[i + 2];
result.add(new Shard()
.withShardId(id)
.withParentShardId(previous)
[4/6] camel git commit: Added tests for the walking of shards.
Posted by da...@apache.org.
Added tests for the walking of shards.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/b2430c0a
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/b2430c0a
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/b2430c0a
Branch: refs/heads/master
Commit: b2430c0ae4dbbf597288e0a901248b6c9995ac1f
Parents: 37040b1
Author: Candle <ca...@candle.me.uk>
Authored: Mon Dec 21 16:46:05 2015 +0000
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Dec 24 09:45:32 2015 +0100
----------------------------------------------------------------------
.../aws/ddbstream/DdbStreamConsumerTest.java | 242 +++++++++++++++++++
1 file changed, 242 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/b2430c0a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumerTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumerTest.java
new file mode 100644
index 0000000..3ef6c70
--- /dev/null
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumerTest.java
@@ -0,0 +1,242 @@
+package org.apache.camel.component.aws.ddbstream;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams;
+import com.amazonaws.services.dynamodbv2.model.DescribeStreamRequest;
+import com.amazonaws.services.dynamodbv2.model.DescribeStreamResult;
+import com.amazonaws.services.dynamodbv2.model.GetRecordsRequest;
+import com.amazonaws.services.dynamodbv2.model.GetRecordsResult;
+import com.amazonaws.services.dynamodbv2.model.GetShardIteratorRequest;
+import com.amazonaws.services.dynamodbv2.model.GetShardIteratorResult;
+import com.amazonaws.services.dynamodbv2.model.ListStreamsRequest;
+import com.amazonaws.services.dynamodbv2.model.ListStreamsResult;
+import com.amazonaws.services.dynamodbv2.model.Record;
+import com.amazonaws.services.dynamodbv2.model.ShardIteratorType;
+import com.amazonaws.services.dynamodbv2.model.Stream;
+import com.amazonaws.services.dynamodbv2.model.StreamDescription;
+import com.amazonaws.services.dynamodbv2.model.StreamRecord;
+import org.apache.camel.AsyncCallback;
+import org.apache.camel.AsyncProcessor;
+import org.apache.camel.CamelContext;
+import org.apache.camel.Exchange;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.runners.MockitoJUnitRunner;
+import org.mockito.stubbing.Answer;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DdbStreamConsumerTest {
+
+ private DdbStreamConsumer undertest;
+
+ @Mock private AmazonDynamoDBStreams amazonDynamoDBStreams;
+ @Mock private AsyncProcessor processor;
+ private final CamelContext context = new DefaultCamelContext();
+ private final DdbStreamComponent component = new DdbStreamComponent(context);
+ private DdbStreamEndpoint endpoint = new DdbStreamEndpoint(null, "table_name", component);
+
+ private final String[] seqNums = new String[]{"2", "9", "11", "13", "14", "21", "25", "30", "35", "40"};
+
+ @Before
+ public void setup() throws Exception {
+ endpoint.setAmazonDynamoDbStreamsClient(amazonDynamoDBStreams);
+
+ when(amazonDynamoDBStreams.listStreams(any(ListStreamsRequest.class))).thenReturn(
+ new ListStreamsResult()
+ .withStreams(new Stream()
+ .withStreamArn("arn:aws:dynamodb:region:12345:table/table_name/stream/timestamp")
+ )
+ );
+
+ when(amazonDynamoDBStreams.describeStream(any(DescribeStreamRequest.class))).thenReturn(
+ new DescribeStreamResult()
+ .withStreamDescription(
+ new StreamDescription()
+ .withTableName("table_name")
+ .withShards(
+ ShardListTest.createShardsWithSequenceNumbers(null,
+ "a", "1", "5",
+ "b", "8", "15",
+ "c", "16", "16",
+ "d", "20", null
+ )
+ )
+ )
+ );
+
+ when(amazonDynamoDBStreams.getShardIterator(any(GetShardIteratorRequest.class))).thenAnswer(new Answer<GetShardIteratorResult>() {
+ @Override
+ public GetShardIteratorResult answer(InvocationOnMock invocation) throws Throwable {
+ return new GetShardIteratorResult()
+ .withShardIterator("shard_iterator_"
+ + ((GetShardIteratorRequest) invocation.getArguments()[0]).getShardId()
+ + "_000");
+ }
+ });
+
+ final Map<String, String> shardIterators = new HashMap<>();
+ shardIterators.put("shard_iterator_a_000", "shard_iterator_a_001");
+ shardIterators.put("shard_iterator_b_000", "shard_iterator_b_001");
+ shardIterators.put("shard_iterator_b_001", "shard_iterator_b_002");
+ shardIterators.put("shard_iterator_c_000", "shard_iterator_c_001");
+ shardIterators.put("shard_iterator_d_000", "shard_iterator_d_001");
+ final Map<String, Collection<Record>> answers = new HashMap<>();
+ answers.put("shard_iterator_a_001", createRecords("2"));
+ answers.put("shard_iterator_b_000", createRecords("9"));
+ answers.put("shard_iterator_b_001", createRecords("11", "13"));
+ answers.put("shard_iterator_b_002", createRecords("14"));
+ answers.put("shard_iterator_d_000", createRecords("21", "25"));
+ answers.put("shard_iterator_d_001", createRecords("30", "35", "40"));
+ when(amazonDynamoDBStreams.getRecords(any(GetRecordsRequest.class))).thenAnswer(new Answer<GetRecordsResult>() {
+ @Override
+ public GetRecordsResult answer(InvocationOnMock invocation) throws Throwable {
+ final String shardIterator = ((GetRecordsRequest) invocation.getArguments()[0]).getShardIterator();
+ String nextShardIterator = shardIterators.get(shardIterator); // note that HashMap returns null when there is no entry in the map. A null 'nextShardIterator' indicates that the shard has finished and we should move onto the next shard.
+ Matcher m = Pattern.compile("shard_iterator_d_0*(\\d+)").matcher(shardIterator);
+ Collection<Record> ans = answers.get(shardIterator);
+ if (nextShardIterator == null && m.matches()) { // last shard iterates forever.
+ Integer num = Integer.parseInt(m.group(1));
+ nextShardIterator = "shard_iterator_d_" + pad(Integer.toString(num+1), 3);
+ }
+ if (null == ans) { // default to an empty list of records.
+ ans = createRecords();
+ }
+ return new GetRecordsResult()
+ .withRecords(ans)
+ .withNextShardIterator(nextShardIterator);
+ }
+ });
+ }
+
+ String pad(String num, int to) {
+ // lazy padding
+ switch (num.length()) {
+ case 1:
+ return "00" + num;
+ case 2:
+ return "0" + num;
+ default:
+ return num;
+ }
+ }
+
+ @Test
+ public void latestOnlyUsesTheLastShard() throws Exception {
+ endpoint.setIteratorType(ShardIteratorType.LATEST);
+ undertest = new DdbStreamConsumer(endpoint, processor);
+
+ undertest.poll();
+
+ ArgumentCaptor<GetShardIteratorRequest> getIteratorCaptor = ArgumentCaptor.forClass(GetShardIteratorRequest.class);
+ verify(amazonDynamoDBStreams).getShardIterator(getIteratorCaptor.capture());
+ assertThat(getIteratorCaptor.getValue().getShardId(), is("d"));
+ }
+
+ @Test
+ public void latestWithTwoPolls() throws Exception {
+ endpoint.setIteratorType(ShardIteratorType.LATEST);
+ undertest = new DdbStreamConsumer(endpoint, processor);
+
+ undertest.poll();
+ undertest.poll();
+
+ ArgumentCaptor<GetRecordsRequest> getRecordsCaptor = ArgumentCaptor.forClass(GetRecordsRequest.class);
+ verify(amazonDynamoDBStreams, times(2)).getRecords(getRecordsCaptor.capture());
+ assertThat(getRecordsCaptor.getAllValues().get(0).getShardIterator(), is("shard_iterator_d_000"));
+ assertThat(getRecordsCaptor.getAllValues().get(1).getShardIterator(), is("shard_iterator_d_001"));
+ }
+
+ @Test
+ public void trimHorizonStartsWithTheFirstShard() throws Exception {
+ endpoint.setIteratorType(ShardIteratorType.TRIM_HORIZON);
+ undertest = new DdbStreamConsumer(endpoint, processor);
+
+ undertest.poll();
+
+ ArgumentCaptor<GetShardIteratorRequest> getIteratorCaptor = ArgumentCaptor.forClass(GetShardIteratorRequest.class);
+ verify(amazonDynamoDBStreams).getShardIterator(getIteratorCaptor.capture());
+ assertThat(getIteratorCaptor.getValue().getShardId(), is("a"));
+ }
+
+ @Test
+ public void trimHorizonWalksAllShards() throws Exception {
+ endpoint.setIteratorType(ShardIteratorType.TRIM_HORIZON);
+ undertest = new DdbStreamConsumer(endpoint, processor);
+
+ for (int i = 0; i < 9; ++i) {
+ undertest.poll();
+ }
+
+ ArgumentCaptor<GetShardIteratorRequest> getIteratorCaptor = ArgumentCaptor.forClass(GetShardIteratorRequest.class);
+ verify(amazonDynamoDBStreams, times(4)).getShardIterator(getIteratorCaptor.capture());
+ assertThat(getIteratorCaptor.getAllValues().get(0).getShardId(), is("a"));
+ assertThat(getIteratorCaptor.getAllValues().get(1).getShardId(), is("b"));
+ assertThat(getIteratorCaptor.getAllValues().get(2).getShardId(), is("c"));
+ assertThat(getIteratorCaptor.getAllValues().get(3).getShardId(), is("d"));
+
+ ArgumentCaptor<Exchange> exchangeCaptor = ArgumentCaptor.forClass(Exchange.class);
+ verify(processor, times(seqNums.length)).process(exchangeCaptor.capture(), any(AsyncCallback.class));
+
+ for (int i = 0; i < seqNums.length; ++i) {
+ assertThat(exchangeCaptor.getAllValues().get(i).getIn().getBody(Record.class).getDynamodb().getSequenceNumber(), is(seqNums[i]));
+ }
+ }
+
+ @Test
+ public void atSeqNumber12StartsWithShardB() throws Exception {
+ endpoint.setIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER);
+ endpoint.setSequenceNumberProvider(new StaticSequenceNumberProvider("12"));
+ undertest = new DdbStreamConsumer(endpoint, processor);
+
+ undertest.poll();
+
+ ArgumentCaptor<GetShardIteratorRequest> getIteratorCaptor = ArgumentCaptor.forClass(GetShardIteratorRequest.class);
+ verify(amazonDynamoDBStreams).getShardIterator(getIteratorCaptor.capture());
+ assertThat(getIteratorCaptor.getValue().getShardId(), is("b"));
+ }
+
+ @Test
+ public void afterSeqNumber16StartsWithShardC() throws Exception {
+ endpoint.setIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER);
+ endpoint.setSequenceNumberProvider(new StaticSequenceNumberProvider("16"));
+ undertest = new DdbStreamConsumer(endpoint, processor);
+
+ undertest.poll();
+
+ ArgumentCaptor<GetShardIteratorRequest> getIteratorCaptor = ArgumentCaptor.forClass(GetShardIteratorRequest.class);
+ verify(amazonDynamoDBStreams).getShardIterator(getIteratorCaptor.capture());
+ assertThat(getIteratorCaptor.getValue().getShardId(), is("c"));
+ }
+
+ private static Collection<Record> createRecords(String... sequenceNumbers) {
+ List<Record> results = new ArrayList<>();
+
+ for (String seqNum : sequenceNumbers) {
+ results.add(new Record()
+ .withDynamodb(new StreamRecord().withSequenceNumber(seqNum))
+ );
+ }
+
+ return results;
+ }
+
+}
\ No newline at end of file
[2/6] camel git commit: Add {at,after}-sequence-number semantics to the shard list container
Posted by da...@apache.org.
Add {at,after}-sequence-number semantics to the shard list container
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e2b9d91c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e2b9d91c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e2b9d91c
Branch: refs/heads/master
Commit: e2b9d91c3428e659c36799466de7abe709c4d941
Parents: 0350423
Author: Candle <ca...@candle.me.uk>
Authored: Wed Dec 16 14:58:02 2015 +0000
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Dec 24 09:45:31 2015 +0100
----------------------------------------------------------------------
.../component/aws/ddbstream/ShardList.java | 67 ++++++++++++++++++++
.../ShardListAfterSequenceParametrised.java | 57 +++++++++++++++++
.../ShardListAtSequenceParametrised.java | 57 +++++++++++++++++
.../component/aws/ddbstream/ShardListTest.java | 24 ++++++-
4 files changed, 204 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/e2b9d91c/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
index a0df179..3ae1c4e 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
@@ -21,6 +21,11 @@ import java.util.HashMap;
import java.util.Map;
import com.amazonaws.services.dynamodbv2.model.Shard;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -70,6 +75,35 @@ class ShardList {
throw new IllegalStateException("Unable to find a shard with no children " + shards);
}
+ Shard afterSeq(String sequenceNumber) {
+ return atAfterSeq(sequenceNumber, After.INSTANCE);
+ }
+
+ Shard atSeq(String sequenceNumber) {
+ return atAfterSeq(sequenceNumber, At.INSTANCE);
+ }
+
+ Shard atAfterSeq(String sequenceNumber, AtAfterCondition condition) {
+ BigInteger atAfter = new BigInteger(sequenceNumber);
+ List<Shard> sorted = new ArrayList<>();
+ sorted.addAll(shards.values());
+ Collections.sort(sorted, StartingSequenceNumberComparator.INSTANCE);
+ for (Shard shard : sorted) {
+ if (shard.getSequenceNumberRange().getEndingSequenceNumber() != null) {
+ BigInteger end = new BigInteger(shard.getSequenceNumberRange().getEndingSequenceNumber());
+ // essentially: after < end or after <= end
+ if (condition.matches(atAfter, end)) {
+ return shard;
+ }
+
+ }
+ }
+ if (shards.size() > 0) {
+ return sorted.get(sorted.size()-1);
+ }
+ throw new IllegalStateException("Unable to find a shard with appropriate sequence numbers for " + sequenceNumber + " in " + shards);
+ }
+
/**
* Removes shards that are older than the provided shard.
* Does not remove the provided shard.
@@ -95,4 +129,37 @@ class ShardList {
public String toString() {
return "ShardList{" + "shards=" + shards + '}';
}
+
+ private interface AtAfterCondition {
+ boolean matches(BigInteger sequenceNumber, BigInteger option);
+ }
+
+ private static enum After implements AtAfterCondition {
+ INSTANCE() {
+ @Override
+ public boolean matches(BigInteger providedSequenceNumber, BigInteger shardSequenceNumber) {
+ return providedSequenceNumber.compareTo(shardSequenceNumber) < 0;
+ }
+ }
+ }
+
+ private static enum At implements AtAfterCondition {
+ INSTANCE() {
+ @Override
+ public boolean matches(BigInteger providedSequenceNumber, BigInteger shardSequenceNumber) {
+ return providedSequenceNumber.compareTo(shardSequenceNumber) <= 0;
+ }
+ }
+ }
+
+ private static enum StartingSequenceNumberComparator implements Comparator<Shard> {
+ INSTANCE() {
+ @Override
+ public int compare(Shard o1, Shard o2) {
+ BigInteger i1 = new BigInteger(o1.getSequenceNumberRange().getStartingSequenceNumber());
+ BigInteger i2 = new BigInteger(o2.getSequenceNumberRange().getStartingSequenceNumber());
+ return i1.compareTo(i2);
+ }
+ }
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/e2b9d91c/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListAfterSequenceParametrised.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListAfterSequenceParametrised.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListAfterSequenceParametrised.java
new file mode 100644
index 0000000..c33ebe2
--- /dev/null
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListAfterSequenceParametrised.java
@@ -0,0 +1,57 @@
+package org.apache.camel.component.aws.ddbstream;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class ShardListAfterSequenceParametrised {
+
+
+ @Parameterized.Parameters
+ public static Collection<Object[]> paramaters() {
+ List<Object[]> results = new ArrayList<>();
+ results.add(new Object[]{"0", "a"});
+ results.add(new Object[]{"3", "a"});
+ results.add(new Object[]{"6", "b"});
+ results.add(new Object[]{"8", "b"});
+ results.add(new Object[]{"15", "c"});
+ results.add(new Object[]{"16", "d"});
+ results.add(new Object[]{"18", "d"});
+ results.add(new Object[]{"25", "d"});
+ results.add(new Object[]{"30", "d"});
+ return results;
+ }
+
+ private ShardList undertest;
+
+ private final String inputSequenceNumber;
+ private final String expectedShardId;
+
+ public ShardListAfterSequenceParametrised(String inputSequenceNumber, String expectedShardId) {
+ this.inputSequenceNumber = inputSequenceNumber;
+ this.expectedShardId = expectedShardId;
+ }
+
+ @Before
+ public void setup() throws Exception {
+ undertest = new ShardList();
+ undertest.addAll(ShardListTest.createShardsWithSequenceNumbers(null,
+ "a", "1", "5",
+ "b", "8", "15",
+ "c", "16", "16",
+ "d", "20", null
+ ));
+ }
+
+ @Test
+ public void assertions() throws Exception {
+ assertThat(undertest.afterSeq(inputSequenceNumber).getShardId(), is(expectedShardId));
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/e2b9d91c/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListAtSequenceParametrised.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListAtSequenceParametrised.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListAtSequenceParametrised.java
new file mode 100644
index 0000000..cf15021
--- /dev/null
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListAtSequenceParametrised.java
@@ -0,0 +1,57 @@
+package org.apache.camel.component.aws.ddbstream;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class ShardListAtSequenceParametrised {
+
+
+ @Parameterized.Parameters
+ public static Collection<Object[]> paramaters() {
+ List<Object[]> results = new ArrayList<>();
+ results.add(new Object[]{"0", "a"});
+ results.add(new Object[]{"3", "a"});
+ results.add(new Object[]{"6", "b"});
+ results.add(new Object[]{"8", "b"});
+ results.add(new Object[]{"15", "b"});
+ results.add(new Object[]{"16", "c"});
+ results.add(new Object[]{"18", "d"});
+ results.add(new Object[]{"25", "d"});
+ results.add(new Object[]{"30", "d"});
+ return results;
+ }
+
+ private ShardList undertest;
+
+ private final String inputSequenceNumber;
+ private final String expectedShardId;
+
+ public ShardListAtSequenceParametrised(String inputSequenceNumber, String expectedShardId) {
+ this.inputSequenceNumber = inputSequenceNumber;
+ this.expectedShardId = expectedShardId;
+ }
+
+ @Before
+ public void setup() throws Exception {
+ undertest = new ShardList();
+ undertest.addAll(ShardListTest.createShardsWithSequenceNumbers(null,
+ "a", "1", "5",
+ "b", "8", "15",
+ "c", "16", "16",
+ "d", "20", null
+ ));
+ }
+
+ @Test
+ public void assertions() throws Exception {
+ assertThat(undertest.atSeq(inputSequenceNumber).getShardId(), is(expectedShardId));
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/e2b9d91c/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java
index 1b7249a..e181d7e 100644
--- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/ShardListTest.java
@@ -17,6 +17,7 @@
package org.apache.camel.component.aws.ddbstream;
+import com.amazonaws.services.dynamodbv2.model.SequenceNumberRange;
import java.util.ArrayList;
import java.util.List;
import com.amazonaws.services.dynamodbv2.model.Shard;
@@ -24,6 +25,7 @@ import com.amazonaws.services.dynamodbv2.model.Shard;
import org.junit.Test;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
+import org.junit.Ignore;
public class ShardListTest {
@@ -118,7 +120,27 @@ public class ShardListTest {
assertThat(shards.first().getShardId(), is("c"));
}
- List<Shard> createShards(String initialParent, String... shardIds) {
+ static List<Shard> createShardsWithSequenceNumbers(String initialParent, String... shardIdsAndSeqNos) {
+ String previous = initialParent;
+ List<Shard> result = new ArrayList<>();
+ for (int i = 0; i < shardIdsAndSeqNos.length; i += 3) {
+ String id = shardIdsAndSeqNos[i];
+ String seqStart = shardIdsAndSeqNos[i+1];
+ String seqEnd = shardIdsAndSeqNos[i+2];
+ result.add(new Shard()
+ .withShardId(id)
+ .withParentShardId(previous)
+ .withSequenceNumberRange(new SequenceNumberRange()
+ .withStartingSequenceNumber(seqStart)
+ .withEndingSequenceNumber(seqEnd)
+ )
+ );
+ previous = id;
+ }
+ return result;
+ }
+
+ static List<Shard> createShards(String initialParent, String... shardIds) {
String previous = initialParent;
List<Shard> result = new ArrayList<>();
for (String s : shardIds) {
[3/6] camel git commit: Use the shard list filtering when obtaining
the first shard when using a {AFTER, AT}_SEQUENCE_NUMBER iterator type.
Posted by da...@apache.org.
Use the shard list filtering when obtaining the first shard when using a {AFTER,AT}_SEQUENCE_NUMBER iterator type.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0a25e516
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0a25e516
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0a25e516
Branch: refs/heads/master
Commit: 0a25e51634f848d621d11a6c0abb503f4419bff2
Parents: e2b9d91
Author: Candle <ca...@candle.me.uk>
Authored: Mon Dec 21 13:35:59 2015 +0000
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Dec 24 09:45:31 2015 +0100
----------------------------------------------------------------------
.../aws/ddbstream/DdbStreamConsumer.java | 8 +-
.../aws/ddbstream/DdbStreamEndpoint.java | 38 ++++++-
.../aws/ddbstream/SequenceNumberProvider.java | 21 ++++
.../ddbstream/StaticSequenceNumberProvider.java | 31 ++++++
.../StringSequenceNumberConverter.java | 31 ++++++
.../services/org/apache/camel/TypeConverter | 1 +
.../aws/ddbstream/DdbStreamEndpointTest.java | 104 +++++++++++++++++++
7 files changed, 229 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/0a25e516/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
index f5223c0..25e5f31 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
@@ -108,11 +108,17 @@ public class DdbStreamConsumer extends ScheduledBatchPollingConsumer {
LOG.trace("Current shard is: {} (in {})", currentShard, shardList);
if (currentShard == null) {
switch(getEndpoint().getIteratorType()) {
+ case AFTER_SEQUENCE_NUMBER:
+ currentShard = shardList.afterSeq(getEndpoint().getSequenceNumber());
+ break;
+ case AT_SEQUENCE_NUMBER:
+ currentShard = shardList.atSeq(getEndpoint().getSequenceNumber());
+ break;
case TRIM_HORIZON:
currentShard = shardList.first();
break;
- default:
case LATEST:
+ default:
currentShard = shardList.last();
break;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/0a25e516/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
index 543c432..bc12bc6 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpoint.java
@@ -49,7 +49,8 @@ public class DdbStreamEndpoint extends ScheduledPollEndpoint {
@UriParam(label = "consumer", description = "Defines where in the DynaboDB stream"
+ " to start getting records. Note that using TRIM_HORIZON can cause a"
+ " significant delay before the stream has caught up to real-time."
- + " Currently only LATEST and TRIM_HORIZON are supported.",
+ + " if {AT,AFTER}_SEQUENCE_NUMBER are used, then a sequenceNumberProvider"
+ + " MUST be supplied.",
defaultValue = "LATEST")
private ShardIteratorType iteratorType = ShardIteratorType.LATEST;
// TODO add the ability to use ShardIteratorType.{AT,AFTER}_SEQUENCE_NUMBER
@@ -61,6 +62,11 @@ public class DdbStreamEndpoint extends ScheduledPollEndpoint {
// Note that the shard list needs to have the ability to start at the shard
// that includes the supplied sequence number
+ @UriParam(label = "consumer", description = "Provider for the sequence number when"
+ + " using one of the two ShardIteratorType.{AT,AFTER}_SEQUENCE_NUMBER"
+ + " iterator types. Can be a registry reference or a literal sequence number.")
+ private SequenceNumberProvider sequenceNumberProvider;
+
public DdbStreamEndpoint(String uri, String tableName, DdbStreamComponent component) {
super(uri, component);
this.tableName = tableName;
@@ -90,13 +96,30 @@ public class DdbStreamEndpoint extends ScheduledPollEndpoint {
return true;
}
+ public String getSequenceNumber() {
+ switch (getIteratorType()) {
+ case AFTER_SEQUENCE_NUMBER:
+ case AT_SEQUENCE_NUMBER:
+ if (null == getSequenceNumberProvider()) {
+ throw new IllegalStateException("sequenceNumberProvider must be"
+ + " provided, either as an implementation of"
+ + " SequenceNumberProvider or a literal String.");
+ } else {
+ return getSequenceNumberProvider().getSequenceNumber();
+ }
+ default:
+ return "";
+ }
+ }
+
@Override
public String toString() {
return "DdbStreamEndpoint{"
+ "tableName=" + tableName
+ ", amazonDynamoDbStreamsClient=[redacted], maxResultsPerRequest=" + maxResultsPerRequest
- + ", iteratorType="
- + iteratorType + ", uri=" + getEndpointUri()
+ + ", iteratorType=" + iteratorType
+ + ", sequenceNumberProvider=" + sequenceNumberProvider
+ + ", uri=" + getEndpointUri()
+ '}';
}
@@ -135,5 +158,12 @@ public class DdbStreamEndpoint extends ScheduledPollEndpoint {
public void setIteratorType(ShardIteratorType iteratorType) {
this.iteratorType = iteratorType;
}
-
+
+ public SequenceNumberProvider getSequenceNumberProvider() {
+ return sequenceNumberProvider;
+ }
+
+ public void setSequenceNumberProvider(SequenceNumberProvider sequenceNumberProvider) {
+ this.sequenceNumberProvider = sequenceNumberProvider;
+ }
}
http://git-wip-us.apache.org/repos/asf/camel/blob/0a25e516/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/SequenceNumberProvider.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/SequenceNumberProvider.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/SequenceNumberProvider.java
new file mode 100644
index 0000000..5a9dd8c
--- /dev/null
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/SequenceNumberProvider.java
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.ddbstream;
+
+public interface SequenceNumberProvider {
+ String getSequenceNumber();
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/0a25e516/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/StaticSequenceNumberProvider.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/StaticSequenceNumberProvider.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/StaticSequenceNumberProvider.java
new file mode 100644
index 0000000..459767b
--- /dev/null
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/StaticSequenceNumberProvider.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.ddbstream;
+
+public class StaticSequenceNumberProvider implements SequenceNumberProvider {
+
+ private final String sequenceNumber;
+
+ public StaticSequenceNumberProvider(String sequenceNumber) {
+ this.sequenceNumber = sequenceNumber;
+ }
+
+ @Override
+ public String getSequenceNumber() {
+ return sequenceNumber;
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/0a25e516/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/StringSequenceNumberConverter.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/StringSequenceNumberConverter.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/StringSequenceNumberConverter.java
new file mode 100644
index 0000000..92bae2b
--- /dev/null
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/StringSequenceNumberConverter.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.ddbstream;
+
+import org.apache.camel.Converter;
+
+@Converter
+public class StringSequenceNumberConverter {
+
+ private StringSequenceNumberConverter() {
+ }
+
+ @Converter
+ public static SequenceNumberProvider toSequenceNumberProvider(String sequenceNumber) {
+ return new StaticSequenceNumberProvider(sequenceNumber);
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/0a25e516/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/TypeConverter b/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
index 4873a46..4472b59 100644
--- a/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
+++ b/components/camel-aws/src/main/resources/META-INF/services/org/apache/camel/TypeConverter
@@ -16,3 +16,4 @@
## ---------------------------------------------------------------------------
org.apache.camel.component.aws.kinesis.RecordStringConverter
+org.apache.camel.component.aws.ddbstream.StringSequenceNumberConverter
http://git-wip-us.apache.org/repos/asf/camel/blob/0a25e516/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpointTest.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpointTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpointTest.java
new file mode 100644
index 0000000..9e688be
--- /dev/null
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamEndpointTest.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.ddbstream;
+
+import com.amazonaws.services.dynamodbv2.AmazonDynamoDBStreams;
+import org.apache.camel.CamelContext;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.SimpleRegistry;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import org.junit.Test;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class DdbStreamEndpointTest {
+
+ private CamelContext context;
+
+ @Mock private SequenceNumberProvider sequenceNumberProvider;
+ @Mock private AmazonDynamoDBStreams amazonDynamoDBStreams;
+
+ @Rule public ExpectedException expectedException = ExpectedException.none();
+
+ @Before
+ public void setup() throws Exception {
+ SimpleRegistry registry = new SimpleRegistry();
+ registry.put("someSeqNumProv", sequenceNumberProvider);
+ registry.put("ddbStreamsClient", amazonDynamoDBStreams);
+
+ context = new DefaultCamelContext(registry);
+ }
+
+ @Test
+ public void itExtractsTheSequenceNumber() throws Exception {
+ when(sequenceNumberProvider.getSequenceNumber()).thenReturn("seq");
+
+ DdbStreamEndpoint undertest = (DdbStreamEndpoint)context.getEndpoint("aws-ddbstream://table"
+ + "?amazonDynamoDbStreamsClient=#ddbStreamsClient"
+ + "&iteratorType=AFTER_SEQUENCE_NUMBER"
+ + "&sequenceNumberProvider=#someSeqNumProv");
+
+ assertThat(undertest.getSequenceNumber(), is("seq"));
+ }
+
+ @Test
+ public void itExtractsTheSequenceNumberFromALiteralString() throws Exception {
+
+ DdbStreamEndpoint undertest = (DdbStreamEndpoint)context.getEndpoint("aws-ddbstream://table"
+ + "?amazonDynamoDbStreamsClient=#ddbStreamsClient"
+ + "&iteratorType=AFTER_SEQUENCE_NUMBER"
+ + "&sequenceNumberProvider=seq");
+
+ assertThat(undertest.getSequenceNumber(), is("seq"));
+ }
+
+ @Test
+ public void onSequenceNumberAgnosticIteratorsTheProviderIsIgnored() throws Exception {
+ when(sequenceNumberProvider.getSequenceNumber()).thenReturn("seq");
+
+ DdbStreamEndpoint undertest = (DdbStreamEndpoint)context.getEndpoint("aws-ddbstream://table"
+ + "?amazonDynamoDbStreamsClient=#ddbStreamsClient"
+ + "&iteratorType=LATEST"
+ + "&sequenceNumberProvider=#someSeqNumProv");
+
+ assertThat(undertest.getSequenceNumber(), is(""));
+ verify(sequenceNumberProvider, never()).getSequenceNumber();
+ }
+
+ @Test
+ public void sequenceNumberFetchingThrowsSomethingUsefulIfMisconfigurered() throws Exception {
+ when(sequenceNumberProvider.getSequenceNumber()).thenReturn("seq");
+
+ expectedException.expectMessage(containsString("sequenceNumberProvider"));
+
+ DdbStreamEndpoint undertest = (DdbStreamEndpoint)context.getEndpoint("aws-ddbstream://table"
+ + "?amazonDynamoDbStreamsClient=#ddbStreamsClient"
+ + "&iteratorType=AT_SEQUENCE_NUMBER"); // NOTE: missing sequence number provider parameter
+
+ undertest.getSequenceNumber();
+ }
+}
\ No newline at end of file
[5/6] camel git commit: Add filtering of the records when asking for
an {at, after}_sequence_number iterator type.
Posted by da...@apache.org.
Add filtering of the records when asking for an {at,after}_sequence_number iterator type.
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/8e05657d
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/8e05657d
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/8e05657d
Branch: refs/heads/master
Commit: 8e05657d6a742bb6b8b0026c9683953c38168a66
Parents: b2430c0
Author: Candle <ca...@candle.me.uk>
Authored: Mon Dec 21 18:29:38 2015 +0000
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Dec 24 09:45:32 2015 +0100
----------------------------------------------------------------------
.../aws/ddbstream/AtAfterCondition.java | 44 +++++++++++++
.../aws/ddbstream/DdbStreamConsumer.java | 45 ++++++++++++-
.../component/aws/ddbstream/ShardList.java | 24 +------
.../aws/ddbstream/AtAfterConditionTest.java | 67 ++++++++++++++++++++
.../aws/ddbstream/DdbStreamConsumerTest.java | 56 +++++++++++++++-
5 files changed, 212 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/8e05657d/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/AtAfterCondition.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/AtAfterCondition.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/AtAfterCondition.java
new file mode 100644
index 0000000..a6798b5
--- /dev/null
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/AtAfterCondition.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.ddbstream;
+
+import java.math.BigInteger;
+
+interface AtAfterCondition {
+
+ /**
+ * @return true if sequenceNumber is at or after (AT), or strictly after (AFTER), the endpointSequenceNumber.
+ */
+ boolean matches(BigInteger endpointSequenceNumber, BigInteger sequenceNumber);
+
+ static enum Conditions implements AtAfterCondition {
+ AFTER() {
+ @Override
+ public boolean matches(BigInteger endpointSequenceNumber, BigInteger sequenceNumber) {
+ return endpointSequenceNumber.compareTo(sequenceNumber) < 0;
+ }
+ },
+
+ AT() {
+ @Override
+ public boolean matches(BigInteger endpointSequenceNumber, BigInteger sequenceNumber) {
+ return endpointSequenceNumber.compareTo(sequenceNumber) <= 0;
+ }
+ }
+ // TODO rename to LT/LTEQ/EQ/GTEQ/GT
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/8e05657d/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
index 25e5f31..d520abf 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
@@ -16,6 +16,7 @@
*/
package org.apache.camel.component.aws.ddbstream;
+import java.math.BigInteger;
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
@@ -31,6 +32,7 @@ import com.amazonaws.services.dynamodbv2.model.ListStreamsRequest;
import com.amazonaws.services.dynamodbv2.model.ListStreamsResult;
import com.amazonaws.services.dynamodbv2.model.Record;
import com.amazonaws.services.dynamodbv2.model.Shard;
+import com.amazonaws.services.dynamodbv2.model.ShardIteratorType;
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
@@ -132,6 +134,30 @@ public class DdbStreamConsumer extends ScheduledBatchPollingConsumer {
.withStreamArn(streamArn)
.withShardId(currentShard.getShardId())
.withShardIteratorType(getEndpoint().getIteratorType());
+ switch(getEndpoint().getIteratorType()) {
+ case AFTER_SEQUENCE_NUMBER:
+ case AT_SEQUENCE_NUMBER:
+ // If you request with a sequence number that is LESS than the
+ // start of the shard, you get an HTTP 400 from AWS.
+ // So only add the sequence number if the endpoint's
+ // sequence number is AT or AFTER the starting sequence for
+ // the shard.
+ // Otherwise change the shard iterator type to trim_horizon
+ // because we get a 400 when we use one of the
+ // {at,after}_sequence_number iterator types and don't supply
+ // a sequence number.
+ if (AtAfterCondition.Conditions.AT.matches(
+ new BigInteger(currentShard.getSequenceNumberRange().getStartingSequenceNumber()),
+ new BigInteger(getEndpoint().getSequenceNumber())
+ )) {
+ req = req.withSequenceNumber(getEndpoint().getSequenceNumber())
+ .withShardIteratorType(getEndpoint().getIteratorType());
+ } else {
+ req = req.withShardIteratorType(ShardIteratorType.TRIM_HORIZON);
+ }
+ break;
+ default:
+ }
GetShardIteratorResult result = getClient().getShardIterator(req);
currentShardIterator = result.getShardIterator();
}
@@ -141,8 +167,25 @@ public class DdbStreamConsumer extends ScheduledBatchPollingConsumer {
private Queue<Exchange> createExchanges(List<Record> records) {
Queue<Exchange> exchanges = new ArrayDeque<>();
+ AtAfterCondition condition;
+ BigInteger providedSeqNum = null;
+ switch(getEndpoint().getIteratorType()) {
+ case AFTER_SEQUENCE_NUMBER:
+ condition = AtAfterCondition.Conditions.AFTER;
+ providedSeqNum = new BigInteger(getEndpoint().getSequenceNumberProvider().getSequenceNumber());
+ break;
+ case AT_SEQUENCE_NUMBER:
+ condition = AtAfterCondition.Conditions.AT;
+ providedSeqNum = new BigInteger(getEndpoint().getSequenceNumberProvider().getSequenceNumber());
+ break;
+ default:
+ condition = null;
+ }
for (Record record : records) {
- exchanges.add(getEndpoint().createExchange(record));
+ BigInteger recordSeqNum = new BigInteger(record.getDynamodb().getSequenceNumber());
+ if (condition == null || condition.matches(providedSeqNum, recordSeqNum)) {
+ exchanges.add(getEndpoint().createExchange(record));
+ }
}
return exchanges;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/8e05657d/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
index 0a6e332..5c5bd29 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
@@ -79,11 +79,11 @@ class ShardList {
}
Shard afterSeq(String sequenceNumber) {
- return atAfterSeq(sequenceNumber, After.INSTANCE);
+ return atAfterSeq(sequenceNumber, AtAfterCondition.Conditions.AFTER);
}
Shard atSeq(String sequenceNumber) {
- return atAfterSeq(sequenceNumber, At.INSTANCE);
+ return atAfterSeq(sequenceNumber, AtAfterCondition.Conditions.AT);
}
Shard atAfterSeq(String sequenceNumber, AtAfterCondition condition) {
@@ -134,27 +134,9 @@ class ShardList {
return "ShardList{" + "shards=" + shards + '}';
}
- private interface AtAfterCondition {
- boolean matches(BigInteger sequenceNumber, BigInteger option);
- }
- private static enum After implements AtAfterCondition {
- INSTANCE() {
- @Override
- public boolean matches(BigInteger providedSequenceNumber, BigInteger shardSequenceNumber) {
- return providedSequenceNumber.compareTo(shardSequenceNumber) < 0;
- }
- }
- }
- private static enum At implements AtAfterCondition {
- INSTANCE() {
- @Override
- public boolean matches(BigInteger providedSequenceNumber, BigInteger shardSequenceNumber) {
- return providedSequenceNumber.compareTo(shardSequenceNumber) <= 0;
- }
- }
- }
+
private static enum StartingSequenceNumberComparator implements Comparator<Shard> {
INSTANCE() {
http://git-wip-us.apache.org/repos/asf/camel/blob/8e05657d/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/AtAfterConditionTest.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/AtAfterConditionTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/AtAfterConditionTest.java
new file mode 100644
index 0000000..53aee40
--- /dev/null
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/AtAfterConditionTest.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.ddbstream;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+
+@RunWith(Parameterized.class)
+public class AtAfterConditionTest {
+
+
+ private final AtAfterCondition condition;
+ private final int smaller;
+ private final int bigger;
+ private final boolean result;
+
+ public AtAfterConditionTest(AtAfterCondition condition, int smaller, int bigger, boolean result) {
+ this.condition = condition;
+ this.smaller = smaller;
+ this.bigger = bigger;
+ this.result = result;
+ }
+
+ @Parameterized.Parameters
+ public static Collection<Object[]> parameters() {
+ List<Object[]> results = new ArrayList<>();
+
+ results.add(new Object[]{AtAfterCondition.Conditions.AFTER, 1, 5, true});
+ results.add(new Object[]{AtAfterCondition.Conditions.AT , 1, 5, true});
+ results.add(new Object[]{AtAfterCondition.Conditions.AFTER, 1, 1, false});
+ results.add(new Object[]{AtAfterCondition.Conditions.AT , 1, 1, true});
+ results.add(new Object[]{AtAfterCondition.Conditions.AFTER, 5, 1, false});
+ results.add(new Object[]{AtAfterCondition.Conditions.AT , 5, 1, false});
+
+ return results;
+ }
+
+ @Test
+ public void test() throws Exception {
+ assertThat(condition.matches(BigInteger.valueOf(smaller), BigInteger.valueOf(bigger)), is(result));
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/8e05657d/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumerTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumerTest.java
index 3ef6c70..0cbf9ca 100644
--- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumerTest.java
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumerTest.java
@@ -1,3 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
package org.apache.camel.component.aws.ddbstream;
import java.util.ArrayList;
@@ -110,12 +126,15 @@ public class DdbStreamConsumerTest {
@Override
public GetRecordsResult answer(InvocationOnMock invocation) throws Throwable {
final String shardIterator = ((GetRecordsRequest) invocation.getArguments()[0]).getShardIterator();
- String nextShardIterator = shardIterators.get(shardIterator); // note that HashMap returns null when there is no entry in the map. A null 'nextShardIterator' indicates that the shard has finished and we should move onto the next shard.
+ // note that HashMap returns null when there is no entry in the map.
+ // A null 'nextShardIterator' indicates that the shard has finished
+ // and we should move onto the next shard.
+ String nextShardIterator = shardIterators.get(shardIterator);
Matcher m = Pattern.compile("shard_iterator_d_0*(\\d+)").matcher(shardIterator);
Collection<Record> ans = answers.get(shardIterator);
if (nextShardIterator == null && m.matches()) { // last shard iterates forever.
Integer num = Integer.parseInt(m.group(1));
- nextShardIterator = "shard_iterator_d_" + pad(Integer.toString(num+1), 3);
+ nextShardIterator = "shard_iterator_d_" + pad(Integer.toString(num + 1), 3);
}
if (null == ans) { // default to an empty list of records.
ans = createRecords();
@@ -227,6 +246,39 @@ public class DdbStreamConsumerTest {
assertThat(getIteratorCaptor.getValue().getShardId(), is("c"));
}
+ @Test
+ public void atSeqNumber35GivesFirstRecordWithSeq35() throws Exception {
+ endpoint.setIteratorType(ShardIteratorType.AT_SEQUENCE_NUMBER);
+ endpoint.setSequenceNumberProvider(new StaticSequenceNumberProvider("35"));
+ undertest = new DdbStreamConsumer(endpoint, processor);
+
+ for (int i = 0; i < 10; ++i) { // poll lots.
+ undertest.poll();
+ }
+
+ ArgumentCaptor<Exchange> exchangeCaptor = ArgumentCaptor.forClass(Exchange.class);
+ verify(processor, times(2)).process(exchangeCaptor.capture(), any(AsyncCallback.class));
+
+ assertThat(exchangeCaptor.getAllValues().get(0).getIn().getBody(Record.class).getDynamodb().getSequenceNumber(), is("35"));
+ assertThat(exchangeCaptor.getAllValues().get(1).getIn().getBody(Record.class).getDynamodb().getSequenceNumber(), is("40"));
+ }
+
+ @Test
+ public void afterSeqNumber35GivesFirstRecordWithSeq40() throws Exception {
+ endpoint.setIteratorType(ShardIteratorType.AFTER_SEQUENCE_NUMBER);
+ endpoint.setSequenceNumberProvider(new StaticSequenceNumberProvider("35"));
+ undertest = new DdbStreamConsumer(endpoint, processor);
+
+ for (int i = 0; i < 10; ++i) { // poll lots.
+ undertest.poll();
+ }
+
+ ArgumentCaptor<Exchange> exchangeCaptor = ArgumentCaptor.forClass(Exchange.class);
+ verify(processor, times(1)).process(exchangeCaptor.capture(), any(AsyncCallback.class));
+
+ assertThat(exchangeCaptor.getAllValues().get(0).getIn().getBody(Record.class).getDynamodb().getSequenceNumber(), is("40"));
+ }
+
private static Collection<Record> createRecords(String... sequenceNumbers) {
List<Record> results = new ArrayList<>();
[6/6] camel git commit: Refactor to improve readability of comparisons
Posted by da...@apache.org.
Refactor to improve readability of comparisons
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/0570a6ca
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/0570a6ca
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/0570a6ca
Branch: refs/heads/master
Commit: 0570a6ca30fa3bf20a3606d91b4e021ecc9c6aa6
Parents: 8e05657
Author: Candle <ca...@candle.me.uk>
Authored: Mon Dec 21 18:44:33 2015 +0000
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Dec 24 09:45:32 2015 +0100
----------------------------------------------------------------------
.../aws/ddbstream/AtAfterCondition.java | 44 -------------
.../aws/ddbstream/BigIntComparisons.java | 44 +++++++++++++
.../aws/ddbstream/DdbStreamConsumer.java | 15 ++---
.../component/aws/ddbstream/ShardList.java | 6 +-
.../aws/ddbstream/AtAfterConditionTest.java | 67 --------------------
.../aws/ddbstream/BigIntComparisonsTest.java | 67 ++++++++++++++++++++
6 files changed, 121 insertions(+), 122 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/0570a6ca/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/AtAfterCondition.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/AtAfterCondition.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/AtAfterCondition.java
deleted file mode 100644
index a6798b5..0000000
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/AtAfterCondition.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.aws.ddbstream;
-
-import java.math.BigInteger;
-
-interface AtAfterCondition {
-
- /**
- * @return true if sequenceNumber is (at,after) the endpointSequenceNumber.
- */
- boolean matches(BigInteger endpointSequenceNumber, BigInteger sequenceNumber);
-
- static enum Conditions implements AtAfterCondition {
- AFTER() {
- @Override
- public boolean matches(BigInteger endpointSequenceNumber, BigInteger sequenceNumber) {
- return endpointSequenceNumber.compareTo(sequenceNumber) < 0;
- }
- },
-
- AT() {
- @Override
- public boolean matches(BigInteger endpointSequenceNumber, BigInteger sequenceNumber) {
- return endpointSequenceNumber.compareTo(sequenceNumber) <= 0;
- }
- }
- // TODO rename to LT/LTEQ/EQ/GTEQ/GT
- }
-}
http://git-wip-us.apache.org/repos/asf/camel/blob/0570a6ca/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/BigIntComparisons.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/BigIntComparisons.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/BigIntComparisons.java
new file mode 100644
index 0000000..a06bd0a
--- /dev/null
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/BigIntComparisons.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.ddbstream;
+
+import java.math.BigInteger;
+
+interface BigIntComparisons {
+
+ /**
+ * @return true if the first parameter is LT/LTEQ/EQ/GTEQ/GT the second
+ */
+ boolean matches(BigInteger first, BigInteger second);
+
+ static enum Conditions implements BigIntComparisons {
+ LT() {
+ @Override
+ public boolean matches(BigInteger first, BigInteger second) {
+ return first.compareTo(second) < 0;
+ }
+ },
+
+ LTEQ() {
+ @Override
+ public boolean matches(BigInteger first, BigInteger second) {
+ return first.compareTo(second) <= 0;
+ }
+ }
+ // TODO Add EQ/GTEQ/GT as needed, but note that GTEQ == !LT and GT == !LTEQ and EQ == (!LT && !GT)
+ }
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/0570a6ca/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
index d520abf..aa8224f 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/DdbStreamConsumer.java
@@ -140,18 +140,17 @@ public class DdbStreamConsumer extends ScheduledBatchPollingConsumer {
// if you request with a sequence number that is LESS than the
// start of the shard, you get a HTTP 400 from AWS.
// So only add the sequence number if the endpoint's
- // sequence number is AT or AFTER the starting sequence for
- // the shard.
+ // sequence number is greater than or equal to the starting
+ // sequence for the shard.
// Otherwise change the shard iterator type to trim_horizon
// because we get a 400 when we use one of the
// {at,after}_sequence_number iterator types and don't supply
// a sequence number.
- if (AtAfterCondition.Conditions.AT.matches(
+ if (BigIntComparisons.Conditions.LTEQ.matches(
new BigInteger(currentShard.getSequenceNumberRange().getStartingSequenceNumber()),
new BigInteger(getEndpoint().getSequenceNumber())
)) {
- req = req.withSequenceNumber(getEndpoint().getSequenceNumber())
- .withShardIteratorType(getEndpoint().getIteratorType());
+ req = req.withSequenceNumber(getEndpoint().getSequenceNumber());
} else {
req = req.withShardIteratorType(ShardIteratorType.TRIM_HORIZON);
}
@@ -167,15 +166,15 @@ public class DdbStreamConsumer extends ScheduledBatchPollingConsumer {
private Queue<Exchange> createExchanges(List<Record> records) {
Queue<Exchange> exchanges = new ArrayDeque<>();
- AtAfterCondition condition;
+ BigIntComparisons condition;
BigInteger providedSeqNum = null;
switch(getEndpoint().getIteratorType()) {
case AFTER_SEQUENCE_NUMBER:
- condition = AtAfterCondition.Conditions.AFTER;
+ condition = BigIntComparisons.Conditions.LT;
providedSeqNum = new BigInteger(getEndpoint().getSequenceNumberProvider().getSequenceNumber());
break;
case AT_SEQUENCE_NUMBER:
- condition = AtAfterCondition.Conditions.AT;
+ condition = BigIntComparisons.Conditions.LTEQ;
providedSeqNum = new BigInteger(getEndpoint().getSequenceNumberProvider().getSequenceNumber());
break;
default:
http://git-wip-us.apache.org/repos/asf/camel/blob/0570a6ca/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
index 5c5bd29..8852ada 100644
--- a/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
+++ b/components/camel-aws/src/main/java/org/apache/camel/component/aws/ddbstream/ShardList.java
@@ -79,14 +79,14 @@ class ShardList {
}
Shard afterSeq(String sequenceNumber) {
- return atAfterSeq(sequenceNumber, AtAfterCondition.Conditions.AFTER);
+ return atAfterSeq(sequenceNumber, BigIntComparisons.Conditions.LT);
}
Shard atSeq(String sequenceNumber) {
- return atAfterSeq(sequenceNumber, AtAfterCondition.Conditions.AT);
+ return atAfterSeq(sequenceNumber, BigIntComparisons.Conditions.LTEQ);
}
- Shard atAfterSeq(String sequenceNumber, AtAfterCondition condition) {
+ Shard atAfterSeq(String sequenceNumber, BigIntComparisons condition) {
BigInteger atAfter = new BigInteger(sequenceNumber);
List<Shard> sorted = new ArrayList<>();
sorted.addAll(shards.values());
http://git-wip-us.apache.org/repos/asf/camel/blob/0570a6ca/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/AtAfterConditionTest.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/AtAfterConditionTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/AtAfterConditionTest.java
deleted file mode 100644
index 53aee40..0000000
--- a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/AtAfterConditionTest.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.component.aws.ddbstream;
-
-import java.math.BigInteger;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import static org.hamcrest.CoreMatchers.is;
-import static org.junit.Assert.assertThat;
-
-
-@RunWith(Parameterized.class)
-public class AtAfterConditionTest {
-
-
- private final AtAfterCondition condition;
- private final int smaller;
- private final int bigger;
- private final boolean result;
-
- public AtAfterConditionTest(AtAfterCondition condition, int smaller, int bigger, boolean result) {
- this.condition = condition;
- this.smaller = smaller;
- this.bigger = bigger;
- this.result = result;
- }
-
- @Parameterized.Parameters
- public static Collection<Object[]> parameters() {
- List<Object[]> results = new ArrayList<>();
-
- results.add(new Object[]{AtAfterCondition.Conditions.AFTER, 1, 5, true});
- results.add(new Object[]{AtAfterCondition.Conditions.AT , 1, 5, true});
- results.add(new Object[]{AtAfterCondition.Conditions.AFTER, 1, 1, false});
- results.add(new Object[]{AtAfterCondition.Conditions.AT , 1, 1, true});
- results.add(new Object[]{AtAfterCondition.Conditions.AFTER, 5, 1, false});
- results.add(new Object[]{AtAfterCondition.Conditions.AT , 5, 1, false});
-
- return results;
- }
-
- @Test
- public void test() throws Exception {
- assertThat(condition.matches(BigInteger.valueOf(smaller), BigInteger.valueOf(bigger)), is(result));
- }
-
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/0570a6ca/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/BigIntComparisonsTest.java
----------------------------------------------------------------------
diff --git a/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/BigIntComparisonsTest.java b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/BigIntComparisonsTest.java
new file mode 100644
index 0000000..c881d41
--- /dev/null
+++ b/components/camel-aws/src/test/java/org/apache/camel/component/aws/ddbstream/BigIntComparisonsTest.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.aws.ddbstream;
+
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+
+
+@RunWith(Parameterized.class)
+public class BigIntComparisonsTest {
+
+
+ private final BigIntComparisons condition;
+ private final int smaller;
+ private final int bigger;
+ private final boolean result;
+
+ public BigIntComparisonsTest(BigIntComparisons condition, int smaller, int bigger, boolean result) {
+ this.condition = condition;
+ this.smaller = smaller;
+ this.bigger = bigger;
+ this.result = result;
+ }
+
+ @Parameterized.Parameters
+ public static Collection<Object[]> parameters() {
+ List<Object[]> results = new ArrayList<>();
+
+ results.add(new Object[]{BigIntComparisons.Conditions.LT , 1, 5, true});
+ results.add(new Object[]{BigIntComparisons.Conditions.LTEQ, 1, 5, true});
+ results.add(new Object[]{BigIntComparisons.Conditions.LT , 1, 1, false});
+ results.add(new Object[]{BigIntComparisons.Conditions.LTEQ, 1, 1, true});
+ results.add(new Object[]{BigIntComparisons.Conditions.LT , 5, 1, false});
+ results.add(new Object[]{BigIntComparisons.Conditions.LTEQ, 5, 1, false});
+
+ return results;
+ }
+
+ @Test
+ public void test() throws Exception {
+ assertThat(condition.matches(BigInteger.valueOf(smaller), BigInteger.valueOf(bigger)), is(result));
+ }
+
+}
\ No newline at end of file